Context.Send/Post in non-gui applications

T

Ty

Hi guys

I hope someone can shed some light on following problem. Sorry for the
lengthy post, maybe you can read the bottom of the post to understand
my problem in advance...

We have a library that builds/manages a dataset that represents data
being received from the network.

This class looks like this (simplified):

Public class DataManager
{
private DataSet m_dataset;
private SynchronizationContext m_synchronization_context;

public DataManager()
{
m_synchronization_context = SynchronizationContext.Current; //thread
1 (gui-thread if gui-app)

//handler is the class that receives data from the network
handler.Data1Update += handle_data1update_proxy; //thread 2
handler.Data2Update += handle_data2update_proxy; //thread 3
}

private void handle_data1update(object state)
{
//doing the work on the DataSet;
}

private void handle_data2update(object state)
{
//doing the work on the DataSet;
}

private void handle_data1update_proxy(object msg)
{
object state = new object[] { msg };
m_synchronization_context.Post(handle_data1update, state);
}

private void handle_data2update_proxy(object msg)
{
object state = new object[] { msg };
m_synchronization_context.Post(handle_data2update, state);
}
}



This class/dataset is usually used in a GUI and it's working well
there. Providing thread-safety because all updating/reading is done in
the same thread.


Now we want to use the same class in a Windows Service. This is how
far i've come:
In non-gui applications there is no SynchronizationContext, causing
the DataManager-Class to not work at all. So before instanciating
DataManager, i set/create a new SyncronizationContext:

SynchronizationContext.SetSynchronizationContext(new
SynchronizationContext());

Now the DataManager works. But it crashed after a while due to Dataset-
inconsistencies. I figured out that this happens because the
SynchronizationContext-Class does not really post the message to the
main-thread, it just executes the method in the same thread as the
message came in.
and due to the async nature of Post(), it was possible that thread 2
and thread 3 delivered updates at the same time.

at this point i was able to solve the problem using a trick...:

public class CheatSync : SynchronizationContext
{
public override void Post(SendOrPostCallback d, object state)
{
Send(d, state);
}
}


i use this class to set the SyncronizationContext. By overriding Post
and doing a Send instead, the workflow is better synced. And like
that, it's actually working. But it's not satisfing.

Firstable: Send/Post are still not executing the Methods in the main-
thread, they are being executed in the thread they arrived. I'm
checking this by watching Thread.CurrentThread.ManagedThreadId, it
properly executes on the main-thread when used from a gui-app but uses
the data-incoming thread in a non-gui app.I'm shure this has potential
risks, since im reading out the data from the main-thread.

Let's assume i don't want to change DataManager-Class. How can i use
Send/Post to execute methods in other threads in non-gui applications?
i also played with instanciating WindowsFormsSynchronizationContext
and many other things.... but no success. it's either not working at
all, or not working correctly. i believe theres a "message-hub"
missing? maybe there exists some kind of smarter SyncronisationContext-
Class?

What am i missing?


THANK YOU!!
 
P

Peter Duniho

Ty said:
Hi guys

I hope someone can shed some light on following problem. Sorry for the
lengthy post, maybe you can read the bottom of the post to understand
my problem in advance... [...]

Bottom line: you are trying to use SynchronizationContext as a way to
serialize access to your shared data structures, but finding that
without a Forms or WPF application, SynchronizationContext does not
actually marshal to a different thread when you call Post() or Send().

I think you could have explained the problem more concisely :), but
better to have more words and make sure you've explained the question
clearly, than to post a message with some vague sense of what you're
doing and what you're having trouble with.

Actually, I'm a bit surprised that you find yourself having to create a
SynchronizationContext explicitly. I'm sure I've tested
SynchronizationContext in a console application, with no Forms or WPF
context available, and was able to have it "work" without explicitly
creating one myself. (Albeit with the same non-marshaling behavior
you're seeing).

Anyway, it seems to me that SynchronizationContext is most useful in a
couple of scenarios:

-- When you don't necessarily need thread-safety, but you do have a
potential for thread-affinity for some GUI object and want to solve that
in a way that's not dependent on your specific GUI implementation, or
even the presence of one.

-- When you know you have a Forms or WPF implementation, and would
like to use the main-thread-marshaling behavior of the
SynchronizationContext to serialize access to your shared data
structures rather than using other synchronization techniques.

Unfortunately, I don't think either of these situations apply to you.

You have two options I see:

-- Implement the thread-marshaling behavior yourself. You can
either do this by fully implementing it in a SynchronizationContext
subclass and continuing to use SynchronizationContext, or you can just
switch to a more basic mechanism where you maintain a queue of delegates
and data to execute, having that queue consumed on a particular thread.

-- Just use the other synchronization techniques (e.g. "lock"
statement) to ensure coherency and synchronization between threads.

Personally, I would go with the latter. Since your code otherwise works
fine absent a GUI environment, obviously the code in question doesn't
actually use any GUI objects, and so the thread affinity inherent in
Forms and WPF isn't an issue. So you don't really _need_ the
thread-specific behavior of SynchronizationContext. Instead, it must be
simply that you need to ensure serialized access to the data structures,
which can be accomplished using other synchronization techniques, such
as the "lock" statement.

You could in fact choose the first option and reimplement the
thread-marshaling behavior, but this is going to be more complicated
than just adding synchronization -- since, after all, thread-marshaling
requires the synchronization _and_ additional logic -- and it doesn't
appear that it will gain you anything you actually need.

Pete
 
T

Ty

Hi Pete
Thanks for you answer. Things start to clear up, just a little
though :)
Actually, I'm a bit surprised that you find yourself having to create a
SynchronizationContext explicitly.  I'm sure I've tested
SynchronizationContext in a console application, with no Forms or WPF
context available, and was able to have it "work" without explicitly
creating one myself.  (Albeit with the same non-marshaling behavior
you're seeing).

For non-gui apps there doesn't seem a SyncContext by default (at least
not under .net 2.0). This was also confirmed in some material i read
while investigating.
     -- Implement the thread-marshaling behavior yourself.  You can
