Architecture advice wanted.


Frank Rizzo

I am not sure if this is the place, but here it goes...
I have a project where I accept many UDP packets from many different
sources (i.e. programs installed on about a thousand PCs). The packets
usually come in rapid-fire succession.

What kind of architecture should I go for so that I don't drop any
packets? And where can I learn more about how UDP packets are
processed? For instance, if my process is busy, will the UDP packet sit
there (in the network stack or wherever else) waiting for the process
to get around to it?

Thanks
 
One way: create a listener thread that listens for UDP on port n. It grabs
packets, verifies them, etc., and puts them into a queue (a blocking queue,
for example). The stack will queue packets to some extent, then just drop
anything new (I believe) once the Winsock buffer is full. That is one reason
why UDP is not guaranteed delivery (TCP, on the other hand, would resend the
packet). The client then needs to retransmit, using your own algorithm, if it
does not get a response.
hth
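A minimal sketch of that listener-plus-queue shape, with invented names. It assumes a newer Framework where System.Collections.Concurrent.BlockingCollection is available as the bounded blocking queue, standing in for a hand-rolled one:

```csharp
// Sketch only: hypothetical names; BlockingCollection (a newer-Framework
// bounded blocking queue) stands in for a hand-rolled implementation.
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public static class UdpListenerSketch
{
    // Bounded: if the consumer falls behind, the listener blocks here
    // instead of packets piling up without limit in process memory.
    public static readonly BlockingCollection<byte[]> Queue =
        new BlockingCollection<byte[]>(1024);

    public static void ListenLoop(int port)
    {
        using (var client = new UdpClient(port))
        {
            var remote = new IPEndPoint(IPAddress.Any, 0);
            while (true)
            {
                byte[] packet = client.Receive(ref remote); // blocks until a datagram arrives
                Queue.Add(packet);                          // blocks when the queue is full
            }
        }
    }

    public static void ConsumeLoop(Action<byte[]> process)
    {
        // Runs on its own thread; blocks when the queue is empty.
        foreach (byte[] packet in Queue.GetConsumingEnumerable())
            process(packet);
    }
}
```

Packets that arrive while the listener is busy sit in the socket's receive buffer; once that overflows, the stack silently drops them, which is why the queue hand-off needs to be as fast as possible.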
 
William said:
One way: create a listener thread that listens for UDP on port n... [snip]
Thanks. That's what I settled on. The only thing that still bugs me is
that I listen for UDP packets on a different thread (because
UdpClient.Receive is blocking), then I raise an event that is consumed
by the calling class. I am just not sure whether this setup is
thread-safe.

Thanks.
 
Not sure I follow. Why generate an event? Your listener will put the packet
object into the queue and listen again. Your consumer will wait until the
queue has an object and then go. So you don't need an event unless you're
using it for some other reason. You can get fancy with no-lock queues and
two-lock queues (etc.), but in the end I think a one-lock blocking queue (not
the Framework queue) is about the best, all things considered, and it is
relatively easy to verify thread safety and read/write memory-barrier issues.
What is your architecture in more detail? tia
 
William said:
Not sure I follow. Why generate an event? Your listener will put the packet
object into the queue and listen again... [snip]

Well, the problem is that the application must be doing many things
(accepting UDP packets from UDP clients, accepting TCP/IP connections
from other types of clients, pushing the data out to TCP clients,
etc.), so I can't have stuff waiting around. So I have the following
code listening for UDP (below). If I go about it as you describe (i.e.
have a shared CQueue collection), the Consumer class would have to be
polling it constantly. With an event you get the info instantly. It
seems like a good design to me, except that I am not sure whether
events in .NET can safely jump between threads. So far, they have, but
I haven't exposed my app to a heavy load. When I roll it out, it will
be under a constant barrage of UDP packets 24/7.





Public Shared Function UDP_Listen(ByVal Port As Integer) As Boolean
    Try
        UDP_Server_Port = Port
        UDP_Server = New UdpClient(Port)
        thdUdp = New Thread(AddressOf GetUDPData)
        thdUdp.Start()
        Return True
    Catch e As Exception
        RaiseEvent Sock_Error(e.ToString)
        Return False
    End Try
End Function

Private Shared Sub GetUDPData()
    Do While True
        Try
            Dim RemoteIpEndPoint As New IPEndPoint(IPAddress.Any, 0)
            Dim sMessage As String = _
                Encoding.Unicode.GetString(UDP_Server.Receive(RemoteIpEndPoint))
            RaiseEvent DataArrival(sMessage, _
                RemoteIpEndPoint.Address.ToString)
            If sMessage = "CloseMe" Then Exit Do
            Thread.Sleep(0)
        Catch e As Exception
            RaiseEvent Sock_Error(e.ToString)
        End Try
    Loop
End Sub
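On the thread-safety question raised above: RaiseEvent does not marshal anywhere; the handlers simply run on whatever thread raised the event (here, the UDP listener thread), so any shared state a handler touches needs its own locking. A small C# sketch with invented names illustrates the point:

```csharp
// Sketch with invented names: shows that event handlers execute on the
// raising thread, so handlers mutating shared state must synchronize.
using System;
using System.Threading;

public static class EventThreadingSketch
{
    public static event Action<string> DataArrival;

    static readonly object gate = new object();
    public static int Handled;

    public static void Subscribe()
    {
        DataArrival += msg =>
        {
            // This handler runs on whichever thread invoked DataArrival,
            // so the shared counter is guarded by a lock.
            lock (gate) { Handled++; }
        };
    }

    public static void RaiseFromManyThreads(int threads, int perThread)
    {
        var workers = new Thread[threads];
        for (int i = 0; i < threads; i++)
        {
            workers[i] = new Thread(() =>
            {
                for (int j = 0; j < perThread; j++)
                    DataArrival?.Invoke("packet");
            });
            workers[i].Start();
        }
        foreach (var w in workers) w.Join();
    }
}
```

Without the lock, concurrent increments from several raising threads could be lost; with it, every raised event is counted.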
 
Well, the problem is that the application must be doing many things... [snip]

That is one reason I suggested the queue. Maybe I should expand (let's talk
UDP for now).
1) Create a Listener object and put Start() and Stop() methods on it. You
can listen on all IPs, or on selected IPs with multiple Listener objects.
Each object will contain its own thread.
2) Create your Server object. This will read the network objects and
process them. Your main logic.
3) Add a *blocking* output queue. The Listener will output objects into
the queue. The Server object will read objects from this queue.
4) The Listener will block/wait on a full queue, and the Server will block
on an empty queue. So no polling involved - very efficient.
5) The Server will make a reply object and put it into some output queue.
Your Listener could also be your Sender, or you can have another
thread/object for that job. You can share the Socket between different
threads as long as one reads and the other writes. So you can have a sender
thread in your Listener or in another object.
6) This shared queue works well because you can add Listener objects (say,
listening on different IP addresses) sending objects to the same output
queue. Your Server object will just pick them out of the queue without
knowing how many Listeners are working. You can also "inject" objects into
the queue from out-of-band sources like management tools, etc. So it is
flexible.

I have done just this in a UDP/TCP server I am working on, so I know it
works well.
Hope that makes some sense. Please post back if not. Cheers!
--wjs mvp
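The six steps above might wire together roughly like this. A C# sketch with invented names; the bounded blocking queue is stood in for by the newer Framework's BlockingCollection rather than the hand-rolled one discussed in this thread:

```csharp
// Sketch only: invented names; BlockingCollection stands in for the
// hand-rolled bounded blocking queue.
using System;
using System.Collections.Concurrent;
using System.Net;
using System.Net.Sockets;
using System.Threading;

public class Listener                        // step 1: owns its own thread
{
    readonly UdpClient client;
    readonly BlockingCollection<byte[]> output;
    Thread thread;

    public Listener(int port, BlockingCollection<byte[]> output)
    {
        client = new UdpClient(port);
        this.output = output;                // step 3: shared blocking queue
    }

    public void Start()
    {
        thread = new Thread(Run) { IsBackground = true };
        thread.Start();
    }

    public void Stop()
    {
        client.Close();                      // makes a blocked Receive throw
    }

    void Run()
    {
        var remote = new IPEndPoint(IPAddress.Any, 0);
        try
        {
            while (true)
                output.Add(client.Receive(ref remote)); // step 4: blocks when full
        }
        catch (SocketException) { }          // Stop() was called
        catch (ObjectDisposedException) { }
    }
}

public class Server                          // step 2: the main logic
{
    public void Run(BlockingCollection<byte[]> input, Action<byte[]> handle)
    {
        // Step 4 again: blocks on an empty queue - no polling. Any number
        // of Listeners (step 6) can feed this same queue.
        foreach (byte[] packet in input.GetConsumingEnumerable())
            handle(packet);
    }
}
```

Adding a second Listener on another port or IP is just another `new Listener(port, sameQueue)`; the Server never changes.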
 
William said:
That is one reason I suggested the queue. Maybe I should expand... [snip]
The design makes sense, but I am not understanding how the Listener will
let the Server object know that a new message has arrived.
I am assuming that in the Listener you do the following after you have
received the message:

SyncLock cQueue.SyncRoot
    cQueue.Add(NewMessage)
End SyncLock

But how does the Server object know that something has been added to the
queue?

Thanks
 
The design makes sense, but I am not understanding how the Listener will
let the Server object know that a new message has arrived... [snip]

No explicit signal from your code per se. The signal/wait logic is built
*into* a Bounded Blocking Queue (BBQ) implementation. The Enqueue and
Dequeue methods should do the right things with your threads. Check out my
Bounded Blocking One Lock Queue. Note I have made some ad-hoc changes to
this and have not done heavy testing for correctness, but it should be
close. Please let me know if you see a threading issue or anything else.
This will allow unlimited Consumer and Producer threads in a thread-safe
manner. Note that unbounded queues are much easier to implement, but they
do not give you the blocking behavior you need in a Consumer/Producer
setup, and they allow unbounded growth, which could run you out of memory
and allow a producer to run wild. Anyway, here it is...

using System;
using System.Threading;
using System.Collections;

namespace Queues
{
    /// <summary>
    /// Author: William Stacey
    /// Modified Date: 08/03/2004
    /// My One Lock Bounded Blocking Queue (e.g. Bounded Buffer.)
    /// Generally the best blocking queue for normal usage on single and
    /// multi-cpu.
    /// </summary>
    public class OneLockQueue : ICollection
    {
        #region Fields
        private object[] buffer;
        private int count;
        private int size;
        private int head;
        private int tail;
        private readonly object syncRoot;
        #endregion

        #region Constructors
        /// <summary>
        /// Create instance of Queue with a bounded number of elements. After
        /// that many elements are used, another Enqueue operation will "block"
        /// or wait until a Consumer calls Dequeue to free a slot. Likewise,
        /// if the queue is empty, a call to Dequeue will block until another
        /// thread calls Enqueue.
        /// </summary>
        /// <param name="size">Maximum number of elements in the queue.</param>
        public OneLockQueue(int size)
        {
            syncRoot = new object();
            this.size = size;
            buffer = new object[size];
            count = 0;
            head = 0;
            tail = 0;
        }
        #endregion

        #region Properties
        /// <summary>
        /// Gets the object values currently in the queue. If the queue is
        /// empty, this will return a zero-length array. The returned array
        /// length can be 0 to Size.
        /// </summary>
        public object[] Values
        {
            get
            {
                // Copy used elements to a new array of "count" size. Note a
                // simple buffer copy will not work, as head could be anywhere
                // and we want a zero-based array.
                object[] values;
                lock(syncRoot)
                {
                    values = new object[count];
                    int pos = head;
                    for(int i = 0; i < count; i++)
                    {
                        values[i] = buffer[pos];
                        pos = (pos + 1) % size;
                    }
                }
                return values;
            }
        }
        #endregion

        #region Public Methods
        /// <summary>
        /// Enqueue "value" object into the queue. If the queue is full, this
        /// method will block until another thread calls one of the Dequeue
        /// methods.
        /// </summary>
        /// <param name="value">The object to enqueue.</param>
        public void Enqueue(object value)
        {
            lock(syncRoot)
            {
                while(count == size)
                {
                    try
                    {
                        Monitor.Wait(syncRoot);
                    }
                    catch
                    {
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if ( count == 1 ) // Could have blocking Dequeue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
        }

        /// <summary>
        /// Non-blocking version of Enqueue(). If the Enqueue is successful,
        /// this will return true, otherwise false if the queue is full.
        /// </summary>
        /// <param name="value">The object to enqueue.</param>
        /// <returns>true if successful, otherwise false.</returns>
        public bool TryEnqueue(object value)
        {
            lock(syncRoot)
            {
                if ( count == size )
                    return false;
                buffer[tail] = value;
                tail = (tail + 1) % size;
                count++;
                if ( count == 1 ) // Could have blocking Dequeue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>
        /// Removes and returns the object at the beginning of the Queue.
        /// If the queue is empty, this method will block until another thread
        /// calls one of the Enqueue methods.
        /// </summary>
        /// <returns>The object that is removed from the beginning of the
        /// Queue.</returns>
        public object Dequeue()
        {
            object value;
            lock(syncRoot)
            {
                while(count == 0)
                {
                    try
                    {
                        Monitor.Wait(syncRoot);
                    }
                    catch
                    {
                        Monitor.PulseAll(syncRoot);
                        throw;
                    }
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) ) // Could have blocking Enqueue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return value;
        }

        /// <summary>
        /// Non-blocking version of Dequeue. Will return false if the queue is
        /// empty and set value to null, otherwise will return true and set
        /// value to the dequeued object.
        /// </summary>
        /// <param name="value">The object that is removed from the beginning
        /// of the Queue, or null if empty.</param>
        /// <returns>true if successful, otherwise false.</returns>
        public bool TryDequeue(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
                buffer[head] = null;
                head = (head + 1) % size;
                count--;
                if ( count == (size - 1) ) // Could have blocking Enqueue thread(s).
                    Monitor.PulseAll(syncRoot);
            }
            return true;
        }

        /// <summary>
        /// Returns the object at the beginning of the Queue without removing
        /// it.
        /// </summary>
        /// <returns>The object at the beginning of the Queue.</returns>
        /// <exception cref="InvalidOperationException">The Queue is
        /// empty.</exception>
        public object Peek()
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                    throw new InvalidOperationException("The Queue is empty.");

                object value = buffer[head];
                return value;
            }
        }

        /// <summary>
        /// Returns the object at the beginning of the Queue without removing
        /// it. Contrary to the Peek method, this method will not throw an
        /// exception if the queue is empty, but instead will return false.
        /// </summary>
        /// <param name="value">The object at the beginning of the Queue, or
        /// null if empty.</param>
        /// <returns>true if the queue was not empty, otherwise false.</returns>
        public bool TryPeek(out object value)
        {
            lock(syncRoot)
            {
                if ( count == 0 )
                {
                    value = null;
                    return false;
                }
                value = buffer[head];
            }
            return true;
        }

        #endregion

        #region ICollection Members
        public bool IsSynchronized
        {
            get { return true; }
        }

        public int Size
        {
            get { return this.size; }
        }

        public int Count
        {
            get { lock(syncRoot) { return count; } }
        }

        public void CopyTo(Array array, int index)
        {
            object[] tmpArray = Values;
            tmpArray.CopyTo(array, index);
        }

        public object SyncRoot
        {
            get { return this.syncRoot; }
        }
        #endregion

        #region IEnumerable Members
        public IEnumerator GetEnumerator()
        {
            throw new NotImplementedException("Not Implemented.");
        }
        #endregion
    } // End OneLockQueue
}


Usage Example:
------------------------
OneLockQueue olq = new OneLockQueue(5);
while(olq.TryEnqueue("two"))
{
    Console.WriteLine("Enqueued:");
}
object obj;
while(olq.TryDequeue(out obj))
{
    Console.WriteLine("Dequeued:");
}

HTH
 
oops. Naturally a range check in the constructor would be most helpful :)

if ( size < 1 )
    throw new ArgumentOutOfRangeException("size must be greater than zero.");
 
William said:
oops. Naturally a range check in the constructor would be most helpful :) [snip]
Wow, I'll have to absorb all that. I thought you were talking about a
standard collection like an ArrayList.
I'll check it out.
 
Sounds good. Naturally an ArrayList won't work, as it does not give you the
blocking semantics. You could wrap it in code such as what I showed, but
you don't need/want dynamic expansion for a bounded queue; the blocking is
actually what we desire here. Let me know. Cheers!
 