L
les
Here's a class which uses C# 2.0 generics to implement an inter-thread
message queue. Any number of threads can post to and read from the
queue simultaneously, and the message object can be of any type.
There's a test driver at the bottom which demonstrates usage.
/*-----------------------------------------------------------------------------------------------------*/
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace Lib101
{
/// <summary>
/// InterthreadMessageQueue is a FIFO queue designed to be used by multiple
/// threads to exchange messages.
/// Any thread can post an item to the queue with PostItem(), and any thread
/// can retrieve items with GetItem().
/// GetItem has a maxWait parameter which specifies the number of milliseconds
/// the receiving thread will block waiting for an item. If this value is -1,
/// blocking is indefinite.
/// </summary>
/// <typeparam name="T">Type of the messages carried by the queue.</typeparam>
public class InterthreadMessageQueue<T>
{
    // Pending messages. The same object is used as the monitor for locking and
    // signalling, so it is readonly and never handed out to callers.
    readonly System.Collections.Generic.Queue<T> _queue =
        new System.Collections.Generic.Queue<T>();

    /// <summary>
    /// Post a message to the queue and wake one waiting receiver, if any.
    /// </summary>
    /// <param name="item">The message to append to the end of the queue.</param>
    public void PostItem(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Pulse for every item, not only on the empty -> non-empty
            // transition: with several receivers blocked, two posts made
            // before the first woken receiver dequeues would otherwise wake
            // only one of them and leave the other blocked beside an item.
            Monitor.Pulse(_queue);
        }
    }

    /// <summary>
    /// Retrieve a message from the queue.
    /// </summary>
    /// <param name="maxWait">Number of milliseconds to block if nothing is
    /// available. 0 means "do not block", -1 means "block indefinitely".</param>
    /// <returns>The next item in the queue, or default(T) if the queue is
    /// still empty once the wait has expired.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The queue is empty and
    /// maxWait is less than -1.</exception>
    public T GetItem(int maxWait)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && maxWait != 0)
            {
                // Only reached when a wait is actually required, which is also
                // the only case in which an invalid timeout was ever rejected.
                if (maxWait < Timeout.Infinite)
                {
                    throw new ArgumentOutOfRangeException("maxWait",
                        "maxWait must be -1 (infinite), 0, or a positive number of milliseconds.");
                }

                System.Diagnostics.Stopwatch waited = System.Diagnostics.Stopwatch.StartNew();

                // Re-check the queue after every wake-up: another receiver may
                // have taken the item between the pulse and this thread
                // reacquiring the lock. In that case keep waiting for whatever
                // is left of the caller's time budget instead of giving up.
                while (_queue.Count == 0)
                {
                    int remaining = Timeout.Infinite;
                    if (maxWait != Timeout.Infinite)
                    {
                        long left = maxWait - waited.ElapsedMilliseconds;
                        if (left <= 0)
                        {
                            break;
                        }
                        remaining = (int)left; // left <= maxWait, so it fits in an int
                    }
                    Monitor.Wait(_queue, remaining);
                }
            }

            if (_queue.Count == 0)
            {
                return default(T);
            }
            return _queue.Dequeue();
        }
    }
}
/// <summary>
/// Console test driver for InterthreadMessageQueue: two producer threads post
/// numbered string messages while the main thread receives and prints them.
/// </summary>
public class InterthreadMessageQueueTester
{
    // The single queue shared by both producers and the receiving main thread.
    static InterthreadMessageQueue<string> _testqueue =
        new InterthreadMessageQueue<string>();

    /// <summary>
    /// Starts two producer threads, then receives and prints messages forever.
    /// The loop has no exit; each receive blocks for at most 8000 ms, and the
    /// result is printed whether or not an item arrived.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        ThreadStart producerEntry = new ThreadStart(msgGenerator);

        Thread firstProducer = new Thread(producerEntry);
        firstProducer.Start();
        Thread secondProducer = new Thread(producerEntry);
        secondProducer.Start();

