Inter-thread message queue in C#

les

Here's a class which uses 2.0 generics to implement an inter-thread
message queue in C#. Any number of threads can post and read from the
queue simultaneously, and the message object can be any type.

There's a test driver at the bottom which demonstrates usage.

/*-----------------------------------------------------------------------------------------------------*/

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Lib101
{
    /// <summary>
    /// InterthreadMessageQueue is a queue designed to be used by
    /// multiple threads to exchange messages.
    /// Any thread can post an item to the queue with PostItem(), and
    /// any thread can retrieve items with GetItem().
    /// GetItem has a maxWait parameter which specifies the amount of
    /// time the receiving thread will block waiting for an item.
    /// If this value is -1, blocking is indefinite.
    /// </summary>
    public class InterthreadMessageQueue<T>
    {
        System.Collections.Generic.Queue<T> _queue =
            new System.Collections.Generic.Queue<T>();

        /// <summary>
        /// Post a message to the queue.
        /// </summary>
        public void PostItem(T item)
        {
            lock (_queue)
            {
                _queue.Enqueue(item);
                if (_queue.Count == 1)
                {
                    Monitor.Pulse(_queue);
                }
            }
        }

        /// <summary>
        /// Retrieve a message from the queue.
        /// </summary>
        /// <param name="maxWait">Number of milliseconds to block if
        /// nothing is available. -1 means "block indefinitely"</param>
        /// <returns>The next item in the queue, or default(T) if queue
        /// is empty</returns>
        public T GetItem(int maxWait)
        {
            lock (_queue)
            {
                if (_queue.Count == 0)
                {
                    if (maxWait == 0)
                        return default(T);
                    Monitor.Wait(_queue, maxWait);
                    if (_queue.Count == 0)
                        return default(T);
                }
                return _queue.Dequeue();
            }
        }
    }

    public class InterthreadMessageQueueTester
    {
        static InterthreadMessageQueue<string> _testqueue =
            new InterthreadMessageQueue<string>();

        public static void Main(string[] args)
        {
            Thread t1 = new Thread(new ThreadStart(msgGenerator));
            t1.Start();
            Thread t2 = new Thread(new ThreadStart(msgGenerator));
            t2.Start();

            string msgid = null;
            do
            {
                msgid = _testqueue.GetItem(8000);
                Console.WriteLine("Received msgid={0}", msgid);
            } while (true);
        }

        static void msgGenerator()
        {
            Thread.Sleep(5000);
            for (int i = 1; i < 1000; i++)
            {
                string msg = string.Format("{0}.{1}",
                    Thread.CurrentThread.ManagedThreadId, i);
                _testqueue.PostItem(msg);
                Console.WriteLine("Posted msgid={0}", msg);
                Thread.Sleep(15);
            }
        }
    }
}
 
Guest

Did you have a question?

If you are just letting people know about this wonderful solution I suggest
you post it on CodeProject.com
 
andrew queisser

les said:
Here's a class which uses 2.0 generics to implement an inter-thread
message queue in C#. Any number of threads can post and read from the
queue simultaneously, and the message object can be any type.

[code snipped]

Hi les,

I was just looking at something like that a few days ago and I have a
question:

Is it possible that one of two threads waiting on an empty queue might get
stuck? Here's the scenario:

1) Queue is empty
2) T1 calls GetItem, enters Monitor.Wait
3) T2 calls GetItem, enters Monitor.Wait
4) T3 calls PostItem and calls Monitor.Pulse since the queue was empty
5) T3 calls PostItem again before T1 or T2 have a chance to run.
Monitor.Pulse doesn't get called since the queue isn't empty anymore
6) T1 and T2 compete for the lock, T1 wins and dequeues one item
7) T2 is now stuck in Monitor.Wait although the queue is non-empty

I'm not really sure the above scenario can actually happen, and even if
it does, T2 would only be stuck for "maxWait", since you don't check the
return value of Wait(). In my code I call the equivalent of "Pulse" each
time I leave GetItem to prevent stuckness.


Andrew
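
A minimal sketch of the "Pulse each time I leave GetItem" idea, assuming it means that a consumer which has just dequeued an item and still sees items in the queue pulses the monitor again, so the next waiter is woken even though the second PostItem never pulsed. The class name ChainPulseQueue is hypothetical, and PostItem is unchanged from the posted code. This is one reading of the description above, not Andrew's actual code.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of InterthreadMessageQueue<T>: PostItem is the same,
// but GetItem passes the wake-up along when items remain in the queue.
public class ChainPulseQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();

    public void PostItem(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Unchanged: pulse only on the empty -> non-empty transition.
            if (_queue.Count == 1)
            {
                Monitor.Pulse(_queue);
            }
        }
    }

    public T GetItem(int maxWait)
    {
        lock (_queue)
        {
            if (_queue.Count == 0)
            {
                if (maxWait == 0)
                    return default(T);
                Monitor.Wait(_queue, maxWait);
                if (_queue.Count == 0)
                    return default(T);
            }
            T item = _queue.Dequeue();
            // Items were posted without their own pulse: wake the next
            // waiter (if any) so it does not sit out its full timeout.
            if (_queue.Count > 0)
            {
                Monitor.Pulse(_queue);
            }
            return item;
        }
    }
}
```

In steps 1-7 above, T1 would dequeue the first item, see that one item remains, and pulse, which moves T2 out of Monitor.Wait. The single "if" is kept from the original, so a woken consumer that finds the queue already drained still returns default(T) before its timeout.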
 
andrew queisser

Ok, I just verified that the scenario can indeed happen if the priorities
of T1 and T2 are below that of T3.

Andrew
 
Brian Gideon

andrew said:
Is it possible that one of two threads waiting on an empty queue might get
stuck?

Yes. But, like you said, the timeout will expire and any "live locked"
consumers will eventually wake. That is, unless the timeout is too long
or, worse yet, infinite. So technically this implementation is safe
for multiple producers, but only one consumer.

Brian
 
William Stacey [MVP]

Yeh. That is one reason why it is safer to use PulseAll at the potential
risk of some slight overhead. Here is mine on the CodeProject that shows
this:
http://codeproject.com/csharp/boundedblockingqueue.asp

--
William Stacey [MVP]
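
A rough sketch of the PulseAll suggestion, assuming it amounts to swapping Monitor.Pulse for Monitor.PulseAll in PostItem and leaving GetItem as posted. The class name PulseAllQueue is hypothetical and nothing here is taken from the linked CodeProject article.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of InterthreadMessageQueue<T> that wakes every
// waiting consumer when the queue goes from empty to non-empty.
public class PulseAllQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();

    public void PostItem(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            if (_queue.Count == 1)
            {
                // Wake all waiters; each re-acquires the lock in turn
                // and checks the count for itself.
                Monitor.PulseAll(_queue);
            }
        }
    }

    public T GetItem(int maxWait)
    {
        lock (_queue)
        {
            if (_queue.Count == 0)
            {
                if (maxWait == 0)
                    return default(T);
                Monitor.Wait(_queue, maxWait);
                // A woken consumer may find that another one already
                // took the item; it then returns early with default(T).
                if (_queue.Count == 0)
                    return default(T);
            }
            return _queue.Dequeue();
        }
    }
}
```

The overhead mentioned above is that every waiter wakes up for a single item. With the single "if" kept from the original, the ones that lose the race return default(T) before their timeout, so callers would need to treat that result as "try again".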

 
Jon Skeet [C# MVP]

William Stacey said:
Yeh. That is one reason why it is safer to use PulseAll at the potential
risk of some slight overhead. Here is mine on the CodeProject that shows
this:
http://codeproject.com/csharp/boundedblockingqueue.asp

Alternatively, *always* pulse the monitor, whether or not the queue was
empty before.

I would also suggest going round the loop "while" the count is zero,
rather than using a single "if".

See http://www.pobox.com/~skeet/csharp/threads/deadlocks.shtml (half
way down) for my implementation of a producer/consumer queue.
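
A minimal sketch combining the two suggestions above (pulse on every post, and loop "while" the count is zero), keeping the GetItem(int maxWait) signature from the original post. The class name LoopingQueue and the Stopwatch-based timeout bookkeeping are assumptions for illustration. This is not the implementation from the linked page.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

// Hypothetical variant of InterthreadMessageQueue<T>.
public class LoopingQueue<T>
{
    private readonly Queue<T> _queue = new Queue<T>();

    public void PostItem(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Always pulse, whether or not the queue was empty before,
            // so each posted item wakes at most one waiting consumer.
            Monitor.Pulse(_queue);
        }
    }

    // maxWait is in milliseconds; any negative value is treated here
    // as "block indefinitely" (an assumption; the original used -1).
    public T GetItem(int maxWait)
    {
        Stopwatch elapsed = Stopwatch.StartNew();
        lock (_queue)
        {
            // Re-check after every wake-up: another consumer may have
            // taken the item before this thread re-acquired the lock.
            while (_queue.Count == 0)
            {
                if (maxWait < 0)
                {
                    Monitor.Wait(_queue);
                    continue;
                }
                long remaining = maxWait - elapsed.ElapsedMilliseconds;
                if (remaining <= 0)
                    return default(T);
                Monitor.Wait(_queue, (int)remaining);
            }
            return _queue.Dequeue();
        }
    }
}
```

With this shape, a timed-out call is the only way to get default(T) back from an empty queue, and a consumer using -1 only returns once it actually holds an item.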
 
