locking an int

Dan Holmes

I have TCP code that increments an int for each accepted socket. I tried to use lock(int) for synchronization, but
that doesn't work. I have settled on declaring the int as volatile. Will that work? Based on what I have read about
volatile, it could.

This is a poor man's load-balancing app. The clients connect to this app, and it just round-robins
through the actual doers of stuff.

using SNS = System.Net.Sockets;
private const int Initial_Buffer_Size = 65536;
private SNS.TcpListener listener = null;
//this is incremented before the execution that is why it is initialized to -1 instead of 0.
private volatile int nextServantIndex = -1;

private void DoAcceptTcpClientCallback(IAsyncResult ar)
{
try
{
SNS.TcpListener listener = (SNS.TcpListener)ar.AsyncState;
SNS.TcpClient clientSckt = listener.EndAcceptTcpClient(ar);
listener.BeginAcceptTcpClient(new AsyncCallback(DoAcceptTcpClientCallback), listener);
byte[] readbuffer = new byte[Initial_Buffer_Size];
SNS.NetworkStream ns = clientSckt.GetStream();
StringBuilder message = new StringBuilder();
int bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
while (bytesread > 0)
{
message.Append(Encoding.UTF8.GetString(readbuffer, 0, bytesread));
bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
}
if (message.Length > 0)
{
if (Properties.Settings.Default.RouterMode)
{
//lock the int. since this is async IO more than 1 IO could be at this point
//need to make sure that the variable only gets accessed in a serial manner.
//move to the next servant
if (++nextServantIndex >= Properties.Settings.Default.Servants.Count)
nextServantIndex = 0;
 
Paul

Why not use threading?

Use a static dictionary with a lock, and use the IP/port as a key...
 
Peter Duniho

I have TCP code that is incrementing an int for each accept socket. I
tried to use lock(int) for synchronization but that doesn't work.

Right. Locking on value types is a bad idea. You wind up locking on a
boxed instance of the value type, but you get a brand new boxed instance
every time you do that.
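
A minimal sketch of why that defeats the lock, assuming a made-up helper named BoxingDemo (it is not part of the code in this thread). The C# compiler rejects lock on an int outright; the boxing scenario only arises if the value is converted to object first, and then each conversion produces a separate object, so no two callers ever contend on the same monitor:

```csharp
using System;

public static class BoxingDemo
{
    // Hypothetical helper: reports whether two boxing conversions of the
    // same int value end up referring to the same object.
    public static bool BoxesShareIdentity(int value)
    {
        object first = value;   // boxing allocates a new object here
        object second = value;  // and a second, distinct object here
        return object.ReferenceEquals(first, second);
    }
}
```

Since the two references differ, a lock taken on one box says nothing about a lock taken on the other.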

I have settled on declaring the int as volatile. Will that work? Based
on what I read about volatile this could work.

No, volatile won't work. It is possible for two (or more!) different
threads to both attempt to access the variable and increment it at the
same time, in such a way that each thread sees the same initial value and
thus winds up writing the same new value. So the outcome of N threads
operating on the variable, where N > 1, could be exactly the same as if
just one thread had operated on the variable. That is, when the variable
should have been incremented by N, it will only be incremented by 1.
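
To make that interleaving concrete, here is a sketch that replays it on a single thread, so the outcome is deterministic. The names (LostUpdateDemo, seenByThreadA, seenByThreadB) are invented for the illustration; a real race would happen across threads and only some of the time:

```csharp
public static class LostUpdateDemo
{
    // Replays, step by step, the case where two threads both read the
    // counter before either of them writes its incremented value back.
    public static int SimulateTwoRacingIncrements()
    {
        int counter = 0;
        int seenByThreadA = counter;   // thread A reads 0
        int seenByThreadB = counter;   // thread B reads 0 before A has written
        counter = seenByThreadA + 1;   // A writes 1
        counter = seenByThreadB + 1;   // B also writes 1, so A's update is lost
        return counter;
    }
}
```

Two increments ran, yet the method returns 1. Marking the field volatile does not change this, because the read and the write are still separate steps.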

The simplest solution is to lock using a separate object instance for the
purpose of locking. Even with reference types (which you can use for
locking), it is generally better to maintain a private, simple object for
locking. That way, you eliminate at least one potential deadlock risk:
some other code using the same reference for locking that you're using.
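
As a rough sketch of that approach, assuming a hypothetical class named RoundRobinIndex that owns both the counter and its lock object (neither name comes from the original code):

```csharp
using System;

public sealed class RoundRobinIndex
{
    // Private object used only for locking; nothing outside this class
    // can take the same lock.
    private readonly object sync = new object();
    private readonly int count;
    // Incremented before use, so it starts at -1 rather than 0.
    private int next = -1;

    public RoundRobinIndex(int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException("count");
        this.count = count;
    }

    // Increments and wraps as one unit, so every caller gets the next
    // index in sequence and none are skipped or handed out twice in a row.
    public int Next()
    {
        lock (sync)
        {
            if (++next >= count)
                next = 0;
            return next;
        }
    }
}
```

Returning the index from inside the lock matters: reading the field again after the lock is released could pick up a value another thread has already advanced.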

In some cases, the System.Threading.Interlocked class could serve your
need instead. It provides atomic access to integer variables.
But here, you appear to want to be able to assign a new value to the
variable based on the outcome of the increment. It's possible to write
code that uses the Interlocked class to do that, but it would involve some
polling of the variable to deal with race conditions. IMHO, it's probably
not worth the trouble, at least not as the initial solution (it might
provide a worthwhile performance improvement, but you'd have to profile it
to be sure, and it's a lot more complicated to implement).
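
For comparison, a different technique from the one described above avoids the reset altogether: let the counter run freely with Interlocked.Increment and reduce it modulo the servant count only when it is read. This is a sketch under assumptions, with made-up names (FreeRunningCounter, NextIndex), not a drop-in replacement for the posted code:

```csharp
using System;
using System.Threading;

public sealed class FreeRunningCounter
{
    // Starts at -1 so the first increment yields 0.
    private int ticket = -1;

    // Atomically takes the next ticket and maps it onto [0, servantCount).
    public int NextIndex(int servantCount)
    {
        if (servantCount <= 0) throw new ArgumentOutOfRangeException("servantCount");
        // Interlocked.Increment wraps on overflow; reinterpreting the result
        // as unsigned keeps the modulo result non-negative.
        uint n = unchecked((uint)Interlocked.Increment(ref ticket));
        return (int)(n % (uint)servantCount);
    }
}
```

One caveat: when the 32-bit counter eventually wraps, the rotation can skip or repeat an index once unless the servant count is a power of two. For a load balancer that is probably harmless, but it is not the same guarantee the lock gives.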

Pete
 
Paul

Probably best using a Mutex....

But the TP does not even need to create this key; there is a natural one
provided for him to use.
 
Peter Duniho

Probably best using a Mutex....

IMHO, if one is going to lock, the "lock" statement is the most
appropriate.

But the TP does not even need to create this key; there is a natural one
provided for him to use.

I'm not sure what "natural key" you're talking about. The best object to
use for a lock would be an instance of System.Object created for the
express purpose.

If you're saying that the int itself is unneeded, I agree...that may very
well be. What little code we did see seems potentially awkward if not
outright flawed from a design standpoint. But the OP didn't show enough
code for us to know for sure; it could just be that we're not seeing the
whole picture.

Pete
 
Ben Voigt [C++ MVP]

In some cases, the System.Threading.Interlocked class could serve your
need instead. It provides atomic access to integer variables.
But here, you appear to want to be able to assign a new value to the
variable based on the outcome of the increment. It's possible to write
code that uses the Interlocked class to do that, but it would involve some
polling of the variable to deal with race conditions. IMHO, it's probably
not worth the trouble, at least not as the initial solution (it might
provide a worthwhile performance improvement, but you'd have to profile it
to be sure, and it's a lot more complicated to implement).

Basically:

//atomic increment modulo N

int oldValue, newValue;

do {
    oldValue = var;
    newValue = (oldValue + 1) % N;
} while (oldValue != Interlocked.CompareExchange(ref var, newValue, oldValue));

See http://msdn.microsoft.com/en-us/library/801kt583.aspx
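
Wrapped into a class, the same compare-and-swap loop might look like the sketch below. The class name CasRoundRobin and its members are assumptions made for the example, and Volatile.Read requires .NET 4.5 or later:

```csharp
using System.Threading;

public sealed class CasRoundRobin
{
    private readonly int count;
    // Starts at -1 so the first call returns index 0.
    private int current = -1;

    public CasRoundRobin(int count)
    {
        this.count = count;
    }

    // Atomic "increment modulo count": retry until no other thread has
    // changed the field between our read and our write.
    public int Next()
    {
        int oldValue, newValue;
        do
        {
            oldValue = Volatile.Read(ref current);
            newValue = (oldValue + 1) % count;
        } while (Interlocked.CompareExchange(ref current, newValue, oldValue) != oldValue);
        return newValue;
    }
}
```

The method returns newValue rather than re-reading the field, because by the time the loop exits another thread may already have moved the index on.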
 
Paul

Maybe it's the way I read it, but I assumed he was using the int to identify current
client sessions, which could be done via IP and port number... Just read
your bottom bit, so I will shut up and agree... :)
 
Dan Holmes

Peter said:
If you're saying that the int itself is unneeded, I agree...that may
very well be. What little code we did see seems potentially awkward if
not outright flawed from a design standpoint. But the OP didn't show
enough code for us to know for sure; it could just be that we're not
seeing the whole picture.

Pete

If my code is bad, I want to know. Here is the whole thing. This is a service. It works and has for a while, so
whatever flaw there is must be drawn out by special circumstances that I haven't encountered yet. There isn't anything
secret going on here.

I have written the client side too, so I know what inputs are expected.


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.ServiceProcess;
using System.Text;
using SNS = System.Net.Sockets;
namespace RouteMatch.RMQS
{
public partial class RMQSService : ServiceBase
{
//initial size of the read buffer
private const int Initial_Buffer_Size = 65536;
private SNS.TcpListener listener = null;
//this is incremented before the execution that is why it is initialized to -1 instead of 0.
private int nextServantIndex = -1;
private ClientState nextServantIndexSerializeObject = new ClientState(null, null);
private Dictionary<string, ServiceAdapterInterface> serviceAdapterPool = new Dictionary<string,
ServiceAdapterInterface>();
public RMQSService()
{
InitializeComponent();
}

protected override void OnStart(string[] args)
{
try
{
eventLog1.Log = "Application";
eventLog1.Source = "RMQS";
listener = new SNS.TcpListener(System.Net.IPAddress.Any, Properties.Settings.Default.Port);
listener.Start();
listener.BeginAcceptTcpClient(new AsyncCallback(DoAcceptTcpClientCallback), listener);
}
catch (Exception ex)
{
if (eventLog1 != null)
eventLog1.WriteEntry(ex.ToString(), EventLogEntryType.Error);
}
}

private void DoAcceptTcpClientCallback(IAsyncResult ar)
{
try
{
SNS.TcpListener listener = (SNS.TcpListener)ar.AsyncState;
SNS.TcpClient clientSckt = listener.EndAcceptTcpClient(ar);
listener.BeginAcceptTcpClient(new AsyncCallback(DoAcceptTcpClientCallback), listener);
byte[] readbuffer = new byte[Initial_Buffer_Size];
SNS.NetworkStream ns = clientSckt.GetStream();
StringBuilder message = new StringBuilder();
int bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
while (bytesread > 0)
{
message.Append(Encoding.UTF8.GetString(readbuffer, 0, bytesread));
bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
}
if (message.Length > 0)
{
if (Properties.Settings.Default.RouterMode)
{
//lock the int. since this is async IO more than 1 IO could be at this point
//need to make sure that the variable only gets accessed in a serial manner.
//move to the next servant
string servername = string.Empty;
int port = 0;
lock (nextServantIndexSerializeObject)
{
if (++nextServantIndex >= Properties.Settings.Default.Servants.Count)
nextServantIndex = 0;
servername = Properties.Settings.Default.Servants[nextServantIndex].ServerName;
port = Properties.Settings.Default.Servants[nextServantIndex].Port;
}
SNS.TcpClient qsServant = new SNS.TcpClient();
qsServant.Connect(servername , port);
//write client's incoming message to servant
byte[] writebuffer = Encoding.ASCII.GetBytes(message.ToString());
SNS.NetworkStream servant_ns = qsServant.GetStream();
servant_ns.Write(writebuffer, 0, writebuffer.Length);
qsServant.Client.Shutdown(System.Net.Sockets.SocketShutdown.Send);
//read from servant and send to client
bytesread = servant_ns.Read(readbuffer, 0, readbuffer.Length);
while (bytesread > 0)
{
clientSckt.GetStream().Write(readbuffer, 0, bytesread);
//keep reading from the servant's stream (not the client's) until it closes
bytesread = servant_ns.Read(readbuffer, 0, readbuffer.Length);
}
qsServant.Client.Shutdown(System.Net.Sockets.SocketShutdown.Both);
qsServant.Close();
}
else
{
ClientState cs = new ClientState(clientSckt, message.ToString());
System.Threading.ThreadPool.QueueUserWorkItem(
new System.Threading.WaitCallback(ProcessMessage), cs
);
}
}
else
{
clientSckt.Client.Shutdown(System.Net.Sockets.SocketShutdown.Both);
clientSckt.Close();
}
}
catch (Exception ex)
{
eventLog1.WriteEntry(ex.ToString(), EventLogEntryType.Error);
}
}

/// <summary>
/// This is where all the "work" gets done.
///
/// The client state consists of the message and the client socket. That allows this proc
/// to respond to the caller.
///
/// Because right now Rse is not reentrant, the dictionary that keeps the Rse objects is
/// locked so that simultaneous access is disallowed.
/// </summary>
/// <param name="clientState"></param>
/// <exception cref="RMQSMessageException"/>
/// <exception cref="RMQSInvalidCommandValueException"/>
private void ProcessMessage(object clientState)
{
RMQSMessage question= null;
RMQSResponse answer = null;
ClientState state = null;
try
{
state = (ClientState)clientState;
question = new RMQSMessage();
question.BuildMessage(state.Message);
ServiceAdapterInterface serviceInterface = null;
//we key the compool by the agency and the database because there could be a
//training database in use. that data could be different.
//the message target is there as well so that different modules would be allowed
//to work simultaneously. right now they are all in the same pool. if another message target
//was created then it would be good to add a target pool so that only the objects of the same
//target are locked. messages to different targets would then be able to work in parallel.
string comPoolKey = question.AgencyName + "|" + question.DatabaseName + "|" + question.MessageTarget;
//this lock essentially places all queued objects in line. The first one spawned is the
//first one to work. The others then back up behind it and wait for the lock to be released.
//when Rse is re-entrant this lock can be moved such that the processing isn't included in
//the lock. that will allow the ThreadPool to operate as expected.
lock (serviceAdapterPool)
{
if (!serviceAdapterPool.ContainsKey(comPoolKey))
{
//this is the only line in the server that references the RSE code.
//when/if there is a reason, this code will be made to read from a file and
//dynamically load the class connected with the MessageTarget.
//it would also be possible to load each of those in their own app domain
//so process isolation could be enabled. this might be a solution to the FIFO queue
//that is here.
//also note that each dynamically loaded service will need to implement ServiceAdapterInterface.
ServiceAdapterInterface newAdapter = (ServiceAdapterInterface)new RouteMatch.TS.Rse.RseAdapterContext();
newAdapter.Initialize(question);
serviceAdapterPool.Add(comPoolKey, newAdapter);
}
serviceInterface = serviceAdapterPool[comPoolKey];
answer = serviceInterface.HandleMessage(question);
}
}
catch (RMQSMessageException ex)
{
if (question == null)
answer = new RMQSError("", ex.ErrorCode, ex.Message);
else
answer = new RMQSError(question, ex.ErrorCode, ex.Message);
}
catch (Exception ex)
{
if (question == null)
answer = new RMQSError("", -99, ex.Message);
else
answer = new RMQSError(question, -99, ex.Message);
}
SNS.NetworkStream ns = state.ClientSocket.GetStream();
if (ns.CanWrite)
{
byte[] writebuffer = answer.ToByteArray();
ns.Write(writebuffer, 0, writebuffer.Length);
state.ClientSocket.Client.Shutdown(System.Net.Sockets.SocketShutdown.Send);
}
else
eventLog1.WriteEntry("Couldn't write");

state.ClientSocket.Close();
}
/// <summary>
/// Used to hold state for the message and is placed in the ThreadPool
/// </summary>
private class ClientState
{
public ClientState(SNS.TcpClient clientSocket, string message)
{
Message = message;
ClientSocket = clientSocket;
}
private string _message;

public string Message
{
get { return _message; }
set { _message = value; }
}
private SNS.TcpClient _socket;

public SNS.TcpClient ClientSocket
{
get { return _socket; }
set { _socket = value; }
}
}
}
public class QsServant
{
private string _serverName;

public string ServerName
{
get { return _serverName; }
set { _serverName = value; }
}
private int _port;

public int Port
{
get { return _port; }
set { _port = value; }
}
}
[Serializable]
public class QsServants : List<QsServant>
{
public QsServants() : base() { }
public QsServants(IEnumerable<QsServant> collection) : base(collection) { }
public QsServants(int capacity) : base(capacity) { }
}
}
 
Paul

Dan,

Is there any reason why you did not use WCF or standard Web Services, or
even just message queuing?

Why not implement the count in QsServant and implement the locking method
that Peter suggested earlier on, that I then posted an example for?
Alternatively, use the count from QsServants?

Your code is complex so without background it is hard to know what your
objectives are.



Dan Holmes

Paul said:
Dan,

Is there any reason why you did not use WCF or standard Web Services, or
even just message queuing?

Yes, my boss wanted it TCP messaging only.

Why not implement the count in QsServant and implement the locking method
that Peter suggested earlier on, that I then posted an example for?
Alternatively, use the count from QsServants?

I am not sure I follow. Not only do I need to increment the value, I also need to reset it to zero when it reaches the
number of objects in my collection. I didn't see any example that locked the increment and then reset the value when
necessary. Perhaps an example would help me.

Your code is complex so without background it is hard to know what your
objectives are.

This code receives a message and sends it to non-reentrant C++ code. That intentional locking is causing
performance issues at high message volumes, so a router mode was created. More than one copy of the service will now be
running, with one of them in router mode. The rest are servants. The client still only talks to the one instance and
doesn't know about the task delegation. The delegation is done round-robin, hence the collection of QsServants and
the need to reset the index to zero when the value reaches the collection count.
Dan Holmes said:
Peter said:
If you're saying that the int itself is unneeded, I agree...that may very
well be. What little code we did see seems potentially awkward if not
outright flawed from a design standpoint. But the OP didn't show enough
code for us to know for sure; it could just be that we're not seeing the
whole picture.

Pete
If my code is bad i want to know. Here is the whole thing. This is a
service. It works and has been for a while so whatever flaw must be drawn
out by special circumstances that i haven't encountered yet. There isn't
anything secret going on here.

I have written the client side too, so i know what inputs are expected.


using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.ServiceProcess;
using System.Text;
using SNS = System.Net.Sockets;
namespace RouteMatch.RMQS
{
public partial class RMQSService : ServiceBase
{
//initial size of the read buffer
private const int Initial_Buffer_Size = 65536;
private SNS.TcpListener listener = null;
//this is incremented before the execution that is why it is
initialized to -1 instead of 0.
private int nextServantIndex = -1;
private ClientState nextServantIndexSerializeObject = new
ClientState(null, null);
private Dictionary<string, ServiceAdapterInterface>
serviceAdapterPool = new Dictionary<string, ServiceAdapterInterface>();
public RMQSService()
{
InitializeComponent();
}

protected override void OnStart(string[] args)
{
try
{
eventLog1.Log = "Application";
eventLog1.Source = "RMQS";
listener = new SNS.TcpListener(System.Net.IPAddress.Any,
Properties.Settings.Default.Port);
listener.Start();
listener.BeginAcceptTcpClient(new
AsyncCallback(DoAcceptTcpClientCallback), listener);
}
catch (Exception ex)
{
if (eventLog1 != null)
eventLog1.WriteEntry(ex.ToString(),
EventLogEntryType.Error);
}
}

private void DoAcceptTcpClientCallback(IAsyncResult ar)
{
try
{
SNS.TcpListener listener = (SNS.TcpListener)ar.AsyncState;
SNS.TcpClient clientSckt =
listener.EndAcceptTcpClient(ar);
listener.BeginAcceptTcpClient(new
AsyncCallback(DoAcceptTcpClientCallback), listener);
byte[] readbuffer = new byte[Initial_Buffer_Size];
SNS.NetworkStream ns = clientSckt.GetStream();
StringBuilder message = new StringBuilder();
int bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
while (bytesread > 0)
{
message.Append(Encoding.UTF8.GetString(readbuffer, 0,
bytesread));
bytesread = ns.Read(readbuffer, 0, readbuffer.Length);
}
if (message.Length > 0)
{
if (Properties.Settings.Default.RouterMode)
{
//lock the int. since this is async IO more than
1 IO could be at this point
//need to make sure that the variable only gets
accessed in a serial manner.
//move to the next servant
string servername = string.Empty;
int port = 0;
lock (nextServantIndexSerializeObject)
{
if (++nextServantIndex >=
Properties.Settings.Default.Servants.Count)
nextServantIndex = 0;
servername =
Properties.Settings.Default.Servants[nextServantIndex].ServerName;
port =
Properties.Settings.Default.Servants[nextServantIndex].Port;
}
SNS.TcpClient qsServant = new SNS.TcpClient();
qsServant.Connect(servername , port);
//write client's incoming message to servant
byte[] writebuffer =
Encoding.ASCII.GetBytes(message.ToString());
SNS.NetworkStream servant_ns =
qsServant.GetStream();
servant_ns.Write(writebuffer, 0,
writebuffer.Length);

qsServant.Client.Shutdown(System.Net.Sockets.SocketShutdown.Send);
//read from servant and send to client
bytesread = servant_ns.Read(readbuffer, 0,
readbuffer.Length);
while (bytesread > 0)
{
clientSckt.GetStream().Write(readbuffer, 0,
bytesread);
bytesread = ns.Read(readbuffer, 0,
readbuffer.Length);
}

qsServant.Client.Shutdown(System.Net.Sockets.SocketShutdown.Both);
qsServant.Close();
}
else
{
ClientState cs = new ClientState(clientSckt,
message.ToString());
System.Threading.ThreadPool.QueueUserWorkItem(
new
System.Threading.WaitCallback(ProcessMessage), cs
);
}
}
else
{

clientSckt.Client.Shutdown(System.Net.Sockets.SocketShutdown.Both);
clientSckt.Close();
}
}
catch (Exception ex)
{
eventLog1.WriteEntry(ex.ToString(),
EventLogEntryType.Error);
}
}

/// <summary>
/// ths is where all the "work" gets done.
///
/// The client state consists of the message and the client
socket. that allows this proc
/// to responsd to the caller.
///
/// Because right now Rse is not reentrant the dictionary that
keeps the Rse objects is
/// locked so that simultaneous access is disallowed.
/// </summary>
/// <param name="clientState"></param>
/// <exception cref="RMQSMessageException"/>
/// <exception cref="RMQSInvalidCommandValueException"/>
private void ProcessMessage(object clientState)
{
RMQSMessage question= null;
RMQSResponse answer = null;
ClientState state = null;
try
{
state = (ClientState)clientState;
question = new RMQSMessage();
question.BuildMessage(state.Message);
ServiceAdapterInterface serviceInterface = null;
//we key the compool by the agency and the database
because there could be a
//training database in use. that data could be different
//the message target is there as well so that different
modules would be allowed
//to work simultaneously. right now they are all in the
same pool. if another message target
//was created then it would be good to add a target pool
so that only the objects of the same
//target of locked. messages to different targets would
then be able to work in parallel.
string comPoolKey = question.AgencyName + "|" +
question.DatabaseName + "|" + question.MessageTarget;
//this lock essentially places all queued objects in line.
The first one spawned is the
//first one to work. The other then back up behind and
wait for the lock to be released.
//when Rse is re-entrant this lock can be moved such that
the processing isn't included in
//the lock. that will allow the TreadPool to operate as
expected.
lock (serviceAdapterPool)
{
if (!serviceAdapterPool.ContainsKey(comPoolKey))
{
//this is the only line in the server that
references the RSE code.
//when/if there is a reason this code will be made
to read from a file
//dynamically load the class connected with the
MessageTarget.
//it would also be possible to load each of those
in their own app domain
//so process isolation could be enabled. this
might be a solution to the FIFO queue
//that is here.
//also note that each dynamically loaded service
will need to implement ServiceAdapterInterface.
ServiceAdapterInterface newAdapter =
(ServiceAdapterInterface)new RouteMatch.TS.Rse.RseAdapterContext(); ;
newAdapter.Initialize(question);
serviceAdapterPool.Add(comPoolKey, newAdapter);
}
serviceInterface = serviceAdapterPool[comPoolKey];
answer = serviceInterface.HandleMessage(question);
}
}
catch (RMQSMessageException ex)
{
if (question == null)
answer = new RMQSError("", ex.ErrorCode, ex.Message);
else
answer = new RMQSError(question, ex.ErrorCode,
ex.Message);
}
catch (Exception ex)
{
if (question == null)
answer = new RMQSError("", -99, ex.Message);
else
answer = new RMQSError(question, -99, ex.Message);
}
SNS.NetworkStream ns = state.ClientSocket.GetStream();
if (ns.CanWrite)
{
byte[] writebuffer = answer.ToByteArray();
ns.Write(writebuffer, 0, writebuffer.Length);

state.ClientSocket.Client.Shutdown(System.Net.Sockets.SocketShutdown.Send);
}
else
eventLog1.WriteEntry("Couldn't write");

state.ClientSocket.Close();
}
/// <summary>
/// Used to hold state for the message and is placed in the
treadpool
/// </summary>
private class ClientState
{
public ClientState(SNS.TcpClient clientSocket, string message)
{
Message = message;
ClientSocket = clientSocket;
}
private string _message;

public string Message
{
get { return _message; }
set { _message = value; }
}
private SNS.TcpClient _socket;

public SNS.TcpClient ClientSocket
{
get { return _socket; }
set { _socket = value; }
}
}
}
public class QsServant
{
private string _serverName;

public string ServerName
{
get { return _serverName; }
set { _serverName = value; }
}
private int _port;

public int Port
{
get { return _port; }
set { _port = value; }
}
}
[Serializable]
public class QsServants : List<QsServant>
{
public QsServants() : base() { }
public QsServants(IEnumerable<QsServant> collection) :
base(collection) { }
public QsServants(int capacity) : base(capacity) { }
}
}
 
D

Dan Holmes

Paul said:
Dan,

Is there any reason why you did not use WCF or standard Web Services, or
even just message queuing?

Yes, my boss wanted it TCP messaging only.
Why not implement the count in QsServant and implement the locking method
that Peter suggested earlier on, that I then posted an example for?
Alternatively, use the count from QsServants.

I am not sure I follow. Not only do I need to increment the value, I also need to reset it to zero when it is higher than the
number of objects in my collection. I didn't see any example that locked the increment and then reset the value when
necessary. Perhaps an example would help me.
Your code is complex so without background it is hard to know what your
objectives are.

This code is receiving a message and sending it to non-reentrant C++ code. The intentional locking around that code is causing
performance issues for high message volumes, so a router mode was created. More than one copy of the service will now be
running, with one of them in router mode. The rest are servants. The client still only talks to the one instance and
doesn't know about the task delegation. The delegation is done round-robin, hence the collection of QsServants and
the need to reset the index to zero when the value is higher than the collection count.
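
For the round-robin step described above, a minimal sketch of locking the increment and the reset together might look like the following. The class name RoundRobinSelector and its members are hypothetical and not part of the posted service; the point is only that the increment, the wrap to zero and the read of the selected item all happen inside one lock on a dedicated object.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; the name and shape are assumptions, not the service's code.
public sealed class RoundRobinSelector<T>
{
    // Dedicated object used only as the lock identifier.
    private readonly object _sync = new object();
    private readonly IList<T> _items;
    // Incremented before use, so it starts at -1 (same convention as the thread).
    private int _next = -1;

    public RoundRobinSelector(IList<T> items)
    {
        if (items == null) throw new ArgumentNullException("items");
        if (items.Count == 0) throw new ArgumentException("At least one item is required.", "items");
        _items = items;
    }

    // Increment, wrap to zero, and read the item as one serialized step,
    // so no other thread can move the index between the wrap and the read.
    public T Next()
    {
        lock (_sync)
        {
            if (++_next >= _items.Count)
                _next = 0;
            return _items[_next];
        }
    }
}
```

Since QsServants derives from List<QsServant>, an instance could presumably be built once from Properties.Settings.Default.Servants and Next() called from the accept callback; that wiring is an assumption and is not shown in the thread.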
 
P

Paul

So basically what you are doing is farming out work over threading and to
other services/Servers?

Using WCF and webservices you could have scalability without all the
complexity created for yourself.

Personally I would question the need for it running over TCP but as
explained WCF and classic remoting can both run over TCP.

Paul

 
P

Peter Duniho

Peter said:
If you're saying that the int itself is unneeded, I agree...that may
very well be. What little code we did see seems potentially awkward if
not outright flawed from a design standpoint. But the OP didn't show
enough code for us to know for sure; it could just be that we're not
seeing the whole picture.
Pete

If my code is bad i want to know. Here is the whole thing. [...]

A code example needs to be both concise and complete, and reliably
demonstrate the problem.

Concise means there is nothing in the code example that is not strictly
needed to demonstrate the problem.

Complete means that there is nothing missing from the code example to
demonstrate the problem. Usually for networking questions, this means
providing code for both endpoints (usually by including both endpoints in
the same executable).

In this particular case, we may be able to do without an actual working
example (i.e. without the other endpoint). But you should still simplify
the code example so that we don't have to sift through a bunch of code
that isn't actually relevant to the question at hand. The example you
posted does not appear to meet that requirement.

Someone with more time on their hands may spend the time looking through
the code in detail and be able to provide useful information. But if you
want to maximize the number of people actively providing suggestions,
you'll need to post a better code example.

Pete
 
B

Ben Voigt [C++ MVP]

Dan Holmes said:
If my code is bad i want to know. Here is the whole thing. This is a
service. It works and has been for a while so whatever flaw must be drawn
out by special circumstances that i haven't encountered yet. There isn't
anything secret going on here.

You're locking on an instance of a reference type, no problems there.
Although if that variable (nextServantIndexSerializeObject) isn't used for
anything except that lock, you might just use an object instead of some more
complex class.

I didn't look closely at the rest of the code. Using pure TCP instead of
WCF or .NET Remoting or MSMQ has a huge advantage in terms of
interoperability. The other end can be anything with a TCP stack, which
means just about every processor, operating system, and programming language
in existence is a candidate.
 
P

Paul

I didn't look closely at the rest of the code. Using pure TCP instead of
WCF or .NET Remoting or MSMQ has a huge advantage in terms of
interoperability. The other end can be anything with a TCP stack, which
means just about every processor, operating system, and programming
language in existence is a candidate.


What?

TCP is the communication protocol and does not by itself determine if it is
interoperable between platforms.

Most remoting methods are interoperable, it is the interface that is
probably more important.

Your argument is mute and more reason to choose technology such as SOAP or
RESTful HTTP, but ensure you use an interface that can be shared, e.g. don't
expose datasets in your SOAP interface, which you would not do anyway, right?
 
G

Göran Andersson

Dan said:
I have TCP code that is incrementing an int for each accept socket. I
tried to use lock(int) for synchronization but that doesn't work. I
have settled on declaring the int as volatile. Will that work? Based
on what i read about volatile this could work.

Making it volatile doesn't do any synchronisation; it only keeps the
compiler from optimising away reads and writes to the variable so that
you always work with the actual variable. You can use volatile if you
don't need any synchronisation, i.e. if you only do atomic operations on
the variable.
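
To make the point about atomic operations concrete: ++ on a volatile int is still a separate read, add and write, so two threads can both read the same value. If only a counter is needed, one possible lock-free sketch uses Interlocked.Increment and maps the result onto the collection size. The class AtomicRoundRobin is hypothetical and not from the thread, and the code assumes count is greater than zero.

```csharp
using System.Threading;

// Hypothetical helper, not part of the posted service.
public sealed class AtomicRoundRobin
{
    // Starts at -1 so the first call yields index 0.
    private int _counter = -1;

    // Atomically advances the shared counter and maps it onto [0, count).
    // Assumes count > 0; a zero count would throw DivideByZeroException.
    public int Next(int count)
    {
        // Interlocked.Increment does the read-modify-write as one atomic step
        // and wraps from int.MaxValue to int.MinValue instead of throwing.
        int ticket = Interlocked.Increment(ref _counter);
        // Reinterpret as unsigned so the remainder is never negative after a wrap.
        return (int)(unchecked((uint)ticket) % (uint)count);
    }
}
```

Unlike the locked version, this only hands out an index. Reading the servant at that index is a separate step, which is harmless as long as the collection does not change while the service is running.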

Your problem with using a value type in the lock statement is obviously
that you lock on the boxed object. However, I think that the real
problem is that you think that you should use the variable that you need
to protect in the lock statement. The lock doesn't protect the object
that you lock on, it protects the code in the lock from being run by
more than one thread at a time.
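
A small sketch of why the boxed object is useless as a lock: every conversion of an int to object allocates a new box, so two threads that each box the same variable end up holding two different objects and never contend. (The C# compiler rejects lock on an int directly, so a cast to object is the usual way people run into this.) The BoxingDemo class below is purely illustrative.

```csharp
using System;

// Illustrative only; the class and method names are made up for this example.
public static class BoxingDemo
{
    // Returns true only if two boxings of the same int yield the same object.
    public static bool SameBoxTwice(int value)
    {
        object first = value;   // boxes the int into a new heap object
        object second = value;  // boxes it again into a different object
        return object.ReferenceEquals(first, second);
    }
}
```

Calling BoxingDemo.SameBoxTwice(5) returns false, which is the same situation as two threads each taking a lock on their own freshly boxed copy of the counter.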

Actually, in most cases you should not use the object that you want to
protect in the lock statement, instead you should have a specific object
that is created solely to be used as an identifier for the lock.

Create an object in the class to use as identifier:

private readonly object _sync = new object();

Use the object for locking the code block accessing the counter:

// Lock the code. Since this is async IO, more than one IO
// could be at this point, so we need to make sure that the
// variable only gets accessed in a serial manner.
lock (_sync) {
    // move to the next servant
    if (++nextServantIndex >= Properties.Settings.Default.Servants.Count) {
        nextServantIndex = 0;
    }
    // read Servants[nextServantIndex] here, still inside the lock,
    // so another thread cannot move the index before it is used
}
 
P

Paul

lol moot not mute.....Absolutely right don't know what made me write it
mute.

I agree that as a communication protocol TCP is pretty much cross platform; where
I disagree is that re-inventing the wheel is a good thing. Most development
languages have support for SOAP or even HTTP communication in this day and age;
even VB6 had SOAP support.

Looking at the function in hand, re-creating the wheel in this case has
caused, and always has the possibility of creating, more issues than it solves:
extra testing, and black box functionality with no standards for other
consumers of the service to follow. Yes, fine if you want to use it yourself,
but why re-create something that needs additional dev and testing when you can
find an existing standard that fits? Let's be honest, you will be hard pressed
not to.

To me solutions like this are badly architected and waiting to fail. Maybe I'm
an old fart, but when I see developers re-creating the wheel I always question
the design/architecture of the system. MS has spent a lot of time developing
the remoting services in .NET, and they are for the most part cross platform
in the right hands and given correct architectural choices. Why would you
try to re-write this? Low level devices maybe, but software to software?

It's just my opinion.
 
D

Dan Holmes

Paul said:
lol, moot not mute... Absolutely right, I don't know what made me write it
as mute.

I agree that as a communication protocol TCP is pretty much cross-platform;
where I disagree is that re-inventing the wheel is a good thing. Most
development languages have support for SOAP or even HTTP communication in
this day and age; even VB6 had SOAP support.

Looking at the function in hand, re-creating the wheel in this case has
caused, and always has the possibility of creating, more issues than it
solves: extra testing, and black-box functionality with no standards for
other consumers of the service to follow. Fine if you want to use it
yourself, but why re-create something, with the additional dev and testing,
when you can find an existing standard that fits? Let's be honest, you will
be hard pressed not to.

To me solutions like this are badly architected and waiting to fail. Maybe
I'm an old fart, but when I see developers re-creating the wheel I always
question the design/architecture of the system. MS has spent a lot of time
developing the remoting services in .NET and they are for the most part
cross-platform in the right hands and given correct architectural choices,
so why would you try to re-write this? Low-level devices maybe, but software
to software?

It's just my opinion.

As the author of that code I can see your point, but in this case it is hard to anticipate who will be a consumer of this
messaging in the future. We have a mixture of languages already: .NET, VB6, Java and C++, along with .NET inside
SQL Server. We are also on VS2005 and there are currently no plans to change. With all those things going on, I don't
see how Remoting or SOAP or WCF or a web service is any easier (or possible) to implement than my own protocol and messaging.

Plus, why should I send a bloated XML document (SOAP IS BLOATED) to represent two integers when a
<length><delimiter><messagetype>,<int>,<int> type protocol works? Compare 6,1,40,23 (9 bytes) with
<positionmessage><lat>40</lat><long>23</long></positionmessage> (63 bytes). When the mobile devices
are charged by the amount of bandwidth they use and they send this information every 30 seconds or so, the difference
adds up quickly.
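
As a rough check of those numbers, here is a minimal sketch that counts the UTF-8 bytes of the two sample messages and estimates the extra payload per device per day. MessageSizeDemo and its members are hypothetical names for this illustration; the estimate assumes exactly one message every 30 seconds around the clock and counts payload only, not TCP/IP overhead.

```csharp
using System;
using System.Text;

// Hypothetical helper: MessageSizeDemo and its members are illustrative names only.
public static class MessageSizeDemo
{
    public const string Delimited = "6,1,40,23";
    public const string Xml =
        "<positionmessage><lat>40</lat><long>23</long></positionmessage>";

    // Number of bytes the message occupies when encoded as UTF-8.
    public static int ByteCount(string message)
    {
        return Encoding.UTF8.GetByteCount(message);
    }

    // Extra payload bytes per device per day if the XML form is sent
    // instead of the delimited form. Assumes one message every 30 seconds.
    public static int ExtraBytesPerDay()
    {
        int messagesPerDay = (24 * 60 * 60) / 30; // 2880 messages
        return (ByteCount(Xml) - ByteCount(Delimited)) * messagesPerDay;
    }
}
```

ByteCount gives 9 for the delimited message and 63 for the XML one, so under that assumption the XML form costs 54 * 2880 = 155520 extra payload bytes per device per day.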

So your statement that this solution is "badly architected and waiting to fail" is a statement made out of ignorance and
hopefully not hubris. That is your opinion and I will take it as such.

My gripe with MS is that they introduce so many new technologies and deprecate them (WCF a superset of remoting) so
quickly. How do I know that VS201? won't introduce a new flavor that obsoletes something they came up with in the release
I am working in? I know that socket programming will always be around. Once I am good at it I don't have to relearn how
to do it with each release. I also know that I can transfer my knowledge to the other languages currently in use in the
company.

Having said that, some of the stuff that comes with WCF (like security) would be nice (because I wouldn't have to write
that), but even then a mobile device doesn't log in to the network and therefore doesn't have a Windows user name anyway.

I am thankful for the help.

danny
 