either do this by fully implementing it in a SynchronizationContext
subclass and continuing to use SynchronizationContext, or you can just
switch to a more basic mechanism where you maintain a queue of delegates
and data to execute, having that queue consumed on a particular thread.
ahh. while writing this answer i think something dawned on me. In my
CheatSync class i could queue the Post or Send calls and then deque
and execute them (also in the CheatSync Class) in the mainthread. i
couldnt have the mainthread waiting for the queue, since this would
lock the application, no? maybe as background-worker wich fires events
into the mainthread witch then executes the queued delegates...
do you have an example on what "thread-marshaling" means? how is a
delegate executed on a certain thread? i'm having a hard time finding
the information in that case.

Your other explanations are also very helpful. but if not neccessary i
would rather not touch the DataManager class. it's actually quite
complex and the developer of it is currently not available. but it's
certainly possible to change (or copy) the DataManager. Also, i'm a
bit ahead of schedule and have the time to figure out some things i
don't yet understand.

Thank and greets

Beat
 
P

Peter Duniho

Ty said:
[...]
ahh. while writing this answer i think something dawned on me. In my
CheatSync class i could queue the Post or Send calls and then deque
and execute them (also in the CheatSync Class) in the mainthread. i
couldnt have the mainthread waiting for the queue, since this would
lock the application, no? maybe as background-worker wich fires events
into the mainthread witch then executes the queued delegates...

If you don't have a "mainthread" that is set up to receive "events"
"fired into" it, then the above solution just pushes the problem back
another layer that you have to solve.

But yes, if you have a way to signal your main thread to do something,
it's possible you could approach the problem in the way you describe.
Since you haven't posted a concise-but-complete code example that
reliably demonstrates your scenario, there's no way for any of us to
know for sure.
do you have an example on what "thread-marshaling" means? how is a
delegate executed on a certain thread? i'm having a hard time finding
the information in that case.

"Thread-marshaling" is my way of describing what these kinds of classes
do. I.e. calling Control.Invoke(), Dispatcher.Invoke(),
SynchronizationContext.Send(), etc. As far as how to do it, you either
use the built-in features, or you write your own.

If you are writing your own, you start with the knowledge that to invoke
a delegate, you need a reference to the delegate, and an array of
arguments passed to the delegate. Then you proceed from there,
implementing a design that allows the thread where you want the delegate
to execute to retrieve those pieces of information and invoke the
delegate at the appropriate time (whatever that happens to be for your
application).

An obvious approach to that would be to have a queue shared by one or
more producers and a consumer thread. Each element of the queue would
contain references to the data needed to invoke a delegate (references
to the delegate and an array containing the arguments). The producers
would add elements to the queue, the consumer would remove them and
invoke the delegate represented by them.

Of course, access to the queue needs to be synchronized, and you need
some way to signal to the consumer thread that there is data to process
in the queue (when using a thread that needs to block while waiting for
work to consume, a typical solution involves using the Monitor class
with the Wait() and Pulse() methods).

Pete
 
T

Ty

An obvious approach to that would be to have a queue shared by one or
more producers and a consumer thread.  Each element of the queue would
contain references to the data needed to invoke a delegate (references
to the delegate and an array containing the arguments).  The producers
would add elements to the queue, the consumer would remove them and
invoke the delegate represented by them.

i made below class wich is able to solve the problem in the
DataManager. i simply set this class as Current
SynchronisationContext.

it's not quite finished yet because im not shure how to handle Send().
it's supposed to be executed in sync, but i'm queuing them too...
there are probably some other problems in it, but at least the
DataManager behaves nicely with it.i'm going to put some more work
into it.

thank you very much for your help!


<code>
public class SyncedContext : SynchronizationContext
{
Queue<Message> _messageQueue;
SyncEvents _syncEvents;
Thread _messageConsumer;

public bool Log = false;

static object lockObj = new object();

public SyncedContext()
{
_messageQueue = new Queue<Message>();
_syncEvents = new SyncEvents();

_messageConsumer = new Thread(new ThreadStart(Consumer));
_messageConsumer.Start();
}

public override void Send(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Send, d, state);
}

public override void Post(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Post, d, state);
}

