The design makes sense, but I am not understanding how the listener will
let the Server object know that a new message has arrived.
I am assuming that in the Listener you do the following after you have
received the message.
No explicit signal from your code per se. Signal/Wait logic is built *into
a Blocking Bounded Queue (BBQ) implementation. The Enqueue and Dequeue
method should do the right things with your threads. Check out my Bounded
Blocking One Lock Queue. Note I have done some ad-hoc changes on this and
have not did heavy testing for correctness, but should be close. Please let
me know if you see a thread issue or other. This will allow unlimited
Consumers and Producer threads in thread safe manner. Note unbounded queues
are much easer to implement but do not give you the blocking behavior you
need in a Consumer/Producer deal and allow unbounded range which could run
you out of memory and allow a producer to run wild. Anyway, here it is...
using System;
using System.Threading;
using System.Collections;
namespace Queues
{
/// <summary>
/// Author: William Stacey
/// Modified Date: 08/03/2004
/// My One Lock Bounded Blocking Queue (e.g. Bounded Buffer.)
/// Generally the best blocking queue for normal usage on single and
multi-cpu.
/// </summary>
public class OneLockQueue : ICollection
{
#region Fields
private object[] buffer;
private int count;
private int size;
private int head;
private int tail;
private readonly object syncRoot;
#endregion
#region Constructors
/// <summary>
/// Create instance of Queue with Bounded number of elements. After that
/// many elements are used, another Enqueue operation will "block" or wait
/// until a Consumer calls Dequeue to free a slot. Likewise, if the queue
/// is empty, a call to Dequeue will block until another thread calls
/// Enqueue.
/// </summary>
/// <param name="size"></param>
public OneLockQueue(int size)
{
syncRoot = new object();
this.size = size;
buffer = new object[size];
count = 0;
head = 0;
tail = 0;
}
#endregion
#region Properties
/// <summary>
/// Gets the object values currently in the queue. If queue is empty,
this
/// will return a zero length array. The returned array lenght can be
/// 0 to Size.
/// </summary>
public object[] Values
{
get
{
// Copy used elements to a new array of "count" size. Note a simple
// Buffer copy will not work as head could be anywhere and we want
// a zero based array.
object[] values;
lock(syncRoot)
{
values = new object[count];
int pos = head;
for(int i = 0; i < count; i++)
{
values
= buffer[pos];
pos = (pos + 1) % size;
}
}
return values;
}
}
#endregion
#region Public Methods
/// <summary>
/// Enqueue "value" object into queue. If queue is full, this method will
/// block until another thread calls one of the Dequeue methods.
/// </summary>
/// <param name="value"></param>
public void Enqueue(object value)
{
lock(syncRoot)
{
while(count == size)
{
try
{
Monitor.Wait(syncRoot);
}
catch
{
Monitor.PulseAll(syncRoot);
throw;
}
}
buffer[tail] = value;
tail = (tail + 1) % size;
count++;
if ( count == 1 ) // Could have blocking Dequeue thread(s).
Monitor.PulseAll(syncRoot);
}
}
/// <summary>
/// Non-blocking version of Enqueue(). If Enqueue is successfull, this
will
/// return true, other false if queue is full.
/// </summary>
/// <param name="value"></param>
/// <returns>true if successfull, otherwise false.</returns>
public bool TryEnqueue(object value)
{
lock(syncRoot)
{
if ( count == size )
return false;
buffer[tail] = value;
tail = (tail + 1) % size;
count++;
if ( count == 1 ) // Could have blocking Dequeue thread(s).
Monitor.PulseAll(syncRoot);
}
return true;
}
/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// If queue is empty, method will block until another thread calls one of
/// the Enqueue methods.
/// </summary>
/// <returns>The object that is removed from the beginning of the
Queue.</returns>
public object Dequeue()
{
object value;
lock(syncRoot)
{
while(count == 0)
{
try
{
Monitor.Wait(syncRoot);
}
catch
{
Monitor.PulseAll(syncRoot);
throw;
}
}
value = buffer[head];
buffer[head] = null;
head = (head + 1) % size;
count--;
if ( count == (size - 1) ) // Could have blocking Enqueue thread(s).
Monitor.PulseAll(syncRoot);
}
return value;
}
/// <summary>
/// Non-blocking version of Dequeue. Will return false if queue is empty
and set
/// value to null, otherwise will return true and set value to the
dequeued object.
/// </summary>
/// <param name="value">The object that is removed from the beginning of
the Queue or null if empty.</param>
/// <returns>true if successfull, otherwise false.</returns>
public bool TryDequeue(out object value)
{
lock(syncRoot)
{
if ( count == 0 )
{
value = null;
return false;
}
value = buffer[head];
buffer[head] = null;
head = (head + 1) % size;
count--;
if ( count == (size - 1) ) // Could have blocking Enqueue thread(s).
Monitor.PulseAll(syncRoot);
}
return true;
}
/// <summary>
/// Returns the object at the beginning of the Queue without removing it.
/// </summary>
/// <returns>The object at the beginning of the Queue.</returns>
/// <exception cref="InvalidOpertionException">The Queue is
empty.</exception>
public object Peek()
{
lock(syncRoot)
{
if ( count == 0 )
throw new InvalidOperationException("The Queue is empty.");
object value = buffer[head];
return value;
}
}
/// <summary>
/// Returns the object at the beginning of the Queue without removing it.
/// Contrary to the Peek method, this method will not throw exception if
/// queue is empty, but instead will return false.
/// </summary>
/// <param name="value">The object at the beginning of the Queue or null
if empty.</param>
/// <returns>The object at the beginning of the Queue.</returns>
public bool TryPeek(out object value)
{
lock(syncRoot)
{
if ( count == 0 )
{
value = null;
return false;
}
value = buffer[head];
}
return true;
}
#endregion
#region ICollection Members
public bool IsSynchronized
{
get { return true; }
}
public int Size
{
get { return this.size; }
}
public int Count
{
get { lock(syncRoot) { return count; } }
}
public void CopyTo(Array array, int index)
{
object[] tmpArray = Values;
tmpArray.CopyTo(array, index);
}
public object SyncRoot
{
get { return this.syncRoot; }
}
#endregion
#region IEnumerable Members
public IEnumerator GetEnumerator()
{
throw new NotImplementedException("Not Implemented.");
}
#endregion
} // End OneLockQueue
}
Usage Example:
------------------------
OneLockQueue olq = new OneLockQueue(5);
while(olq.TryEnqueue("two") == true)
{
Console.WriteLine("Enqueued:");
}
object obj;
while(olq.TryDequeue(out obj))
{
Console.WriteLine("Dequeued:");
}
HTH