Possible Threading Issue

N

Nicholas

I have this class that provides a disk-based queue for storing various
items. As items are queued the event delegate is called and the consumer
dequeues the object and processes it. This seems to work off and on.
Sometimes the queue does not pick up a queued item until another item is
queued. Can anyone see a problem with this class?

***
Here's the interface and EventArgs...
***
/// <summary>
/// Event called when items are queued.
/// </summary>
public delegate void ItemQueuedEventHandler(object sender,
ItemQueuedEventArgs e);
/// <summary>
/// Item queued event data.
/// </summary>
public class ItemQueuedEventArgs : System.EventArgs
{
internal ItemQueuedEventArgs(int count) : base()
{
this._count = count;
}
/// <summary>
/// Count of items in the queue.
/// </summary>
public int Count
{
get { return this._count; }
} int _count = 0;
}
/// <summary>
/// Base definition of queue types.
/// </summary>
public interface IQueue : IDisposable
{
/// <summary>
/// Current count of all items in the queue.
/// </summary>
int Count { get; }
/// <summary>
/// Removes all objects in the queue.
/// </summary>
void Clear();
/// <summary>
/// Adds an object to the queue.
/// </summary>
/// <param name="obj">The object to add.</param>
void Enqueue(object obj);
/// <summary>
/// Removed an item from the queue.
/// </summary>
/// <returns>The first object from the top of the queue.</returns>
object Dequeue();
/// <summary>
/// Event called when items are queued.
/// </summary>
event ItemQueuedEventHandler ItemQueued;
}