void EnqueueMessage(SendOrPost messageType, SendOrPostCallback
d, object state)
{
lock (lockObj)
{
if (Log)
Print("incoming " + d.Method.Name);

Message msg = new Message();

msg.MessageType = messageType;
msg.d = d;
msg.state = state;

_messageQueue.Enqueue(msg);

_syncEvents.NewItemEvent.Set();
}
}

void Consumer()
{
while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
while (_messageQueue.Count > 0)
{
Message msg = null;

lock (lockObj)
{
msg = _messageQueue.Dequeue();
}

if (Log)
Print("executing " + msg.d.Method.Name);

msg.d(msg.state);
}

_syncEvents.NewItemEvent.Reset();
}
}

void Print(string text)
{
string line = string.Format("Thread {0}: {1}",
Thread.CurrentThread.ManagedThreadId, text);
Console.WriteLine(line);
}

protected internal class SyncEvents
{
protected internal EventWaitHandle ExitThreadEvent { get;
private set; }
protected internal EventWaitHandle NewItemEvent { get;
private set; }
protected internal WaitHandle[] EventArray { get; private
set; }

protected internal SyncEvents()
{
NewItemEvent = new AutoResetEvent(false);
ExitThreadEvent = new ManualResetEvent(false);
EventArray = new WaitHandle[2];
EventArray[0] = NewItemEvent;
EventArray[1] = ExitThreadEvent;
}
}

protected internal enum SendOrPost
{
Send,
Post
}

protected internal class Message
{
protected internal SendOrPost MessageType;
protected internal SendOrPostCallback d;
protected internal object state;
}
}

</code>
 
T

Ty

here's an update to my previous post. i've rearranged the
SyncedContext class a bit. also, in the particular service i'm working
on there is actually data arriving from 4 different threads. 3 from
the DataManager class and one from another provider. so i still had to
do some locking against the 2 datasources, wich is not neccessary when
starting as gui. and it's also cumbersome because some methods are
being touched by both datasources, making it difficult to avoid
locking itself up. so below is the solution without any locking
mechanism in the actual working code. gui and guiless work exactly the
same.
now all the little ducklings are always lined up without the need for
continuous herding :) maybe i reinvented wheel? isn't some mechanism
like this already existing? it was a good exercise anyway.

thanks for helping me Pete!

******

i added a second constructor to the start-class. one taking a
SyncronisationContext (startin from non-gui) the other parameterless
(starting as gui)

then added this static method to the start-class

/// <summary>
/// parm[0] bool consoleMode
/// parm[1] long DebugLevel
/// parm[2] long DebugLevelFile
/// parm[3] SyncronisationContext context
/// </summary>
/// <param name="state"></param>
public static void GuilessKickOff(object state)
{
object[] parms = (object[])state;

CondServerMain condServer = new CondServerMain
((SynchronizationContext)parms[3]);

condServer.Start((bool)parms[0]);
condServer.DebugLevel = Convert.ToInt64(parms[1]);
condServer.DebugLevelLogSink = Convert.ToInt64(parms[2]);
}


this is how i start the application without gui:

class Program
{
static void Main(string[] args)
{
SyncedContext context = new SyncedContext();
SynchronizationContext.SetSynchronizationContext(context);

context.Send(CondServerMain.GuilessKickOff,
new object[] { true, 2, 2, context });
}
}


and here's the slightly changed SyncedContext class. i'm still not
shure about how to handle Send/Post. both are being queued right now.
suggestions are welcome:

public class SyncedContext : SynchronizationContext
{
Queue<Message> _messageQueue;
SyncEvents _syncEvents;
Thread _messageConsumer;

public bool Log = false;

static object lockObj = new object();

public SyncedContext()
{
_messageQueue = new Queue<Message>();
_syncEvents = new SyncEvents();

_messageConsumer = new Thread(new ThreadStart(Consumer));
_messageConsumer.Start();
}


public override void Send(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Send, d, state);
}

public override void Post(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Post, d, state);
}

void EnqueueMessage(SendOrPost messageType, SendOrPostCallback
d, object state)
{
lock (lockObj)
{
if (Log)
Print("incoming " + d.Method.Name);

Message msg = new Message();

msg.MessageType = messageType;
msg.d = d;
msg.state = state;

_messageQueue.Enqueue(msg);

_syncEvents.NewItemEvent.Set();
}
}

void Consumer()
{
Message msg = null;

while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
{
lock(lockObj)
{
while(_messageQueue.Count>0)
{
msg = _messageQueue.Dequeue();
msg.d(msg.state);

if (Log)
Print("executing " + msg.d.Method.Name);
}

_syncEvents.NewItemEvent.Reset();
}
}
}

void Print(string text)
{
string line = string.Format("Thread {0}: {1}",
Thread.CurrentThread.ManagedThreadId, text);
Console.WriteLine(line);
}

protected internal class SyncEvents
{
protected internal EventWaitHandle ExitThreadEvent { get;
private set; }
protected internal EventWaitHandle NewItemEvent { get;
private set; }
protected internal WaitHandle[] EventArray { get; private
set; }

protected internal SyncEvents()
{
NewItemEvent = new AutoResetEvent(false);
ExitThreadEvent = new ManualResetEvent(false);
EventArray = new WaitHandle[2];
EventArray[0] = NewItemEvent;
EventArray[1] = ExitThreadEvent;
}
}