        while (true)
        {
            string received = _testqueue.GetItem(8000);
            Console.WriteLine("Received msgid={0}", received);
        }
    }

    /// <summary>
    /// Producer body: after an initial 5000 ms pause, posts 999 messages of
    /// the form "managedThreadId.sequence", pausing 15 ms after each one.
    /// </summary>
    static void msgGenerator()
    {
        Thread.Sleep(5000);

        // The whole method runs on one thread, so its id is read just once.
        int producerId = Thread.CurrentThread.ManagedThreadId;

        int sequence = 1;
        while (sequence < 1000)
        {
            string msg = string.Format("{0}.{1}", producerId, sequence);
            _testqueue.PostItem(msg);
            Console.WriteLine("Posted msgid={0}", msg);
            Thread.Sleep(15);
            sequence++;
        }
    }
}
}
Here's a class which uses C# 2.0 generics to implement an inter-thread
message queue. Any number of threads can post to and read from the
queue simultaneously, and the message object can be of any type.
There's a test driver at the bottom which demonstrates usage.
/*-----------------------------------------------------------------------------------------------------*/
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace Lib101
{
/// <summary>
/// InterthreadMessageQueue is a FIFO queue designed to be used by multiple
/// threads to exchange messages.
/// Any thread can post an item to the queue with PostItem(), and any thread
/// can retrieve items with GetItem().
/// GetItem has a maxWait parameter which specifies the number of milliseconds
/// the receiving thread will block waiting for an item. If this value is -1,
/// blocking is indefinite.
/// </summary>
/// <typeparam name="T">Type of the messages carried by the queue.</typeparam>
public class InterthreadMessageQueue<T>
{
    // Pending messages. The same object is used as the monitor for locking and
    // signalling, so it is readonly and never handed out to callers.
    readonly System.Collections.Generic.Queue<T> _queue =
        new System.Collections.Generic.Queue<T>();

    /// <summary>
    /// Post a message to the queue and wake one waiting receiver, if any.
    /// </summary>
    /// <param name="item">The message to append to the end of the queue.</param>
    public void PostItem(T item)
    {
        lock (_queue)
        {
            _queue.Enqueue(item);
            // Pulse for every item, not only on the empty -> non-empty
            // transition: with several receivers blocked, two posts made
            // before the first woken receiver dequeues would otherwise wake
            // only one of them and leave the other blocked beside an item.
            Monitor.Pulse(_queue);
        }
    }

    /// <summary>
    /// Retrieve a message from the queue.
    /// </summary>
    /// <param name="maxWait">Number of milliseconds to block if nothing is
    /// available. 0 means "do not block", -1 means "block indefinitely".</param>
    /// <returns>The next item in the queue, or default(T) if the queue is
    /// still empty once the wait has expired.</returns>
    /// <exception cref="ArgumentOutOfRangeException">The queue is empty and
    /// maxWait is less than -1.</exception>
    public T GetItem(int maxWait)
    {
        lock (_queue)
        {
            if (_queue.Count == 0 && maxWait != 0)
            {
                // Only reached when a wait is actually required, which is also
                // the only case in which an invalid timeout was ever rejected.
                if (maxWait < Timeout.Infinite)
                {
                    throw new ArgumentOutOfRangeException("maxWait",
                        "maxWait must be -1 (infinite), 0, or a positive number of milliseconds.");
                }

                System.Diagnostics.Stopwatch waited = System.Diagnostics.Stopwatch.StartNew();

                // Re-check the queue after every wake-up: another receiver may
                // have taken the item between the pulse and this thread
                // reacquiring the lock. In that case keep waiting for whatever
                // is left of the caller's time budget instead of giving up.
                while (_queue.Count == 0)
                {
                    int remaining = Timeout.Infinite;
                    if (maxWait != Timeout.Infinite)
                    {
                        long left = maxWait - waited.ElapsedMilliseconds;
                        if (left <= 0)
                        {
                            break;
                        }
                        remaining = (int)left; // left <= maxWait, so it fits in an int
                    }
                    Monitor.Wait(_queue, remaining);
                }
            }

            if (_queue.Count == 0)
            {
                return default(T);
            }
            return _queue.Dequeue();
        }
    }
}
/// <summary>
/// Console test driver for InterthreadMessageQueue: two producer threads post
/// numbered string messages while the main thread receives and prints them.
/// </summary>
public class InterthreadMessageQueueTester
{
    // The single queue shared by both producers and the receiving main thread.
    static InterthreadMessageQueue<string> _testqueue =
        new InterthreadMessageQueue<string>();

    /// <summary>
    /// Starts two producer threads, then receives and prints messages forever.
    /// The loop has no exit; each receive blocks for at most 8000 ms, and the
    /// result is printed whether or not an item arrived.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    public static void Main(string[] args)
    {
        ThreadStart producerEntry = new ThreadStart(msgGenerator);

        Thread firstProducer = new Thread(producerEntry);
        firstProducer.Start();
        Thread secondProducer = new Thread(producerEntry);
        secondProducer.Start();

        while (true)
        {
            string received = _testqueue.GetItem(8000);
            Console.WriteLine("Received msgid={0}", received);
        }
    }

    /// <summary>
    /// Producer body: after an initial 5000 ms pause, posts 999 messages of
    /// the form "managedThreadId.sequence", pausing 15 ms after each one.
    /// </summary>
    static void msgGenerator()
    {
        Thread.Sleep(5000);

        // The whole method runs on one thread, so its id is read just once.
        int producerId = Thread.CurrentThread.ManagedThreadId;

        int sequence = 1;
        while (sequence < 1000)
        {
            string msg = string.Format("{0}.{1}", producerId, sequence);
            _testqueue.PostItem(msg);
            Console.WriteLine("Posted msgid={0}", msg);
            Thread.Sleep(15);
            sequence++;
        }
    }
}
}