How to improve performance of Queue accessing between 2 threads?

Guest

Dear Guys,

I am writing a program that shares a Queue between two threads: one thread
adds data to the Queue, and the other takes data from the Queue to process it.
My problem is that when there is a lot of data to add (for example, in a
loop), the second thread can rarely or never get access to the Queue, which
makes the program perform poorly.

How can I improve this? I want the second thread to get access as quickly as
possible once the Queue is not empty.

Here is a code snippet:

Thread#1

For i as Integer = 1 To 1000
SyncLock _queue.SyncRoot
_queue.Enqueue(i)
_newItemEvent.Set()
End SyncLock
Next


Thread#2

Dim item As Integer
While _queue.Count <= 0
Thread.SpinWait(0)
End While

While _queue.Count > 0
SyncLock _queue.SyncRoot
item = DirectCast(_queue.Dequeue(), Integer)

....
End SyncLock
End While


Thanks,
Thana N.
 
Greg Young

Also just for your example ...
For i as Integer = 1 To 1000
SyncLock _queue.SyncRoot
_queue.Enqueue(i)
_newItemEvent.Set()
End SyncLock
Next


The other thread cannot access the data because you hold the lock on the
data for the duration of all of the inserts.

Cheers,

Greg
 
Chris Mullins

I had access to a 16-processor Itanium box, a 4-processor Itanium box, a
2-processor AMD machine, and my single-processor laptop.

I took some time to run some locking / no-locking tests and came up with
surprising results. I keep meaning to write this up as a blog entry, but
unfortunately I didn't do a thorough enough job of data capture.

On the 1- and 2-processor boxes, locking was faster. On the 4-processor box,
lock-free was a tiny bit faster. On the 16-processor box it wasn't even
close: lock-free won by more than an order of magnitude.

What this ended up telling me is that I need a "smart queue" and "smart
list" that can look at the number of processors and make the decision which
algorithm to use.
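
The "smart queue" idea can be sketched roughly as follows. This is only an illustration, not Chris's code: the IWorkQueue<T> interface, the LockedQueue<T>, LockFreeQueue<T> and SmartQueue names, and the threshold of 4 processors are all assumptions, and ConcurrentQueue<T> (available from .NET 4.0 onward) is used as a stand-in for a lock-free queue. The right threshold would have to come from measurements on the target hardware.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical common interface so callers do not care which algorithm is used.
public interface IWorkQueue<T>
{
    void Enqueue(T item);
    bool TryDequeue(out T item);
}

// Plain Queue<T> guarded by a single lock.
public sealed class LockedQueue<T> : IWorkQueue<T>
{
    private readonly object sync = new object();
    private readonly Queue<T> items = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (sync) { items.Enqueue(item); }
    }

    public bool TryDequeue(out T item)
    {
        lock (sync)
        {
            if (items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = items.Dequeue();
            return true;
        }
    }
}

// Stand-in for a lock-free queue (assumption: ConcurrentQueue<T> is acceptable here).
public sealed class LockFreeQueue<T> : IWorkQueue<T>
{
    private readonly ConcurrentQueue<T> items = new ConcurrentQueue<T>();

    public void Enqueue(T item) { items.Enqueue(item); }
    public bool TryDequeue(out T item) { return items.TryDequeue(out item); }
}

public static class SmartQueue
{
    // Illustrative threshold only; tune it from real measurements.
    public const int LockFreeThreshold = 4;

    public static IWorkQueue<T> Create<T>()
    {
        return Create<T>(Environment.ProcessorCount);
    }

    // Overload that takes the processor count explicitly, which makes the choice testable.
    public static IWorkQueue<T> Create<T>(int processorCount)
    {
        if (processorCount >= LockFreeThreshold)
            return new LockFreeQueue<T>();
        return new LockedQueue<T>();
    }
}
```

Calling SmartQueue.Create<int>() picks an implementation once, at construction time, so the per-operation cost is just an interface call.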
 
Markus Stoeger

Thana said:
How can I do to improve this situation? I want 2nd thread can access as
fast as possible once Queue is not empty.

Two things... #1 I assume that your _newItemEvent is either an
AutoResetEvent or a ManualResetEvent? That's slow. Try to replace it with
the Monitor.Pulse/Monitor.Wait methods.
#2 Do you really have to use Thread.SpinWait? I think Monitor.Wait would
be better here.

hth,
Max
 
Brian Gideon

Thana,

It looks like what you want is a blocking queue. It's easier if you
wrap that logic in a separate class. The following implementation is a
port from this article:
http://www.yoda.arachsys.com/csharp/threads/deadlocks.shtml.

Imports System.Collections
Imports System.Threading

Public Class BlockingQueue

Private ReadOnly lockObject As Object = New Object
Private queue As Queue = New Queue

Public Sub Enqueue(ByVal o As Object)
SyncLock lockObject
queue.Enqueue(o)
Monitor.Pulse(lockObject)
End SyncLock
End Sub

Public Function Dequeue() As Object
SyncLock lockObject
While queue.Count = 0
Monitor.Wait(lockObject)
End While
Return queue.Dequeue
End SyncLock
End Function

End Class

Brian
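
For readers following along in C#, here is a minimal sketch of the same Monitor.Wait/Monitor.Pulse pattern together with a producer and a consumer that mirror the 1-to-1000 loop from the question. The names SimpleBlockingQueue and BlockingQueueDemo are made up for this example, and the code sticks to constructs that should also be available on .NET 1.1 (non-generic Queue, ThreadStart).

```csharp
using System;
using System.Collections;
using System.Threading;

// C# rendering of the blocking queue pattern shown above (illustrative name).
public class SimpleBlockingQueue
{
    private readonly object lockObject = new object();
    private readonly Queue queue = new Queue();

    public void Enqueue(object o)
    {
        lock (lockObject)
        {
            queue.Enqueue(o);
            Monitor.Pulse(lockObject); // wake one waiting consumer, if any
        }
    }

    public object Dequeue()
    {
        lock (lockObject)
        {
            // Wait releases the lock while blocked and re-acquires it before returning,
            // so the emptiness check is repeated in a loop.
            while (queue.Count == 0)
                Monitor.Wait(lockObject);
            return queue.Dequeue();
        }
    }
}

public class BlockingQueueDemo
{
    private const int ItemCount = 1000;
    private static readonly SimpleBlockingQueue q = new SimpleBlockingQueue();
    private static int sum;

    // Consumer: blocks inside Dequeue instead of spinning on Count.
    private static void Consume()
    {
        for (int n = 0; n < ItemCount; n++)
            sum += (int)q.Dequeue();
    }

    // Producer runs on the calling thread; returns the total seen by the consumer.
    public static int Run()
    {
        sum = 0;
        Thread consumer = new Thread(new ThreadStart(Consume));
        consumer.Start();

        for (int i = 1; i <= ItemCount; i++)
            q.Enqueue(i);

        consumer.Join();
        return sum;
    }
}
```

The consumer never polls: it sleeps inside Monitor.Wait until the producer pulses, which is the main difference from the SpinWait loop in the question.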
 
William Stacey [MVP]

A couple of issues here. First, don't use spin locks that way; it is a big
waste of CPU. Moreover, the sync wrapper takes a lock for all methods and
properties, so the 2nd thread is taking and releasing the lock 4 times for a
single dequeue operation. As Brian said, use a blocking queue for this kind
of producer/consumer pattern. Here is my 1 lock blocking queue below.

private void button5_Click(object sender, EventArgs e)
{
BlockingQueue<int> bq = new BlockingQueue<int>();

new Thread(delegate()
{
int i;
while( bq.TryDequeue(200, out i) )
Console.WriteLine(i);
Console.WriteLine("Consumer thread completed.");
}).Start();

for (int i = 0; i < 100; i++)
{
bq.Enqueue(i);
}

}

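using System;
using System.Collections;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Threading;

/// <summary>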
/// Represents a first-in, first-out collection of objects.
/// </summary>
/// <typeparam name="T">Type of element queue will contain.</typeparam>
public class BlockingQueue<T> : IEnumerable<T>, ICollection
{
private bool isOpened = true;
private readonly Queue<T> q;
private readonly object syncRoot = new object();

/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
public BlockingQueue()
{
q = new Queue<T>();
}

/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
/// <param name="capacity">The initial number of elements the queue can contain.</param>
public BlockingQueue(int capacity)
{
q = new Queue<T>(capacity);
}

/// <summary>
/// Initializes a new instance of the BlockingQueue class.
/// </summary>
/// <param name="collection">A collection whose elements are copied to the new queue.</param>
public BlockingQueue(IEnumerable<T> collection)
{
q = new Queue<T>(collection);
}

/// <summary>
/// Gets the number of elements in the queue.
/// </summary>
public int Count
{
get
{
lock ( syncRoot )
{
return q.Count;
}
}
}

/// <summary>
/// Remove all objects from the BlockingQueue<T>.
/// </summary>
public void Clear()
{
lock ( syncRoot )
{
q.Clear();
}
}

/// <summary>
/// Closes the queue.
/// </summary>
public void Close()
{
lock ( syncRoot )
{
if ( ! this.isOpened )
return; // Already closed.

isOpened = false;
q.Clear();
Monitor.PulseAll(syncRoot); // Resume any waiting threads so they see the queue is closed.
}
}

/// <summary>
/// Gets a value indicating if queue is opened.
/// </summary>
public bool Opened
{
get
{
lock ( syncRoot )
{
return this.isOpened;
}
}
}

/// <summary>
/// Determines whether an element is in the System.Collections.Generic.Queue<T>.
/// </summary>
/// <param name="item">The object to locate in the System.Collections.Generic.Queue<T>. The value can be null for reference types.</param>
/// <returns>true if item is found in the System.Collections.Generic.Queue<T>; otherwise, false.</returns>
public bool Contains(T item)
{
lock ( syncRoot )
{
return q.Contains(item);
}
}

/// <summary>
/// Copies the System.Collections.Generic.Queue<T> elements to an existing one-dimensional System.Array, starting at the specified array index.
/// </summary>
/// <param name="array">The one-dimensional System.Array that is the destination of the elements
/// copied from System.Collections.Generic.Queue<T>. The System.Array must have zero-based indexing.
/// </param>
/// <param name="arrayIndex">The zero-based index in array at which copying begins.</param>
public void CopyTo(T[] array, int arrayIndex)
{
lock ( syncRoot )
{
q.CopyTo(array, arrayIndex);
}
}

public T[] ToArray()
{
lock ( syncRoot )
{
return q.ToArray();
}
}

public IEnumerator<T> GetEnumerator()
{
return new BlockingQueue<T>.Enumerator(this, -1);
}
public IEnumerator<T> GetEnumerator(int millisecondsTimeout)
{
return new BlockingQueue<T>.Enumerator(this, millisecondsTimeout);
}
IEnumerator IEnumerable.GetEnumerator()
{
return new BlockingQueue<T>.Enumerator(this, -1);
}

/// <summary>
/// Sets the capacity to the actual number of elements in the System.Collections.Generic.Queue<T>,
/// if that number is less than 90 percent of current capacity.
/// </summary>
public void TrimExcess()
{
lock ( syncRoot )
{
q.TrimExcess();
}
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public T Dequeue()
{
return Dequeue(Timeout.Infinite);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="millisecondsTimeout">Time to wait before returning (in milliseconds).</param>
/// <returns>Object in queue.</returns>
public T Dequeue(int millisecondsTimeout)
{
lock ( syncRoot )
{
while ( isOpened && (q.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
throw new TimeoutException("Operation timeout");
}

if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
return q.Dequeue();
}
}

public bool TryDequeue(int millisecondsTimeout, out T value)
{
lock (syncRoot)
{
while (isOpened && (q.Count == 0))
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
{
value = default(T);
return false;
}
}

if (! isOpened)
throw new InvalidOperationException("Queue closed");
value = q.Dequeue();
return true;
}
}

/// <summary>
/// Returns the object at the beginning of the BlockingQueue<T>
/// without removing it.
/// </summary>
/// <returns>The object at the beginning of the BlockingQueue<T>.</returns>
public T Peek()
{
return Peek(-1);
}

/// <summary>
/// Returns the object at the beginning of the BlockingQueue<T>
/// without removing it.
/// </summary>
/// <returns>The object at the beginning of the BlockingQueue<T>.</returns>
/// <param name="millisecondsTimeout">Time to wait before returning (in milliseconds).</param>
public T Peek(int millisecondsTimeout)
{
lock ( syncRoot )
{
while ( isOpened && (q.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, millisecondsTimeout) )
throw new TimeoutException("Operation timeout");
}

if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
return q.Peek();
}
}

/// <summary>
/// Adds an object to the end of the Queue.
/// </summary>
/// <param name="item">Object to put in queue.</param>
public void Enqueue(T item)
{
lock ( syncRoot )
{
if ( ! isOpened )
throw new InvalidOperationException("Queue closed");
q.Enqueue(item);
Monitor.Pulse(syncRoot); // Move 1 waiting thread to the "ready" queue in this monitor object.
} // Exiting lock will free thread(s) in the "ready" queue for this monitor object.
}

[Serializable, StructLayout(LayoutKind.Sequential)]
public struct Enumerator : IEnumerator<T>, IDisposable, IEnumerator
{
private BlockingQueue<T> q;
private IEnumerator<T> e;

internal Enumerator(BlockingQueue<T> q, int timeout)
{
this.q = q;
if (!Monitor.TryEnter(this.q.SyncRoot, timeout))
throw new TimeoutException("Timeout waiting for enumerator lock on BlockingQueue<T>.");
this.e = this.q.q.GetEnumerator(); // Get the contained Queue<T> enumerator.
}

public void Dispose()
{
this.e.Dispose();
Monitor.Exit(q.SyncRoot);
}

public bool MoveNext()
{
return e.MoveNext();
}

public T Current
{
get
{
return e.Current;
}
}

object IEnumerator.Current
{
get
{
return ((IEnumerator)e).Current;
}
}

void IEnumerator.Reset()
{
e.Reset();
}
}

#region ICollection Members

/// <summary>
/// Copies the BlockingQueue<T> elements to an existing one-dimensional System.Array, starting at the specified array index.
/// </summary>
/// <param name="array"></param>
/// <param name="index"></param>
public void CopyTo(Array array, int index)
{
lock (syncRoot)
{
((ICollection)q).CopyTo(array, index);
}
}

/// <summary>
/// Get a value that indicates if the queue is synchronized.
/// </summary>
public bool IsSynchronized
{
get
{
return true;
}
}

public object SyncRoot
{
get
{
return this.syncRoot;
}
}

#endregion
}


--
William Stacey [MVP]

 
Guest

Thanks for your reply, but I forgot to tell you all that I am still using
.NET 1.1, not 2.0. Any suggestions for 1.1?

Thanks again,
 
 
Guest

I understand your point, Greg; that code is just my simulation code. My real
situation is a TCP server that receives multiple messages at the same time.
These messages are added to a queue, and another thread takes them from the
queue and sends them out to another server over a single connection. It
cannot utilize my network bandwidth; it uses just 20% of it. I think queue
access is one source of my delay. Any suggestions for my situation?

And again, my code is in .NET 1.1.

Thanks again for your reply.
Thana N.
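
The setup described here (several receiving threads feeding one sending thread) can be sketched as below. This is an assumption-laden illustration rather than a diagnosis: the BatchQueue and RelayDemo names, the four receivers and the 250 messages each are invented, and the actual socket write is left as a comment. The one design idea it shows is draining everything that is queued under a single lock acquisition, so the sender touches the lock once per batch instead of once per message. It uses only non-generic collections and ThreadStart, so it should be usable on .NET 1.1.

```csharp
using System;
using System.Collections;
using System.Threading;

// Queue whose consumer takes all pending items in one lock acquisition (illustrative name).
public class BatchQueue
{
    private readonly object sync = new object();
    private Queue items = new Queue();

    public void Enqueue(object item)
    {
        lock (sync)
        {
            items.Enqueue(item);
            Monitor.Pulse(sync); // wake the sender if it is waiting
        }
    }

    // Blocks until at least one item is available, then hands back everything queued so far.
    public Queue DequeueAll()
    {
        lock (sync)
        {
            while (items.Count == 0)
                Monitor.Wait(sync);
            Queue batch = items;
            items = new Queue(); // producers continue with a fresh, empty queue
            return batch;
        }
    }
}

public class RelayDemo
{
    private const int Receivers = 4;             // stands in for the TCP receive threads
    private const int MessagesPerReceiver = 250; // arbitrary test volume
    private static readonly BatchQueue queue = new BatchQueue();

    private static void Receive()
    {
        for (int i = 0; i < MessagesPerReceiver; i++)
            queue.Enqueue("message " + i);
    }

    // Returns the number of messages the single sender loop handled.
    public static int Run()
    {
        Thread[] receivers = new Thread[Receivers];
        for (int r = 0; r < Receivers; r++)
        {
            receivers[r] = new Thread(new ThreadStart(Receive));
            receivers[r].Start();
        }

        int sent = 0;
        while (sent < Receivers * MessagesPerReceiver)
        {
            Queue batch = queue.DequeueAll();
            // A real sender would write the whole batch to the outgoing connection here.
            sent += batch.Count;
        }

        foreach (Thread t in receivers)
            t.Join();
        return sent;
    }
}
```

Whether the queue is really the bottleneck is worth measuring first; if the single outgoing connection is the limit, batching at the queue will not change the throughput much.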
 
Brian Gideon

Thana,

The implementation I posted, although not as complete as William's, is
compatible with 1.1 and should work fine.

Brian
 
William Stacey [MVP]

Here is a non-generic version. hth

using System;
using System.Collections;
using System.Threading;

namespace WJS.Threading
{
/// <summary>
/// A blocking queue derived from Queue. The dequeue method will block on an empty queue.
/// Enqueue operations will not block as queue is only bounded by array size or MaxSize parameter.
/// <remarks>This class is thread safe for multiple consumer and producer threads.</remarks>
/// </summary>
public class BlockingQueue : Queue
{
private bool opened = true;
private int maxSize = int.MaxValue;
private readonly object syncRoot = new object();

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="col">The System.Collections.ICollection to copy elements from.</param>
public BlockingQueue(ICollection col) : base(col)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="capacity">The initial number of elements that the queue can contain.</param>
/// <param name="growFactor">The factor by which the capacity of the queue is expanded.</param>
public BlockingQueue(int capacity, float growFactor) : base(capacity, growFactor)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="capacity">The initial number of elements that the queue can contain.</param>
public BlockingQueue(int capacity) : base(capacity)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
public BlockingQueue() : base()
{
}

/// <summary>
/// BlockingQueue Destructor (Close queue, resume any waiting thread).
/// </summary>
~BlockingQueue()
{
Close();
}

/// <summary>
/// Gets or sets the maximum size of the queue. After MaxSize is reached, an additional Enqueue operation will
/// throw an exception.
/// </summary>
public int MaxSize
{
get
{
lock ( syncRoot )
{
return this.maxSize;
}
}
set
{
lock ( syncRoot )
{
if ( value < 0 )
throw new ArgumentOutOfRangeException("value", "MaxSize must be >= 0.");
this.maxSize = value;
}
}
}

/// <summary>
/// Remove all objects from the Queue.
/// </summary>
public override void Clear()
{
lock ( syncRoot )
{
base.Clear();
}
}

/// <summary>
/// Remove all objects from the Queue, resume all dequeue threads.
/// </summary>
public void Close()
{
lock ( syncRoot )
{
if ( ! Opened )
return; // Already closed.

opened = false;
base.Clear();
Monitor.PulseAll(syncRoot); // resume any waiting threads
}
}

/// <summary>
/// Gets a flag indicating whether the queue is still open.
/// </summary>
public bool Opened
{
get
{
lock ( syncRoot )
{
return this.opened;
}
}
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public override object Dequeue()
{
return Dequeue(Timeout.Infinite);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">time to wait before returning</param>
/// <returns>Object in queue.</returns>
public object Dequeue(TimeSpan timeout)
{
return Dequeue((int)timeout.TotalMilliseconds);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">Time to wait before returning (in milliseconds)</param>
/// <returns>Object in queue.</returns>
public object Dequeue(int timeout)
{
lock ( syncRoot )
{
while ( Opened && (base.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, timeout) )
throw new InvalidOperationException("Timeout");
}

if ( Opened )
return base.Dequeue();
else
throw new InvalidOperationException("Queue closed");
}
}

/// <summary>
/// Adds an object to the end of the Queue.
/// </summary>
/// <param name="obj">Object to put in queue</param>
public override void Enqueue(object obj)
{
lock ( syncRoot )
{
if ( ! Opened )
throw new InvalidOperationException("Queue closed");
if ( base.Count >= this.maxSize )
throw new InvalidOperationException("Queue full. MaxSize reached.");
base.Enqueue(obj);
Monitor.Pulse(syncRoot); // Move 1 waiting thread to ready queue in this monitor object.
} // Exiting lock will start all threads in the ready queue for this monitor object.
}
}
}
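
One detail of the Dequeue(TimeSpan) overload is worth a small illustration: Monitor.Wait takes its timeout as a whole number of milliseconds, so the TimeSpan has to be converted to its total length. A minimal sketch, with TimeoutConversion and ToMilliseconds as made-up helper names: TimeSpan.TotalMilliseconds gives the full duration, whereas TimeSpan.Milliseconds is only the 0-999 millisecond component.

```csharp
using System;

public class TimeoutConversion
{
    // Converts a TimeSpan to the millisecond count that Monitor.Wait(object, int) expects.
    public static int ToMilliseconds(TimeSpan timeout)
    {
        return (int)timeout.TotalMilliseconds;
    }

    // Returns only the millisecond component, shown for comparison.
    public static int ComponentOnly(TimeSpan timeout)
    {
        return timeout.Milliseconds;
    }
}
```

For a 2.5-second TimeSpan, ToMilliseconds returns 2500 while ComponentOnly returns 500, so passing the component to a wait would time out far too early.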


--
William Stacey [MVP]

 
Guest

Thank you for your reply, Will.
Thana

William Stacey said:
Here is a non-generic version. hth

using System;
using System.Collections;
using System.Threading;

namespace WJS.Threading
{
/// <summary>
/// A blocking queue derived from Queue. The dequeue method will block
on an empty queue.
/// Enqueue operations will not block as queue is only bounded by array
size or MaxSize parameter.
/// <remarks>This class is thread safe for multiple consumer and
producer threads.</remarks>
/// </summary>
public class BlockingQueue : Queue
{
private bool opened = true;
private int maxSize = int.MaxValue;
private readonly object syncRoot = new object();

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="col">The System.Collections.ICollection to copy
elements from.</param>
public BlockingQueue(ICollection col) : base(col)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="capacity">The initial number of elements that the queue can contain.</param>
/// <param name="growFactor">The factor by which the capacity of the queue is expanded.</param>
public BlockingQueue(int capacity, float growFactor) :
base(capacity, growFactor)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="capacity">The initial number of elements that the queue can contain.</param>
public BlockingQueue(int capacity) : base(capacity)
{
}

/// <summary>
/// Create new BlockingQueue.
/// </summary>
public BlockingQueue() : base()
{
}

/// <summary>
/// BlockingQueue Destructor (Close queue, resume any waiting thread).
/// </summary>
~BlockingQueue()
{
Close();
}

/// <summary>
/// Gets or sets the maximum size of the queue. After MaxSize is reached,
/// an additional Enqueue operation will throw an exception.
/// </summary>
public int MaxSize
{
get
{
lock ( syncRoot )
{
return this.maxSize;
}
}
set
{
lock ( syncRoot )
{
if ( value < 0 )
throw new ArgumentOutOfRangeException("value", "MaxSize must be >= 0.");
this.maxSize = value;
}
}
}

/// <summary>
/// Remove all objects from the Queue.
/// </summary>
public override void Clear()
{
lock ( syncRoot )
{
base.Clear();
}
}

/// <summary>
/// Remove all objects from the Queue, resume all dequeue threads.
/// </summary>
public void Close()
{
lock ( syncRoot )
{
if ( ! Opened )
return; // Already closed.

opened = false;
base.Clear();
Monitor.PulseAll(syncRoot); // resume any waiting threads
}
}

/// <summary>
/// Gets flag indicating if queue has been closed.
/// </summary>
public bool Opened
{
get
{
lock ( syncRoot )
{
return this.opened;
}
}
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public override object Dequeue()
{
return Dequeue(Timeout.Infinite);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">time to wait before returning</param>
/// <returns>Object in queue.</returns>
public object Dequeue(TimeSpan timeout)
{
// TotalMilliseconds is the whole span; Milliseconds is only its 0-999 component.
return Dequeue((int)timeout.TotalMilliseconds);
}

/// <summary>
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <param name="timeout">Time to wait before returning (in milliseconds)</param>
/// <returns>Object in queue.</returns>
public object Dequeue(int timeout)
{
lock ( syncRoot )
{
while ( Opened && (base.Count == 0) )
{
if ( ! Monitor.Wait(syncRoot, timeout) )
throw new InvalidOperationException("Timeout");
}

if ( Opened )
return base.Dequeue();
else
throw new InvalidOperationException("Queue closed");
}
}

/// <summary>
/// Adds an object to the end of the Queue.
/// </summary>
/// <param name="obj">Object to put in queue</param>
public override void Enqueue(object obj)
{
lock ( syncRoot )
{
if ( ! Opened )
throw new InvalidOperationException("Queue closed");
if ( base.Count >= this.maxSize )
throw new InvalidOperationException("Queue full. MaxSize reached.");
base.Enqueue(obj);
Monitor.Pulse(syncRoot); // Move 1 waiting thread to the ready queue in this monitor object.
} // Exiting lock will free thread(s) in the ready queue for this monitor object.
}
}
}
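
For reference, the Monitor.Wait / Monitor.Pulse hand-off that the class above relies on can be reduced to a short sketch. This is not part of the posted BlockingQueue: HandOffDemo, Put, Take, Run and Producer are hypothetical names made up for illustration, and returning null on timeout is an assumption chosen for brevity (the posted class throws instead). It avoids generics and anonymous delegates so that it should also fit .NET 1.1.

```csharp
using System;
using System.Collections;
using System.Threading;

// Hypothetical demo type, not part of the posted BlockingQueue class.
public class HandOffDemo
{
    private readonly Queue items = new Queue();   // plain, unsynchronized queue
    private readonly object gate = new object();  // the single lock guarding it

    // Producer side: add one item and wake one waiting consumer.
    public void Put(object item)
    {
        lock (gate)
        {
            items.Enqueue(item);
            Monitor.Pulse(gate);
        }
    }

    // Consumer side: sleep inside the lock until an item arrives or the
    // timeout elapses. Returns null on timeout (an assumption of this sketch).
    public object Take(int millisecondsTimeout)
    {
        lock (gate)
        {
            while (items.Count == 0)
            {
                if (!Monitor.Wait(gate, millisecondsTimeout))
                    break;  // timed out; fall through to the final check
            }
            return items.Count > 0 ? items.Dequeue() : null;
        }
    }

    // Helper class, since .NET 1.1 has no anonymous delegates.
    private class Producer
    {
        private readonly HandOffDemo target;
        private readonly int count;

        public Producer(HandOffDemo target, int count)
        {
            this.target = target;
            this.count = count;
        }

        public void Work()
        {
            for (int i = 1; i <= count; i++)
                target.Put(i);  // the int is boxed into an object
        }
    }

    // Starts one producer thread, consumes on the calling thread,
    // and returns the sum of everything that was consumed.
    public static int Run(int count)
    {
        HandOffDemo q = new HandOffDemo();
        Thread producer = new Thread(new ThreadStart(new Producer(q, count).Work));
        producer.Start();

        int sum = 0;
        for (int i = 0; i < count; i++)
        {
            object o = q.Take(5000);
            if (o == null)
                break;  // gave up waiting
            sum += (int)o;
        }
        producer.Join();
        return sum;
    }
}
```

The consumer never spins: Monitor.Wait releases the lock while it sleeps, so the producer can always get in, and each Put takes the lock only for a single Enqueue. Calling HandOffDemo.Run(100) adds up the values 1 through 100 handed across the two threads.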


--
William Stacey [MVP]

| Thanks for your reply, but I forgot to tell you all that I am still using
| .NET 1.1, not 2.0. Any suggestions for 1.1?
|
| Rgrds,
| Thana N.
|
| "William Stacey [MVP]" wrote:
|
| > Couple issues here. First, don't use spin locks that way - big waste of
| > CPU. Moreover, the sync wrapper takes a lock for all methods and
| > properties. So the 2nd thread is taking and releasing the lock 4 times for a
| > single dequeue operation. As Brian said, use a blocking queue for this kind
| > of producer/consumer pattern. Here is my 1 lock blocking queue below.
| >
| > private void button5_Click(object sender, EventArgs e)
| > {
| > BlockingQueue<int> bq = new BlockingQueue<int>();
| >
| > new Thread(delegate()
| > {
| > int i;
| > while( bq.TryDequeue(200, out i) )
| > Console.WriteLine(i);
| > Console.WriteLine("Consumer thread completed.");
| > }).Start();
| >
| > for (int i = 0; i < 100; i++)
| > {
| > bq.Enqueue(i);
| > }
| >
| > }
| >
| > /// <summary>
| > /// Represents a first-in, first-out collection of objects.
| > /// </summary>
| > /// <typeparam name="T">Type of element queue will contain.</typeparam>
| > public class BlockingQueue<T> : IEnumerable<T>, ICollection
| > {
| > private bool isOpened = true;
| > private readonly Queue<T> q;
| > private readonly object syncRoot = new object();
| >
| > /// <summary>
| > /// Initializes a new instance of the BlockingQueue class.
| > /// </summary>
| > public BlockingQueue()
| > {
| > q = new Queue<T>();
| > }
| >
| > /// <summary>
| > /// Initializes a new instance of the BlockingQueue class.
| > /// </summary>
| > /// <param name="capacity">The initial number of elements the queue can contain.</param>
| > public BlockingQueue(int capacity)
| > {
| > q = new Queue<T>(capacity);
| > }
| >
| > /// <summary>
| > /// Initializes a new instance of the BlockingQueue class.
| > /// </summary>
| > /// <param name="collection">A collection whose elements are copied to the new queue.</param>
| > public BlockingQueue(IEnumerable<T> collection)
| > {
| > q = new Queue<T>(collection);
| > }
| >
| > /// <summary>
| > /// Gets the number of elements in the queue.
| > /// </summary>
| > public int Count
| > {
| > get
| > {
| > lock ( syncRoot )
| > {
| > return q.Count;
| > }
| > }
| > }
| >
| > /// <summary>
| > /// Remove all objects from the BlockingQueue<T>.
| > /// </summary>
| > public void Clear()
| > {
| > lock ( syncRoot )
 
G

Greg Young

So port the code to not use generics .. shouldn't take much more than 1/2 an
hour to only do the queue :) (just use object instead of T)

Cheers,

Greg
 