protected internal enum SendOrPost
{
Send,
Post
}

protected internal class Message
{
protected internal SendOrPost MessageType;
protected internal SendOrPostCallback d;
protected internal object state;
}
}
 
P

Peter Duniho

Ty said:
here's an update to my previous post. i've rearranged the
SyncedContext class a bit. also, in the particular service i'm working
on there is actually data arriving from 4 different threads. 3 from
the DataManager class and one from another provider. so i still had to
do some locking against the 2 datasources, wich is not neccessary when
starting as gui. and it's also cumbersome because some methods are
being touched by both datasources, making it difficult to avoid
locking itself up. so below is the solution without any locking
mechanism in the actual working code. gui and guiless work exactly the
same.
now all the little ducklings are always lined up without the need for
continuous herding :) maybe i reinvented wheel? isn't some mechanism
like this already existing? it was a good exercise anyway.

thanks for helping me Pete!

Glad I could help.

I haven't reviewed your code carefully, but a few quick comments:

-- In terms of the difference between Send() and Post(), you may
find it helpful to have a specific signaling object (e.g. AutoResetEvent
or a plain object with which you use the Monitor class) in the event
itself and wait for the consumer thread to signal that object. Then you
just queue the event, but block on that signaling object until the
consumer thread is done.

-- For signaling in any case, you may find it simpler and more
appropriate to use the Monitor class with the locking object and use the
Wait() and Pulse() methods (for consumer and producers, respectively)
for signaling of queue state.

-- You may want to not execute the dequeued delegate until after
you've released the lock. Alternatively, dequeue _all_ the currently
present elements in the queue (copy them to a local array) and then
release the lock before executing any of them. Either way, the goal is
to not call into external code while you hold the lock. This can
improve throughput, minimize unintended dependencies between producers,
and even avoid some deadlock conditions.

Speaking of deadlock...you also seem to imply that you ran into a
deadlock problem and removed locking from your code, which I don't
really understand. The code you posted does use the "lock" statement.
Was there other locking you removed?

Anyway, glad you're making progress and the suggestion worked out for you.

Pete
 
T

Ty

Thanks for the suggestions Pete. I've included some of your tips and
also implemented ISynchronizeInvoke, so the object can be used for
classes depending on this interface.
as for the deadlocks, there was a small problem when i did the locking
in the working code. with this SyncedContext it's fine.

here's the complete class. i'm shure there are still things in there
to be improved... also i'm not shure about the AsyncResult part. seems
to be working though.


public class SyncedContext : SynchronizationContext,
ISynchronizeInvoke
{
Queue<Message> _messageQueue;
Message _syncExecution;

SyncEvents _syncEvents;
Thread _messageConsumer;

static object lockObj = new object();

public SyncedContext()
{
_messageQueue = new Queue<Message>();
_syncEvents = new SyncEvents();

_messageConsumer = new Thread(new ThreadStart(Consumer));
_messageConsumer.Start();
}


#region SyncronisationContext Send/Post

public override void Send(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Send, d, state, null);
}

public override void Post(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Post, d, state, null);
}

public override void OperationCompleted()
{
_syncEvents.ExitThreadEvent.Set();
}

#endregion


#region ISynchronizeInvoke Members

public IAsyncResult BeginInvoke(Delegate method, object[]
args)
{
AsyncResult res = new AsyncResult();
EnqueueMessage(SendOrPost.Post, method, args, res);
return res;

}

public object EndInvoke(IAsyncResult result)
{
return null;
}

public object Invoke(Delegate method, params object[] args)
{
EnqueueMessage(SendOrPost.Send, method, args, null);
return null;
}



public bool InvokeRequired
{
get
{
return Thread.CurrentThread != _messageConsumer;
}
}

#endregion



void EnqueueMessage(SendOrPost messageType, Delegate d, object
state, AsyncResult asyncResult)
{
//Print("incoming " + d.Method.Name);
lock (lockObj)
{
Message msg = new Message();

msg.MessageType = messageType;
msg.Del = d;
msg.State = state;

switch (messageType)
{
case SendOrPost.Post:
msg.AsyncResult = asyncResult;
_messageQueue.Enqueue(msg);
break;

case SendOrPost.Send:
_syncExecution = msg;
break;
}
}

_syncEvents.NewItemEvent.Set();

if (messageType == SendOrPost.Send)
{
_syncEvents.SyncItemEvent.WaitOne();
_syncEvents.SyncItemEvent.Reset();
}
}

void Consumer()
{
Queue<Message> dequeuer = new Queue<Message>();

while (WaitHandle.WaitAny(_syncEvents.EventArray) != 2)
{
lock (lockObj)
{

if (_syncExecution != null)
{
Executer(_syncExecution);
_syncExecution = null;
_syncEvents.SyncItemEvent.Set();
}

while (_messageQueue.Count > 0)
dequeuer.Enqueue(_messageQueue.Dequeue());
}

while (dequeuer.Count > 0)
Executer(dequeuer.Dequeue());
}
}