***
Here's the class...
***
using System;
/// <summary>
/// Provides an event enabled disk-based queue.
/// </summary>
public class DiskQueue : IQueue
{
/// <summary>
/// Event called when new items are queued.
/// </summary>
public event ItemQueuedEventHandler ItemQueued;
/// <summary>
/// Extension appended to all files in the queue.
/// </summary>
public readonly string Extension = ".q";
System.IO.FileSystemWatcher _fsw = null;
static readonly object diskQueueLock = new object();
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <remarks>
/// All items queued will be serialized with a binary formatter.
/// You can retrieve the queue location from the Path property.
/// </remarks>
public DiskQueue()
{
this.Path = this._path;
}
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <param name="path">The path to place queued items.</param>
/// <remarks>All items queued will be serialized with a binary
formatter.</remarks>
public DiskQueue(string path)
{
this.Path = path;
}
/// <summary>
/// Creates a new instance of the DiskQueue class.
/// </summary>
/// <param name="formatter">The formatter to use for serialization.</param>
/// <param name="path">The path to place queued items.</param>
public DiskQueue(string path, System.Runtime.Serialization.IFormatter
formatter)
{
this.Formatter = formatter;
this.Path = path;
}
/// <summary>
/// Gets an object to synchronize access to the collection.
/// </summary>
public object SyncRoot
{
get { return diskQueueLock; }
}
/// <summary>
/// Gets or sets the formatter used to serialize queued items.
/// </summary>
/// <exception cref="System.ArgumentNullException">Formatter is
null.</exception>
/// <remarks>Be careful changing the formatter after instantiation. If there
are items
/// left in the queue that were formatted with the former they will throw
exceptions
/// on Dequeue.</remarks>
public System.Runtime.Serialization.IFormatter Formatter
{
get { return this._formatter; }
set
{
if (value == null)
throw new ArgumentNullException("Formatter");
this._formatter = value;
}
} System.Runtime.Serialization.IFormatter _formatter = new
System.Runtime.Serialization.Formatters.Binary.BinaryFormatter();
/// <summary>
/// Gets or sets the path to the queue folder.
/// </summary>
/// <exception cref="System.ArgumentNullException">Path is null.</exception>
public string Path
{
get { return this._path; }
set
{
if (value == null)
throw new ArgumentNullException("Path");
string path = value;
if (path.EndsWith("\\")) path = path.Substring(0, path.Length - 1);
this._path = path;
if (!System.IO.Directory.Exists(this.Path))
System.IO.Directory.CreateDirectory(this.Path);
if (this._fsw != null)
this._fsw.Path = path;
}
} string _path =
System.Environment.GetFolderPath(System.Environment.SpecialFolder.CommonAppl
icationData) + Guid.NewGuid().ToString();
/// <summary>
/// Get the number of items in the queue.
/// </summary>
public int Count
{
get
{
return System.IO.Directory.GetFiles(this.Path, "*" + this.Extension).Length;
}
}
/// <summary>
/// Begins watching the folder for queued items and raises an event when
items are queued.
/// </summary>
public void Start()
{
this._fsw = new System.IO.FileSystemWatcher(this.Path, "*" +
this.Extension);
this._fsw.EnableRaisingEvents = true;
this._fsw.Created += new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Changed += new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Renamed += new System.IO.RenamedEventHandler(_fsw_Renamed);
}
/// <summary>
/// Adds an item to the queue.
/// </summary>
/// <param name="graph">The object to queue.</param>
/// <exception cref="System.Exception">Could not enqueue the
item.</exception>
/// <exception
cref="System.Runtime.Serialization.SerializationException">Error serializing
the item.</exception>
/// <exception cref="System.ArgumentNullException"><c>graph</c> was
null.</exception>
public void Enqueue(object graph)
{
if (graph == null)
throw new ArgumentNullException("graph");
Exception e = null;
System.Threading.Thread.Sleep(10);
lock (diskQueueLock)
{
try
{
string file = this.Path + "\\" + DateTime.Now.Ticks.ToString();
System.IO.FileStream fs = System.IO.File.Create(file);
this.Formatter.Serialize(fs, graph);
fs.Close();
System.IO.File.Move(file, file + this.Extension);
}
catch (System.Runtime.Serialization.SerializationException sex)
{
e = sex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
}
/// <summary>
/// Removes an item from the queue.
/// </summary>
/// <returns>The deserialized object.</returns>
/// <exception cref="System.Exception">Could not dequeue the
item.</exception>
/// <exception
cref="System.Runtime.Serialization.SerializationException">Error
deserializing the item.</exception>
/// <exception cref="System.NullReferenceException">No items
queued.</exception>
public object Dequeue()
{
if (this.Count < 1)
throw new NullReferenceException("No items queued!");
Exception e = null;
object o = null;
lock (diskQueueLock)
{
try
{
string file = System.IO.Directory.GetFiles(this.Path, "*" +
this.Extension)[0];
System.IO.FileStream fs = System.IO.File.Open(file, System.IO.FileMode.Open,
System.IO.FileAccess.Read, System.IO.FileShare.Read);
o = this.Formatter.Deserialize(fs);
fs.Close();
System.IO.File.Delete(file);
}
catch (System.Runtime.Serialization.SerializationException sex)
{
e = sex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
else
return o;
}
/// <summary>
/// Clears all items in the queue.
/// </summary>
/// <exception cref="System.Exception">Error clearing the queue.</exception>
public void Clear()
{
Exception iex = null;
Exception e = null;
lock (diskQueueLock)
{
try
{
foreach (string file in System.IO.Directory.GetFiles(this.Path, "*" +
this.Extension))
{
try { System.IO.File.Delete(file); }
catch (Exception ex) { iex = ex; }
}
if (iex != null)
e = iex;
}
catch (Exception ex)
{
e = ex;
}
}
if (e != null)
throw e;
}
private void _fsw_Created(object sender, System.IO.FileSystemEventArgs e)
{
if (this.ItemQueued != null)
this.ItemQueued(typeof(DiskQueue).ToString(), new
ItemQueuedEventArgs(this.Count));
}
private void _fsw_Renamed(object sender, System.IO.RenamedEventArgs e)
{
this._fsw_Created(sender, e);
}
#region IDisposable Members
/// <summary>
/// Destroys this instance and releases all resources.
/// </summary>
public void Dispose()
{
try
{
this._formatter = null;
//this.Clear();
if (this._fsw != null)
{
this._fsw.Created -= new System.IO.FileSystemEventHandler(_fsw_Created);
this._fsw.Renamed -= new System.IO.RenamedEventHandler(_fsw_Renamed);
this._fsw.Dispose();
}
}
catch { /* ignored */ }
}
#endregion
}
}
 
D

Dave

I haven't investigated it to the point where I know why you observe that
behavior but there are a few questionable things in the code that I'll point
out in the code in method Enqueue (and similarly for the Dequeue method).

1. What is the purpose of the sleep statement? What happens if you remove
it, and what problem are you trying to solve with it?
My experience is that this usually is masking some other problem.

2. You should close the FileStream object in a finally block. The way the
code is structured is such that if the binary serializer ever throws an
exception the fs object's close method will never get called. This will leak
resources and perhaps cause other problems.

You can instead use (error handling omitted)...

try
{
using ( System.IO.FileStream fs = System.IO.File.Create(file) )
{
this.Formatter.Serialize(fs, graph);
}
}
catch <snip>

3. Minor point: I'd change this statement....
string file = this.Path + "\\" + DateTime.Now.Ticks.ToString();
A more platform neutral means of building a path is to use
string file =
System.IO.Path.Combine(this.Path,DateTime.Now.Ticks.ToString()); // uses
correct separator
Also, I'd probably test for a duplicate file name and do something like
this...
while ( System.IO.File.Exists(file) )
file += "_"; // this is probably more paraniod then you need to be.

4. You catch and save the exception object in each of the catch blocks but
all you do with it is rethrow the same object. This will actually cause a
loss of debugging information. A catch handler further up the call chain
that examines the stack trace will see the line of code where you caught the
exception as being the ultimate source rather then the actual line of code
that caused the exception. If all you do is catch and rethrow the same
object without adding additional context information there is no benefit to
the catch (unless you are deliberately trying to hide information).

If you want to add extra context information a better strategy is to do
something like this...

try { ... code here }
catch(System.Runtime.Serialization.SerializationException sex)
{
throw new ApplicationException("Unable to serialize object; is object
valid?",sex);
}
catch (Exception ex)
{

throw new ApplicationException("Unknown error enqueueing item.",ex);
}

This preserves the original exception and allows you to add some context to
it. In general if you don't do anything useful in a catch block you are
better off not catching it.



Nicholas said:
I have this class that provides a disk-based queue for storing various
items. As items are queued the event delegate is called and the consumer
dequeues the object and processes it. This seems to work off and on.
Sometimes the queue does not pick up a queued item until another item is
queued. Can anyone see a problem with this class?
<snip>
 
N

Nicholas

Thanks for the response. I've updated my code using your suggestions.

Since I am basing the file names on time, I used the sleep to wait before
creating a new file. Your idea with the loop is better.

I overlooked the placement of the fs.Close() call and changed that as well.

I'm going to test again and see if I still experience the seemingly random
"issue". Any other suggestions are welcome.

Nick
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Top