void Executer(Message msg)
{
if (msg.Del is SendOrPostCallback)
((SendOrPostCallback)msg.Del).Invoke(msg.State);
else if (msg.Del is Delegate)
msg.Del.Method.Invoke(msg.Del.Target, (object[])
msg.State);


if (msg.AsyncResult != null)
{
lock (msg.AsyncResult.AsyncWaitHandle)
{
msg.AsyncResult.IsCompleted = true;
Monitor.Pulse(msg.AsyncResult.AsyncWaitHandle);
}
}
}

void Print(string text)
{
string line = string.Format("Thread {0}: {1}",
Thread.CurrentThread.ManagedThreadId, text);
Console.WriteLine(line);
}


public class AsyncResult : IAsyncResult
{
public object AsyncState { get; set; }
public WaitHandle AsyncWaitHandle { get; set; }
public bool CompletedSynchronously { get; set; }
public bool IsCompleted { get; set; }

public AsyncResult()
{
AsyncWaitHandle = new AutoResetEvent(false);
}
}


protected internal class SyncEvents
{
protected internal EventWaitHandle ExitThreadEvent { get;
private set; }
protected internal EventWaitHandle NewItemEvent { get;
private set; }
protected internal EventWaitHandle SyncItemEvent { get;
private set; }
protected internal WaitHandle[] EventArray { get; private
set; }

protected internal SyncEvents()
{
NewItemEvent = new AutoResetEvent(false);
SyncItemEvent = new ManualResetEvent(false);
ExitThreadEvent = new ManualResetEvent(false);
EventArray = new WaitHandle[3];
EventArray[0] = NewItemEvent;
EventArray[1] = SyncItemEvent;
EventArray[2] = ExitThreadEvent;
}
}

protected internal enum SendOrPost
{
Send,
Post
}

protected internal class Message
{
protected internal SendOrPost MessageType;
protected internal Delegate Del;
protected internal object State;
protected internal AsyncResult AsyncResult;
}
}
 
P

Peter Duniho

Ty said:
Thanks for the suggestions Pete. I've included some of your tips and
also implemented ISynchronizeInvoke, so the object can be used for
classes depending on this interface.
as for the deadlocks, there was a small problem when i did the locking
in the working code. with this SyncedContext it's fine.

here's the complete class. i'm shure there are still things in there
to be improved... also i'm not shure about the AsyncResult part. seems
to be working though.

Took a quick look. I'm afraid you've got at least one bug. Because
your synchronized invocation (Send()) implementation doesn't enqueue the
operation, but rather uses a specific variable to store it, you could
lose invocations. Specifically, there's a window between when the
producer has set the variable and when the consumer next acquires the
lock in which some other producer can also set the variable, overwriting
the previous value.

You could probably put together some combination of EventWaitHandle
objects that would allow all the different threads to coordinate,
causing subsequent producers trying to call Send() to wait until the
current Send() has been processed. But IMHO a cleaner, more
maintainable approach would be to just enqueue the invocations for
Send() just like you do for Post(), but with the difference that you
wait for the enqueue invocation to finish (obviously this requires some
kind of signaling specific to the enqueued invocation).

The queue for invocations passed to Send() can either be the same as the
one for Post() or, if you for some reason want to prioritize the two
different kinds of operations separately (e.g. make Send() invocations
execute before any Post() invocations), you could have multiple queues.

Pete
 
T

Ty

Took a quick look.  I'm afraid you've got at least one bug.  Because
your synchronized invocation (Send()) implementation doesn't enqueue the
operation, but rather uses a specific variable to store it, you could
lose invocations.  
Took a quick look. I'm afraid you've got at least one bug. Because
your synchronized invocation (Send()) implementation doesn't enqueue the
operation, but rather uses a specific variable to store it, you could
lose invocations.

yep, you're right of course. i've changed quite a bit (again...) and
me thinks i got it now :)

public class SyncedContext : SynchronizationContext,
ISynchronizeInvoke
{
Queue<Message> _msgQueue;
Thread _messageConsumer;
Dictionary<int, AutoResetEvent> _syncMessagesHandles;
WaitHandle[] _eventArray;
EventWaitHandle _exitThreadEvent;
EventWaitHandle _newItemEvent;

static object lockQueue = new object();

public SyncedContext()
{
_msgQueue = new Queue<Message>();

_eventArray = new WaitHandle[2];
_newItemEvent = new AutoResetEvent(false);
_exitThreadEvent = new ManualResetEvent(false);
_eventArray[0] = _newItemEvent;
_eventArray[1] = _exitThreadEvent;

_syncMessagesHandles = new Dictionary<int, AutoResetEvent>
();

_messageConsumer = new Thread(new ThreadStart(Consumer));
_messageConsumer.Start();
}


#region SyncronisationContext Send/Post

public override void Send(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Send, d, state, null);
}

public override void Post(SendOrPostCallback d, object state)
{
EnqueueMessage(SendOrPost.Post, d, state, null);
}

public override void OperationCompleted()
{
_exitThreadEvent.Set();
}

#endregion


#region ISynchronizeInvoke Members

public IAsyncResult BeginInvoke(Delegate method, object[]
args)
{
AsyncResult res = new AsyncResult();
EnqueueMessage(SendOrPost.Post, method, args, res);
return res;
}

public object EndInvoke(IAsyncResult result)
{
return null;
}

public object Invoke(Delegate method, params object[] args)
{
EnqueueMessage(SendOrPost.Send, method, args, null);
return null;
}

public bool InvokeRequired
{
get
{
return Thread.CurrentThread != _messageConsumer;
}
}

#endregion



void EnqueueMessage(SendOrPost messageType, Delegate d, object
state, AsyncResult asyncResult)
{
Message msg = new Message();

lock (lockQueue)
{
msg.MessageType = messageType;
msg.Del = d;
msg.State = state;
msg.AsyncResult = asyncResult;

if (messageType == SendOrPost.Send)
{
int threadId =
Thread.CurrentThread.ManagedThreadId;

if (!_syncMessagesHandles.ContainsKey(threadId))
_syncMessagesHandles.Add(threadId, new
AutoResetEvent(false));

msg.ExecutedWaitHandle = _syncMessagesHandles
[threadId];
}

_msgQueue.Enqueue(msg);
}

_newItemEvent.Set();

if (messageType == SendOrPost.Send)
msg.ExecutedWaitHandle.WaitOne();
}

void Consumer()
{
Queue<Message> dequeuer = new Queue<Message>();

while (WaitHandle.WaitAny(_eventArray) != 1)
{
lock (lockQueue)
{
while (_msgQueue.Count > 0)
dequeuer.Enqueue(_msgQueue.Dequeue());
}

while (dequeuer.Count > 0)
Executer(dequeuer.Dequeue());
}
}

void Executer(Message msg)
{
if (msg.Del is SendOrPostCallback)
((SendOrPostCallback)msg.Del)(msg.State);
else
msg.Del.Method.Invoke(msg.Del.Target, (object[])
msg.State);

if (msg.AsyncResult != null)
{
lock (msg.AsyncResult.AsyncWaitHandle)
{
msg.AsyncResult.IsCompleted = true;
Monitor.Pulse(msg.AsyncResult.AsyncWaitHandle);
}
}

if (msg.ExecutedWaitHandle != null)
msg.ExecutedWaitHandle.Set();
}

void Print(string text)
{
string line = string.Format("Thread {0}: {1}",
Thread.CurrentThread.ManagedThreadId, text);
Console.WriteLine(line);
}


public class AsyncResult : IAsyncResult
{
public object AsyncState { get; set; }
public WaitHandle AsyncWaitHandle { get; set; }
public bool CompletedSynchronously { get; set; }
public bool IsCompleted { get; set; }

public AsyncResult()
{
AsyncWaitHandle = new AutoResetEvent(false);
}
}

protected internal enum SendOrPost
{
Send,
Post
}

protected internal class Message
{
protected internal SendOrPost MessageType;
protected internal Delegate Del;
protected internal object State;
protected internal AsyncResult AsyncResult;
protected internal AutoResetEvent ExecutedWaitHandle;
}
}
 
P

Peter Duniho

Ty said:
yep, you're right of course. i've changed quite a bit (again...) and
me thinks i got it now :)

Okay...almost there. :)

I think the biggest issue is that you haven't fully implemented
IAsyncResult. The EndInvoke() method is required to block until the
operation has completed. It should also return the value returned by
the invoked delegate.

It also looks to me as though you've conflated the argument list with a
particular argument. That is, your primary API (Send/Post) takes as an
argument a single state object, while ISynchronizeInvoke allows an
arbitrary argument list to be passed. But, your code takes that
arbitrary argument list and passes it to the delegate as a single object.

You seem to have tried to address this in your Executer() method, but
you still have a problem: if someone calls ISynchronizeInvoke using a
delegate of type SendOrPostCallback, the called method will receive a
single-element object[] array, rather than the actual state object intended.

It seems to me that it would be better to generalize the invocation to
match the general delegate invocation, so that the Executer() method
doesn't need to check the type of the delegate. Just always store the
argument list as an object[] in your Message class, and then invoke the
delegate using the argument list. For calls to Send() or Post(), that
means you'll take the "state" argument and wrap it in an object[].

I also would use the Delegate.DynamicInvoke() rather than going through
the Delegate.Method property value. It's a more direct approach, and
won't arbitrarily restrict the use of multicast delegates (i.e.
delegates with more than one method).

Another issue that appears in your ISynchronizeInvoke implementation is
how you signal the wait handle. You are calling Monitor.Pulse(), but
that's not the correct way to signal that object. You need to call the
Set() method on the object itself (and of course that means casting it
to its correct type, or at least EventWaitHandle).

Some minor issues I also see:

-- You really only need one event for managing the consumer thread.
Have a separate flag that signals the thread to exit, and when it
wakes from the event wait, it can check that flag. Events are much more
expensive objects than a simple flag. :)

-- Another advantage of using only one event is that it would
eliminate the need for the dependency on the event handle array index
for detecting the exit condition. That's a maintenance issue you've got
right now, as you've got one section of code that is completely
dependent on some other section of code, and nothing to enforce any
connection between the two (e.g. if someone comes along later and
changes the arrangement of the event handle array, it can break the exit
logic).

-- Actually, I wouldn't have used events at all...instead, I'd use
the Monitor class with Pulse() and Wait(), but that's a separate point
and not nearly as important.

-- A more typical implementation of IAsyncResult is to initialize
the AsyncWaitHandle object lazily. That is, only if some code actually
tries to retrieve the value from the property. Again, this is related
to the expense of a WaitHandle object, and avoiding that expense except
when it's actually needed.

-- Related to the previous point is the question of disposing the
WaitHandle object. Unfortunately, Microsoft guidance on the question is
IMHO poor. But my preferred approach is to dispose the object in the
call to EndInvoke(). As part of that (i.e. not instead of) you may also
want to implement IDisposable in the AsyncResult class, but doing so
then raises the question of what the appropriate response is if the
client code disposes the AsyncResult object before the asynchronous
operation has completed (i.e. your own code should still be able to
successfully complete the operation, but within the IAsyncResult
implementation, an exception should be thrown if the client code ever
tries to do something that doesn't make sense after disposal, such as
retrieving the AsyncWaitHandle object).

Related to the cost of a WaitHandle object is something I haven't had a
chance to explore yet. Specifically, when you handle a Send(), you
create a WaitHandle object and then use that for signaling. I think
it's possible that allocating a simple System.Object instance and then
using the Monitor class, again with Wait() and Pulse(), for signaling
may be more efficient.

I haven't actually benchmarked that, and haven't seen any specific
references along those lines. But my understanding is that generally,
the Monitor class provides for much more efficient synchronization than
relying on something heavier-weight like the WaitHandle classes. The
latter are extremely useful when you need a general purpose API to allow
unrelated pieces of code the ability to synchronize with each other, but
when dealing with code that's all part of the same implementation, you
have the freedom to use the built-in .NET synchronization mechanism
provided by the Monitor class.

I'm curious, so if I get some time to play with it, I'll see if I can
measure a performance difference between the two approaches. I'll post
back here if I find anything interesting.

Pete
 
P

Peter Duniho

Peter said:
[...]
I'm curious, so if I get some time to play with it, I'll see if I can
measure a performance difference between the two approaches. I'll post
back here if I find anything interesting.

Okay, I went ahead and tested it out. And sure enough, the Monitor
class is somewhat more efficient than using an EventWaitHandle (see code
below).

I tested four scenarios:

-- Using ManualResetEvent, newly allocated each operation
-- Using Monitor on a plain object, newly allocated each operation
-- Using ManualResetEvent, allocated once
-- Using Monitor on a plain object, allocated once

I included the latter two because I was curious as to whether there
might be some performance benefit to maintaining a pool of
synchronization objects.

The results, based on several trials on my running copy of Windows 7
(admittedly "noisy", performance-wise...it's in a VM, and on a PC doing
lots of other things):

-- Using an EventWaitHandle costs about 20% more time than using
Monitor. In the best case, it was about 13%, but the worst case was 24%.
-- While caching the EventWaitHandle object did produce a
performance improvement over not caching the EventWaitHandle, it wasn't
enough of an improvement to be as fast as an uncached Monitor
implementation.

-- Most interesting (to me, anyway) was that caching the object in
the Monitor implementation was sometimes _slower_ than not caching the
object. This was reproduced over several trials. Sometimes it would be
faster, sometimes not. My take on this is that there's so little
difference between caching and not caching in the Monitor scenario (i.e.
it's small enough that the noise in measurements can exceed the
difference), that it's probably not worth the extra complexity. Just
let the GC deal with the little objects as they pile up.

Finally, one thing I noted was that no matter what approach one takes,
you can only get about 7-8000 invocations per second. That's not a lot
of work to do in a second. Obviously this isn't something you'd really
want to be as a bottleneck in your program.

And in fact, assuming the cross-thread invocation isn't a bottleneck,
the 20% difference in performance is probably not worth worrying about,
especially if there's some other strong motivation for using
ManualResetEvent instead of the Monitor class.

(For completeness, I'll mention that one other form of overhead for
ManualResetEvent is the underlying OS object supporting it, which
doesn't exist for the approach using Monitor. This _might_ be a concern
if it turns out there's some potential for having a large number of
synchronous invocations "in-flight" -- i.e. not yet completed, with
threads waiting on the completion -- say, thousands of invocations. But
that would imply thousands of threads all trying to do a cross-thread
invocation at the same time. If that happens, there's something worse
wrong with the code that the potential overhead of allocating all those
ManualResetEvent objects, so I consider that particular point of
overhead moot :) ).

Pete


using System;
using System.Threading;
using System.Diagnostics;

namespace BenchMonitorVsEvent
{
class Program
{
const long kciterWarmup = 100;
const long kciterMax = 100000;

static long iiter;

static void Main(string[] args)
{
TimeSpan tsEvent, tsMonitor, tsEventPersist, tsMonitorPersist;
long citerMax;
IWaitObject woEventPersist = new PersistWaitObject(new
EventWaitObject()),
woMonitorPersist = new PersistWaitObject(new
MonitorWaitObject());

if (args.Length < 1 || !long.TryParse(args[0], out citerMax))
{
citerMax = kciterMax;
}
Console.WriteLine("Testing with {0} iterations",
citerMax.ToString());

using (Invoker invokerEvent = new Invoker(() => new
EventWaitObject()),
invokerMonitor = new Invoker(() => new
MonitorWaitObject()),
invokerEventPersist = new Invoker(() => woEventPersist),
invokerMonitorPersist = new Invoker(() =>
woMonitorPersist))
{
Stopwatch sw = new Stopwatch();

iiter = 0;
while (iiter < kciterWarmup)
{
invokerEvent.Invoke(_InvokedAction);
}

iiter = 0;
while (iiter < kciterWarmup)
{
invokerMonitor.Invoke(_InvokedAction);
}

iiter = 0;
sw.Reset();
GC.Collect();
sw.Start();
while (iiter < citerMax)
{
invokerEvent.Invoke(_InvokedAction);
}
sw.Stop();
tsEvent = sw.Elapsed;

iiter = 0;
sw.Reset();
GC.Collect();
sw.Start();
while (iiter < citerMax)
{
invokerMonitor.Invoke(_InvokedAction);
}
sw.Stop();
tsMonitor = sw.Elapsed;

iiter = 0;
sw.Reset();
GC.Collect();
sw.Start();
while (iiter < citerMax)
{
invokerEventPersist.Invoke(_InvokedAction);
}
sw.Stop();
tsEventPersist = sw.Elapsed;

iiter = 0;
sw.Reset();
GC.Collect();
sw.Start();
while (iiter < citerMax)
{
invokerMonitorPersist.Invoke(_InvokedAction);
}
sw.Stop();
tsMonitorPersist = sw.Elapsed;
}

Console.WriteLine("Event-based: {0}", tsEvent);
Console.WriteLine("Monitor-based: {0}", tsMonitor);
Console.WriteLine("Event-based, persist: {0}", tsEventPersist);
Console.WriteLine("Monitor-based, persist: {0}",
tsMonitorPersist);
Console.ReadLine();
}

private static void _InvokedAction()
{
iiter++;
}
}

class Invoker : IDisposable
{
private object _obj = new object();
private bool _fExit;
private Delegate _delInvoke;
private object[] _argsInvoke;
private volatile IWaitObject _wo;
private Thread _threadConsumer;
private Func<IWaitObject> _funcWOFactory;

public Invoker(Func<IWaitObject> funcWOFactory)
{
_funcWOFactory = funcWOFactory;
_threadConsumer = new Thread(_ConsumerThread);
_threadConsumer.Start();
}

public void Invoke(Action action)
{
Invoke(action, (object[])null);
}

public void Invoke(Delegate del, params object[] args)
{
lock (_obj)
{
_wo = _funcWOFactory();
_delInvoke = del;
_argsInvoke = args;
Monitor.Pulse(_obj);
}

_wo.Wait();
_wo.Dispose();
_wo = null;
}

private void _ConsumerThread()
{
lock (_obj)
{
while (!_fExit)
{
if (_wo != null)
{
_delInvoke.DynamicInvoke(_argsInvoke);
_wo.Signal();
}

Monitor.Wait(_obj);
}
}
}

#region IDisposable Members

public void Dispose()
{
lock (_obj)
{
_fExit = true;
Monitor.Pulse(_obj);
}
_threadConsumer.Join();
}

#endregion
}

interface IWaitObject : IDisposable
{
void Signal();
void Unsignal();
void Wait();
}

class EventWaitObject : IWaitObject
{
private EventWaitHandle _ewh = new ManualResetEvent(false);

public void Signal()
{
_ewh.Set();
}

public void Unsignal()
{
_ewh.Reset();
}

public void Wait()
{
_ewh.WaitOne();
}

#region IDisposable Members

public void Dispose()
{
((IDisposable)_ewh).Dispose();
}

#endregion
}

class MonitorWaitObject : IWaitObject
{
private object _obj = new object();
private bool _fSignaled;

public void Signal()
{
lock (_obj)
{
_fSignaled = true;
Monitor.Pulse(_obj);
}
}

public void Unsignal()
{
lock (_obj)
{
_fSignaled = false;
}
}

public void Wait()
{
lock (_obj)
{
while (!_fSignaled)
{
Monitor.Wait(_obj);
}
}
}

#region IDisposable Members

public void Dispose()
{
// nop
}

#endregion
}

class PersistWaitObject : IWaitObject
{
private IWaitObject _wo;

public PersistWaitObject(IWaitObject wo)
{
_wo = wo;
}

#region IWaitObject Members

public void Signal()
{
_wo.Signal();
}

public void Unsignal()
{
_wo.Unsignal();
}

public void Wait()
{
_wo.Wait();
}

#endregion

#region IDisposable Members

public void Dispose()
{
_wo.Unsignal();
}

#endregion
}
}
 
T

Ty

oh thanks for outclassing my class :)

i couldn't fully analyze your answer and class. the problem with the
two delegate-types is certainly a issue. i'm not so shure about the
performance cost of the way i do it. but i will look at it asap. i did
test it somewhat before and got arround 100'000 sends/invokes a
second. but it was not very thoroughly tested.

got a new subject i'm working on at the moment, wich unfortunatly i
have to post to the ng as well...

greets

beat
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Top