thread-safety

R

RedLars

The below method is executed in a separate thread, processing tasks
added to a queue.
private void Thread_Main() {
while (!TerminatedThread) {
// Next() blocks if queue is empty
IJob job = queue.Next() as IJob;
if (job != null) job.Execute();
}
}

To add tasks there are two public methods;
public void AddJob(IJob job) {
queue.Add(job);
}

public void AddJob(IJob job, int intervalTime) {
Timer timer = new Timer(new TimerCallback(InternalAddJob), job,
intervalTime, intervalTime);
timerList.Add(timer);
}

private void InternalAddJob(object obj) {
IJob job = obj as IJob;
if (job != null) AddJob(job);
}

And to start the execution in the thread:
public void Start() {
thread = new Thread(new ThreadStart(this.Thread_Main));
thread.Start();
}

This setup has worked fine so far. Now I need to implement a stop()
method which brings up a few concerns regarding thread safety.

A few comments
* A job.Execute() beeing executed must be completed before the thread
is stopped.
* A job.Execute() operation might take upto a few seconds to complete
(Asynch operation).
* Any unprocessed tasks in the queue, or created timers shall not be
deleted by Stop() method.
* The Queue (queue variable) has a Wakeup() method that stops the
queue.Next() from blocking and returns null;

To prevent stop() occuring during execution of a job a lock(..) was
introduced.
private object threadLocker = new object();
private void Thread_Main() {
while (!TerminatedThread) {
lock (threadLocker) {
IJob job = queue.Next() as IJob;
if (job != null) job.Execute();
}
}
}

To actually stop the while loop from running I thought about adding
Monitor.Wait(...) to the loop. But to be able to add such mechanism I
would need a state flag to distinguish between running and not running
(stopped) mode. So I thought about adding private enum State
{ Created, Started, Stopped };

private volatile State state;
private ManualResetEvent StopThreadResetEvent = new ManualResetEvent
(false);
private void Thread_Main()
{
while (!TerminatedThread) {
if (state == State.Started) {
lock (threadLocker) {
IJob job = queue.Next() as IJob;
if (job != null) job.Execute();
}
}
else if (state == State.Stopped)
{
Monitor.Wait(StopThreadResetEvent);
}
}

So would it be enough to implement the stop method like this (?);
private void Stop() {
if (state == State.Started) {
state = State.Stopped;
queue.Wakeup();
}
}
From what I can understand this would not guarantee that Thread_Main
is at Monitor.Wait(...) when Stop() returns.
Could that be a problem? I guess I could add
lock (threadLocker) { /* no code */ }
after Wakeup() as a form of waiting until Thread_Main has completed
its current task.

I would also need to change the Start() method a bit;
public void Start() {
if (state == State.Created)
thread = new Thread(new ThreadStart(this.Thread_Main));
thread.Start();
}
else if (state == State.Stopped)
{
state = State.Started
StopThreadResetEvent.Set();
}
}
What if Stop() and Start() are executed within a very short timeframe
- could that cause a problem with the current code?

I get the feeling I'm missing something when it comes handling state
\state-changes in a multi-threaded environment.

Appreicate any comments.
 
P

Peter Morris

This has been discussed in depth in this group in the past. Not only this
but the discussion has included various source code examples too, including
two approaches to stopping

01: Stop at the end of the current job.
02: Stop at the end of all queued jobs.
 
R

RedLars

Thank you very much for such a detailed response. Will try and answer
your comments below.

[...]
This setup has worked fine so far. Now I need to implement a stop()
method which brings up a few concerns regarding thread safety.

Just for the record, there's nothing in the code you posted that suggests 
it's itself thread-safe.  We might assume it is, but that would depend on  
the implementation of your Queue class (obviously not one of the .NET  
queue classes).
A few comments
* A job.Execute() beeing executed must be completed before the thread
is stopped.
* A job.Execute() operation might take upto a few seconds to complete
(Asynch operation).
* Any unprocessed tasks in the queue, or created timers shall not be
deleted by Stop() method.
* The Queue (queue variable) has a Wakeup() method that stops the
queue.Next() from blocking and returns null;
To prevent stop() occuring during execution of a job a lock(..) was
introduced.

What do you mean "prevent stop() occurring during execution of a job"?  As  
you've already noted, until your job.Execute() method comes back, your  
consumer thread isn't going to do anything else, including stopping.  
What's in the "stop()" method that you need to not occur until after  
job.Execute() returns?

Apoligize for poor wording. What I'd like to achieve is for the Stop()
method to block until the consumer-thread actually has stopped
processing tasks. So if the job.Execute() is half-way into processing
a task, then I would like Stop to block until that task has been
completed before returning to calleer.
What other code locks on "threadLocker"?  Without knowing that, there'sno  
useful comments to be made about the above.


How would that be different from your queue implementation which already  
has a "release on new item or wake-up" semantic built right in?  (For that  
matter, I'm surprised at the implication that the queue implementation  
doesn't already use Monitor.Wait() or something similar).

The semantics I've worked by so far is Stop() means to pause-
processing-of-the-queue and not stop-adding-tasks-to-queue.

Say as an example I ran the following code;
ThreadClass.AddJob( job1 );
ThreadClass.AddJob( job2 );
ThreadClass.AddJob( job3 );
ThreadClass.Stop();
Assuming each job requires 1 second to complete, I would expect the
first job to be completed before the thread has been stopped
(obviously depends on current state of ThreadClass and timing).

So the way I see this, the while loop needs a "sleep" state, it cannot
rely on the queue.Next() to block because it might still contain data.

The Queue implementation uses ManualResetEvent.
That code won't work.  The thread has to have taken the lock on  
"StopThreadResetEvent" before it can call Monitor.Wait() using that  
object.  That said...

Point taken.
Who sets "StopThreadResetEvent"?  Why do you need that?  The Monitor class  
certainly doesn't.  It's odd for you to be passing a WaitHandle of any  
sort to the Monitor class; if you've got a WaitHandle, you would normally 
just wait on _that_, rather than using the WaitHandle simply as a  
synchronization object for Monitor.

Will look into that.
This Stop() method doesn't interact with any of the synchronization you  
put into your consumer thread.  If you meant for the synchronization to 
somehow affect the way that the Stop() method works, you've failed in that  
goal.

The goal was to set the while loop into a non-processing-tasks-state.
Then the Start() method could interact with synchronization in
consumer thread.
It would be a problem to call Monitor.Pulse() if no other thread is  
waiting at a call to Monitor.Wait(), because then the waiting thread would  
miss the Pulse().  However, typically the waiting thread would always be  
either in the lock or waiting, ensuring that no other thread _can_ call  
Monitor.Pulse() until the waiting thread is in fact waiting.


I don't see what the point of that would be.  It would only block if some  
other thread already had the "threadLocker" lock, but a) there's nothing  
in the code you posted that would indicate your consumer thread would be  
doing anything useful with that lock after you lock on it, and b) if that 
thread _were_ going to do something useful with that lock, you would do  
that useful thing _inside_ the locked block of code (instead of "no code").

The point i was trying to make was, if the consumer thread is
currently processing a task when the Stop() is executed and Stop()
only changed the state, then method would return prematurely (before
the consumer thread actually was in a Stopped \ pause state).
 
R

RedLars

[...]
Apoligize for poor wording. What I'd like to achieve is for the Stop()
method to block until the consumer-thread actually has stopped
processing tasks. So if the job.Execute() is half-way into processing
a task, then I would like Stop to block until that task has been
completed before returning to calleer.

Do you really need this?  It seems somewhat arbitrary to me, and it  
definitely is complicating things.  In addition to making the threading 
issues more difficult, it also will have the effect of blocking your UI if  
you're calling the method in response to user input.

If it's absolutely necessary, we can work with it, but a lot of the  
difficulty you're having simply goes away if you can get rid of this  
requirement.
Will try to remove this requirement.

The UI thread is not involved in this process.
[...]
So would it be enough to implement the stop method like this (?);
private void Stop() {
    if (state == State.Started) {
           state = State.Stopped;
           queue.Wakeup();
   }
}
This Stop() method doesn't interact with any of the synchronization you
put into your consumer thread.  If you meant for the synchronizationto
somehow affect the way that the Stop() method works, you've failed in  
that  
goal.
The goal was to set the while loop into a non-processing-tasks-state.
Then the Start() method could interact with synchronization in
consumer thread.

That's a different goal than you've stated earlier in your reply.  In  
particular, you said you'd like the Stop() method to block until the  
consumer thread has actually finished executing the current job.  This  
Stop() method doesn't do that (unless there's something about the Wakeup()  
method you haven't mentioned yet that would have that effect).

Do you want Stop() to block?  Or Start()?


[...]
I guess I could add
lock (threadLocker) { /* no code */ }
after Wakeup() as a form of waiting until Thread_Main has completed
its current task.
I don't see what the point of that would be.  It would only block if 
some  
other thread already had the "threadLocker" lock, but a) there's  
nothing  
in the code you posted that would indicate your consumer thread would  
be  
doing anything useful with that lock after you lock on it, and b) if  
that  
thread _were_ going to do something useful with that lock, you would do
that useful thing _inside_ the locked block of code (instead of "no  
code").
The point i was trying to make was, if the consumer thread is
currently processing a task when the Stop() is executed and Stop()
only changed the state, then method would return prematurely (before
the consumer thread actually was in a Stopped \ pause state).

Okay, well...in fact you have a race condition then taking this approach, 
in which the consumer thread may have just finished checking the state  
variable but not yet begun the next job, when some other thread calls  
Stop().  That call to Stop() will proceed without interference, returning  
to the caller and then when the consumer next gets to run, it will go  
ahead and execute the next job in the queue before it gets around to  
pausing.

For this sort of thing to work, you really need to synchronize access to  
the state variable in both the consumer thread and the Stop() method, so  
that you are guaranteed that the consumer thread can only look at and act 
on the state variable when it knows no other thread can modify it, and  
likewise that any other thread can modify the variable knowing that the  
consumer thread isn't going to miss the change.

And that sort of synchronization will result in there being some actual  
code inside the "lock" statement's block of code.

Pete

would something like this prevent the race condition?

private void Thread_Main() {
while (!TerminatedThread) {
lock (threadLocker) {
if (state == State.Started) {
IJob job = queue.Next() as IJob;
if (job != null) job.Execute();
}
}
}

private void Stop()
{
lock (threadLocker) {
if (state == State.Started) {
state = State.Stopped;
queue.Wakeup();
}
}
}
 
R

RedLars

Sure.  But you're guaranteed a deadlock instead when the queue is empty..  
Not sure that's such a good trade-off.  :)

Pete

Good point.

Thought about fixing the deadlock issue with the below code but it
might lead to a deadlock if consumer thread just manages to aquire the
lock before another thread executes queue.Wakeup in Stop(), given the
queue is empty. The queue.Wakeup will not have an affect because the
consumer thread has not reach the blocking call at this point.

private void Stop()
{
queue.Wakeup();
lock (threadLocker) {
if (state == State.Started) {
state = State.Stopped;
}
}
}

Do you have any suggestion on how to fix this issue?

Your help is much appreciated. Cheers.
 
R

RedLars

[...]
Thought about fixing the deadlock issue with the below code but it
might lead to a deadlock if consumer thread just manages to aquire the
lock before another thread executes queue.Wakeup in Stop(), given the
queue is empty. The queue.Wakeup will not have an affect because the
consumer thread has not reach the blocking call at this point.
private void Stop()
{
  queue.Wakeup();
  lock (threadLocker)  {
    if (state == State.Started)  {
       state = State.Stopped;
     }
   }
}
Do you have any suggestion on how to fix this issue?

Well, as I mentioned before, you've got a set-up that's causing trouble:  
your queue implements its own synchronization strategy, and you have this 
requirement that the Stop() method block until the currently executing job  
completes.

You could make the problem a lot easier if you could change either of  
those two things (or better, both).

I'm working on very little sleep at the moment, and I would prefer to not 
try to come up with a fix that addresses all of your goals at the moment. 
It's a complicated problem, or at least more complicated than I feel I  
ought to be trying to work on.  I might only wind up given lots of bad  
advice if I answered the question now.

But, I'll think about it when I've had more rest and try to remember to  
follow-up.  In the meantime, I'll suggest that if you can work out a way  
to directly use the Monitor class with the Wait() and Pulse() methods,  
that's likely to lead to a good solution, since those are meant exactly  
for this kind of thing (the "block the Stop() method" is still going to  
cause trouble, but it's at least a direction for you to look in the  
meantime).

Sorry for not being more helpful at the moment.

Pete

Thanks for your quick response. Given your response time to me posts I
was actually starting to wondering if actually did sleep :)

Will take your advice under consideration and get back to you.

Currently working on the issue...
 
R

RedLars

[...]
Thought about fixing the deadlock issue with the below code but it
might lead to a deadlock if consumer thread just manages to aquire the
lock before another thread executes queue.Wakeup in Stop(), given the
queue is empty. The queue.Wakeup will not have an affect because the
consumer thread has not reach the blocking call at this point.
private void Stop()
{
  queue.Wakeup();
  lock (threadLocker)  {
    if (state == State.Started)  {
       state = State.Stopped;
     }
   }
}
Do you have any suggestion on how to fix this issue?

Well, as I mentioned before, you've got a set-up that's causing trouble:  
your queue implements its own synchronization strategy, and you have this 
requirement that the Stop() method block until the currently executing job  
completes.

You could make the problem a lot easier if you could change either of  
those two things (or better, both).

I'm working on very little sleep at the moment, and I would prefer to not 
try to come up with a fix that addresses all of your goals at the moment. 
It's a complicated problem, or at least more complicated than I feel I  
ought to be trying to work on.  I might only wind up given lots of bad  
advice if I answered the question now.

But, I'll think about it when I've had more rest and try to remember to  
follow-up.  In the meantime, I'll suggest that if you can work out a way  
to directly use the Monitor class with the Wait() and Pulse() methods,  
that's likely to lead to a good solution, since those are meant exactly  
for this kind of thing (the "block the Stop() method" is still going to  
cause trouble, but it's at least a direction for you to look in the  
meantime).

Sorry for not being more helpful at the moment.

Pete

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements.

Below I have included a suggested solution, any review of this code
would be much appreicated.

public interface IJob
{
void Execute();
}

internal class ThreadQueue
{
private Queue<IJob> queue = new Queue<IJob>();
private object locker = new object();

internal IJob Dequeue()
{
lock (locker)
{
if (queue.Count > 0)
{
return queue.Dequeue();
}
return null;
}
}

internal void Enqueue(IJob job)
{
lock (locker) queue.Enqueue(job);
}

internal void Clear()
{
lock (locker) queue.Clear();
}
}

public class ConsumerThread
{
private Thread thread;
private bool TerminateThread;
private EventWaitHandle waitHandle = new AutoResetEvent(false);

private enum ThreadState { Created, Started, Stopped }
private volatile ThreadState state = ThreadState.Created;
private object stateLocker = new object();

private ThreadQueue queue = new ThreadQueue();

private List<Timer> timerList = new List<Timer>();

public void Start()
{
lock (stateLocker)
{
if (state == ThreadState.Created)
{
state = ThreadState.Started;
thread = new Thread(new ThreadStart(ThreadMain));
thread.Start();
}
else if (state == ThreadState.Stopped)
{
state = ThreadState.Started;
waitHandle.Set();
}
}
}

public void Stop()
{
lock (stateLocker) state = ThreadState.Stopped;
}


public void Add(IJob job)
{
queue.Enqueue(job);
lock (stateLocker)
{
if (state == ThreadState.Started) waitHandle.Set();
}
}

public void Add(IJob job, int intervalTime)
{
System.Threading.Timer timer = new System.Threading.Timer(
new TimerCallback(InternalAdd), job, intervalTime,
intervalTime);
}

private void InternalAdd(object obj)
{
lock (stateLocker)
{
if (state != ThreadState.Started) return;
}
Add(obj as IJob);
}

private void ThreadMain()
{
while (!TerminateThread)
{
IJob job = null;
lock (stateLocker)
{
if (state == ThreadState.Started)
{
job = queue.Dequeue();
}
}
if (job != null)
{
job.Execute();
}
else
{
waitHandle.WaitOne();
}
}
}
}
 
R

RedLars

[...]
Thought about fixing the deadlock issue with the below code but it
might lead to a deadlock if consumer thread just manages to aquire the
lock before another thread executes queue.Wakeup in Stop(), given the
queue is empty. The queue.Wakeup will not have an affect because the
consumer thread has not reach the blocking call at this point.
private void Stop()
{
  queue.Wakeup();
  lock (threadLocker)  {
    if (state == State.Started)  {
       state = State.Stopped;
     }
   }
}
Do you have any suggestion on how to fix this issue?
Well, as I mentioned before, you've got a set-up that's causing trouble:  
your queue implements its own synchronization strategy, and you have this  
requirement that the Stop() method block until the currently executing job  
completes.
You could make the problem a lot easier if you could change either of  
those two things (or better, both).
I'm working on very little sleep at the moment, and I would prefer to not  
try to come up with a fix that addresses all of your goals at the moment.  
It's a complicated problem, or at least more complicated than I feel I  
ought to be trying to work on.  I might only wind up given lots of bad  
advice if I answered the question now.
But, I'll think about it when I've had more rest and try to remember to 
follow-up.  In the meantime, I'll suggest that if you can work out a way  
to directly use the Monitor class with the Wait() and Pulse() methods,  
that's likely to lead to a good solution, since those are meant exactly 
for this kind of thing (the "block the Stop() method" is still going to 
cause trouble, but it's at least a direction for you to look in the  
meantime).
Sorry for not being more helpful at the moment.

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements.

Below I have included a suggested solution, any review of this code
would be much appreicated.

public interface IJob
{
    void Execute();

}

internal class ThreadQueue
{
    private Queue<IJob> queue = new Queue<IJob>();
    private object locker = new object();

    internal IJob Dequeue()
    {
        lock (locker)
        {
            if (queue.Count > 0)
            {
                return queue.Dequeue();
            }
            return null;
        }
    }

    internal void Enqueue(IJob job)
    {
        lock (locker) queue.Enqueue(job);
    }

    internal void Clear()
    {
        lock (locker) queue.Clear();
    }

}

public class ConsumerThread
{
    private Thread thread;
    private bool TerminateThread;
    private EventWaitHandle waitHandle = new AutoResetEvent(false);

    private enum ThreadState { Created, Started, Stopped }
    private volatile ThreadState state = ThreadState.Created;
    private object stateLocker = new object();

    private ThreadQueue queue = new ThreadQueue();

    private List<Timer> timerList = new List<Timer>();

    public void Start()
    {
        lock (stateLocker)
        {
            if (state == ThreadState.Created)
            {
                state = ThreadState.Started;
                thread = new Thread(new ThreadStart(ThreadMain));
                thread.Start();
            }
            else if (state == ThreadState.Stopped)
            {
                state = ThreadState.Started;
                waitHandle.Set();
            }
        }
    }

    public void Stop()
    {
        lock (stateLocker) state = ThreadState.Stopped;
    }

    public void Add(IJob job)
    {
        queue.Enqueue(job);
        lock (stateLocker)
        {
            if (state == ThreadState.Started) waitHandle.Set();
        }
    }

    public void Add(IJob job, int intervalTime)
    {
        System.Threading.Timer timer = new System.Threading.Timer(
                new TimerCallback(InternalAdd), job, intervalTime,
intervalTime);
    }

    private void InternalAdd(object obj)
    {
        lock (stateLocker)
        {
            if (state != ThreadState.Started) return;
        }
        Add(obj as IJob);
    }

    private void ThreadMain()
    {
        while (!TerminateThread)
        {
            IJob job = null;
            lock (stateLocker)
            {
                if (state == ThreadState.Started)
                {
                    job = queue.Dequeue();
                }
            }
            if (job != null)
            {
                job.Execute();
            }
            else
            {
                waitHandle.WaitOne();
            }
        }
    }



}– Skjul sitert tekst –

– Vis sitert tekst –– Skjul sitert tekst –

– Vis sitert tekst –

Just a follow up question. To validate execution across multiple
threads, will System.Diagnostics.Trace.WriteLine("methodName") show
the correct sequence of action viewed in the output window in vs2005?
(AutoFlush is set to true).
 
R

RedLars

[...]
Thought about fixing the deadlock issue with the below code but it
might lead to a deadlock if consumer thread just manages to aquire the
lock before another thread executes queue.Wakeup in Stop(), given the
queue is empty. The queue.Wakeup will not have an affect because the
consumer thread has not reach the blocking call at this point.
private void Stop()
{
  queue.Wakeup();
  lock (threadLocker)  {
    if (state == State.Started)  {
       state = State.Stopped;
     }
   }
}
Do you have any suggestion on how to fix this issue?
Well, as I mentioned before, you've got a set-up that's causing trouble:  
your queue implements its own synchronization strategy, and you have this  
requirement that the Stop() method block until the currently executing job  
completes.
You could make the problem a lot easier if you could change either of  
those two things (or better, both).
I'm working on very little sleep at the moment, and I would prefer to not  
try to come up with a fix that addresses all of your goals at the moment.  
It's a complicated problem, or at least more complicated than I feel I  
ought to be trying to work on.  I might only wind up given lots of bad  
advice if I answered the question now.
But, I'll think about it when I've had more rest and try to remember to 
follow-up.  In the meantime, I'll suggest that if you can work out a way  
to directly use the Monitor class with the Wait() and Pulse() methods,  
that's likely to lead to a good solution, since those are meant exactly 
for this kind of thing (the "block the Stop() method" is still going to 
cause trouble, but it's at least a direction for you to look in the  
meantime).
Sorry for not being more helpful at the moment.

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements.

Below I have included a suggested solution, any review of this code
would be much appreicated.

public interface IJob
{
    void Execute();

}

internal class ThreadQueue
{
    private Queue<IJob> queue = new Queue<IJob>();
    private object locker = new object();

    internal IJob Dequeue()
    {
        lock (locker)
        {
            if (queue.Count > 0)
            {
                return queue.Dequeue();
            }
            return null;
        }
    }

    internal void Enqueue(IJob job)
    {
        lock (locker) queue.Enqueue(job);
    }

    internal void Clear()
    {
        lock (locker) queue.Clear();
    }

}

public class ConsumerThread
{
    private Thread thread;
    private bool TerminateThread;
    private EventWaitHandle waitHandle = new AutoResetEvent(false);

    private enum ThreadState { Created, Started, Stopped }
    private volatile ThreadState state = ThreadState.Created;
    private object stateLocker = new object();

    private ThreadQueue queue = new ThreadQueue();

    private List<Timer> timerList = new List<Timer>();

    public void Start()
    {
        lock (stateLocker)
        {
            if (state == ThreadState.Created)
            {
                state = ThreadState.Started;
                thread = new Thread(new ThreadStart(ThreadMain));
                thread.Start();
            }
            else if (state == ThreadState.Stopped)
            {
                state = ThreadState.Started;
                waitHandle.Set();
            }
        }
    }

    public void Stop()
    {
        lock (stateLocker) state = ThreadState.Stopped;
    }

    public void Add(IJob job)
    {
        queue.Enqueue(job);
        lock (stateLocker)
        {
            if (state == ThreadState.Started) waitHandle.Set();
        }
    }

    public void Add(IJob job, int intervalTime)
    {
        System.Threading.Timer timer = new System.Threading.Timer(
                new TimerCallback(InternalAdd), job, intervalTime,
intervalTime);
    }

    private void InternalAdd(object obj)
    {
        lock (stateLocker)
        {
            if (state != ThreadState.Started) return;
        }
        Add(obj as IJob);
    }

    private void ThreadMain()
    {
        while (!TerminateThread)
        {
            IJob job = null;
            lock (stateLocker)
            {
                if (state == ThreadState.Started)
                {
                    job = queue.Dequeue();
                }
            }
            if (job != null)
            {
                job.Execute();
            }
            else
            {
                waitHandle.WaitOne();
            }
        }
    }

}

There seems to be a race condition present in the code. The following
code has been added since the last post
public void Clear()
{
lock (stateLocker) if (state != ThreadState. Stopped) return;
DisposeTimers();
queue.Clear();
}

private void DisposeTimers()
{
foreach (Timer timer in timerList)
{
timer.Dispose();
}
timerList.Clear();
}

At the start of the application there following code is executed
ConsumerThread consumerThread = new ConsumerThread();
consumerThread.Start();
consumerThread.Add(new SimpleJob(foo), 4000);

Then at some later point during execution the following code is
executed
consumerThread.Stop();
consumerThread.Clear();
consumerThread.Start();

/* several other resources are also stopped */
db.disconnect();
lan.disconnect();

It appears to me that Threading.Timer.Dispose() does not have an
instant affect on the timer. So there are no guarantees that when
DisposeTimers()
is completed no more callbacks are executed, right? So that means the
scheduled task 'foo' might be called after consumerThread.Clear() is
completed.
It might also be called after db.disconnect is executed. One of the
problems is that 'foo' in this case actually relies on the database to
be up-and-running. So the next execution of 'foo' will most probably
throw an object disposed exception or similar. What solution would
you suggest for this case?

Appreciate any help?
 
R

RedLars

[...]
Appreciate any help?

Sorry I've been incommunicado.  There are others who read this newsgroup  
who may notice your posts, who know at least as much about threading as I 
do, and who may have time to review the code.  In the meantime, rest  
assured that I haven't abandonded the thread...just have not been online  
much the past several days, nor had the time to look at your follow-ups.

I promise that when I do have time, I will.

Pete

Alright.

To give a short summary to anyone not bothered reading all the above.
This post is about creating consumer thread (ConsumerThread class)
that handles asynchronous tasks (particular network communication).

At one point during the application I need to be able to stop the
consumer thread from processing tasks. Shortly after stopping the
consumer thread several communication resources will also be close, so
its vital that the stopping of the consumer thread blocks until thread
has stopped executing jobs otherwise a task might try to use a closed
resource.

A task has the following definition:
public interface IJob
{
void Execute();
}

The consumer thread uses this simple queue as a collection of the
tasks.
private class ThreadQueue
{
private Queue<IJob> queue = new Queue<IJob>();
private object locker = new object();
internal IJob Dequeue()
{
lock (locker)
{
if (queue.Count > 0)
{
return queue.Dequeue();
}
return null;
}
}
internal void Enqueue(IJob job)
{
lock (locker) queue.Enqueue(job);
}
internal void Clear()
{
lock (locker) queue.Clear();
}
internal int Count()
{
lock (locker) return queue.Count;
}
}

The following method executes the tasks in the context of a background
thread.
object stateLocker = new object();
ThreadQueue queue = new ThreadQueue();

EventWaitHandle WaitSignal = new AutoResetEvent(false);
EventWaitHandle WaitOn = new AutoResetEvent(false);
enum State { Created, Started, Stopped };

void ThreadMain()
{
while (bThreadStop)
{
IJob job;
lock (stateLocker)
{
if (state == Started)
{
job = queue.Dequeue();
}
}
if (job != null)
{
job.Execute();
}
else
{
WaitHandle.SignalAndWait(WaitSignal , WaitOn);
}
}
}

I'm currently struggling to find a way to signal the loop to stop
executing and wait until that happens. So far I've tried these two
approaches

/* Can lead to inconsistent data */
public void StopExecuting1()
{
lock (stateLocker)
{
State.Stopped;
}
WaitSignal.WaitOne();
}

/* Will lead to deadlock */
public void StopExecuting2()
{
lock (stateLocker)
{
state = State.Stopped;
WaitSignal.WaitOne();
}
}

To add tasks to the class I have the following method
public void Add(IJob job)
{
queue.Enqueue(job);
lock (stateLocker)
{
if (state == Stopped)
{
WaitOn.set();
}
}
}

Any thoughts?
 
R

RedLars

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements. [...]

I left this reply to the last, because it seems that the code you posted  
should work fine (notwithstanding the timer issue, which is new :) ).

Your most recent post implies that you are still hoping for this "block  
until everything's stopped" behavior.  So I'm a little confused about  
which way you're trying to go.  If you still want that behavior, then we  
can look at solutions for that, but I'm going to leave things as they are 
for the moment because I'm still catching up and have plenty of stuff to  
think about already.  :)

Pete

Hi Peter,

I must apologize for the confusion that I have caused. After some
initial testing I realised that I needed a blocking Stop() method to
guarantee that no tasks are processed in the consumer thread after Stop
() has been executed and then not run the risk of using disposed
resource objects.

Below is the current code that I’m testing at the moment. This code
contains some minor changes to variable names since my previous posts,
and I started to use dotnet's queue collection with lock protection.
There are currently three minor issues I have with the current code:
1. Dispose(bool) will cause an exception in the consumer thread when
cleaning up the EventWaitHandlers. Disposing of the EventWaitHandlers
is executed before the consumer thread manages to exit the loop in
normal manner (via setting TerminatedWorkerThread etc).
2. In the background thread, using ‘continue’ to ‘escape’ the
resStartWaitHandle is bit iffy.
3. If a job processed by the consumer‘s background thread tries to
executed ThreadQueue.Stop() this will lead to a deadlock. I’m not sure
it is the responsibility of ThreadQueue to handle such circumstance or
the developer using the class.

Here is the code.

namespace na
{
public class ThreadQueue : IDisposable
{
private enum WorkerThreadState { Created, Started, Stopped };
private volatile WorkerThreadState state;
private object stateLocker = new object();
private Queue<IJob> queue = new Queue<IJob>();
private object queueLocker = new object();
private EventWaitHandle waitHandle = new ManualResetEvent
(false);
private EventWaitHandle stoppingWaitHandle = new
ManualResetEvent(false);
private EventWaitHandle reStartWaitHandle = new AutoResetEvent
(false);
private Thread thread;
private volatile bool TerminatedWorkerThread;
private List<Timer> timerList = new List<Timer>();
private string threadName;
private bool disposed;

public ThreadQueue () : this("ThreadQueue") { }

public ThreadQueue (string threadName)
{
this.threadName = threadName;
state = WorkerThreadState.Created;
}

~ThreadQueue () { Dispose(false); }

public void Start()
{
lock (stateLocker)
{
if (state == WorkerThreadState.Created)
{
state = WorkerThreadState.Started;
thread = new Thread(new ThreadStart
(this.Thread_Main));
thread.Name = threadName;
thread.Start();
}
else if (state == WorkerThreadState.Stopped)
{
state = WorkerThreadState.Started;
reStartWaitHandle.Set();
}
}
}

public void Stop()
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
state = WorkerThreadState.Stopped;
}
waitHandle.Set();
stoppingWaitHandle.WaitOne();
}

public void Reset()
{
lock (stateLocker) if (state != WorkerThreadState.Stopped)
throw new WorkThreadException("Reseting workThread before
stopping."); ;
DisposeTimers();
lock (queueLocker)
{
queue.Clear();
}
}

public void Add(IJob job)
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
}
lock (queueLocker)
{
queue.Enqueue(job);
}
lock (stateLocker)
{
if (state == WorkerThreadState.Started) waitHandle.Set
();
}
}

public void AddPeriodic(IJob job, int intervalTime)
{
Timer timer = new Timer(new TimerCallback(InternalAdd),
job, intervalTime, intervalTime);
lock (timerList)
{
timerList.Add(timer);
}
}

private void DisposeTimers()
{
lock (timerList)
{
foreach (Timer timer in timerList)
{
timer.Dispose();
}
timerList.Clear();
}
}

private void Thread_Main()
{
while (!TerminatedWorkerThread)
{
IJob job = null;
lock (stateLocker)
{
if (state == WorkerThreadState.Started)
{
lock (queueLocker)
{
if (queue.Count > 0)
job = queue.Dequeue();

if (queue.Count == 0)
waitHandle.Reset();
}
}
}
if (job != null)
{
job. Execute();
}
waitHandle.WaitOne();

lock (stateLocker)
{
if (state == WorkerThreadState.Stopped)
{
stoppingWaitHandle.Set();
}
else
{
continue;
}
}
reStartWaitHandle.WaitOne();
}
}

/* Timer callback */
private void InternalAdd(object obj)
{
lock (stateLocker)
{
if (state == WorkerThreadState.Stopped) return;
}
BeginAddWork(obj as IJob);
}

public void Dispose()
{
Stop();
Dispose(true);
GC.SuppressFinalize(true);
}

protected virtual void Dispose(bool disposeManagedResources)
{
if (!(disposed))
{
if (disposeManagedResources)
{
/* In stopped state at this point - now exit
background thread loop */
this.TerminatedWorkerThread = true;
reStartWaitHandle.Set();

/* Release managed resources */
DisposeTimers();
waitHandle.Close();
stoppingWaitHandle.Close();
reStartWaitHandle.Close();
}
}
this.disposed = true;
}
}
}
 
R

RedLars

After some consideration I agree that the current queue implementation
and blocking stop is causing me problem and have decided to remove
both requirements. [...]
I left this reply to the last, because it seems that the code you posted  
should work fine (notwithstanding the timer issue, which is new :) ).
Your most recent post implies that you are still hoping for this "block 
until everything's stopped" behavior.  So I'm a little confused about 
which way you're trying to go.  If you still want that behavior, thenwe  
can look at solutions for that, but I'm going to leave things as they are  
for the moment because I'm still catching up and have plenty of stuff to  
think about already.  :)

Hi Peter,

I must apologize for the confusion that I have caused. After some
initial testing I realised that I needed a blocking Stop() method to
guarantee that no tasks are processed in the consumer thread after Stop
() has been executed and then not run the risk of using disposed
resource objects.

Below is the current code that I’m testing at the moment. This code
contains some minor changes to variable names since my previous posts,
and I started to use dotnet's queue collection with lock protection.
There are currently three minor issues I have with the current code:
1. Dispose(bool) will cause an exception in the consumer thread when
cleaning up the EventWaitHandlers. Disposing of the EventWaitHandlers
is executed before the consumer thread manages to exit the loop in
normal manner (via setting TerminatedWorkerThread etc).
2. In the background thread, using ‘continue’ to ‘escape’ the
resStartWaitHandle is bit iffy.
3. If a job processed by the consumer‘s background thread tries to
executed ThreadQueue.Stop() this will lead to a deadlock. I’m not sure
it is the responsibility of ThreadQueue to handle such circumstance or
the developer using the class.

Here is the code.

namespace na
{
    public class ThreadQueue : IDisposable
    {
        private enum WorkerThreadState { Created, Started, Stopped };
        private volatile WorkerThreadState state;
        private object stateLocker = new object();
        private Queue<IJob> queue = new Queue<IJob>();
        private object queueLocker = new object();
        private EventWaitHandle waitHandle = new ManualResetEvent
(false);
        private EventWaitHandle stoppingWaitHandle = new
ManualResetEvent(false);
        private EventWaitHandle reStartWaitHandle = new AutoResetEvent
(false);
        private Thread thread;
        private volatile bool TerminatedWorkerThread;
        private List<Timer> timerList = new List<Timer>();
        private string threadName;
        private bool disposed;

        public ThreadQueue () : this("ThreadQueue") { }

        public ThreadQueue (string threadName)
        {
            this.threadName = threadName;
            state = WorkerThreadState.Created;
        }

        ~ThreadQueue () { Dispose(false); }

        public void Start()
        {
            lock (stateLocker)
            {
                if (state == WorkerThreadState.Created)
                {
                    state = WorkerThreadState.Started;
                    thread = new Thread(new ThreadStart
(this.Thread_Main));
                    thread.Name = threadName;
                    thread.Start();
                }
                else if (state == WorkerThreadState.Stopped)
                {
                    state = WorkerThreadState.Started;
                    reStartWaitHandle.Set();
                }
            }
        }

        public void Stop()
        {
            lock (stateLocker)
            {
                if (state == WorkerThreadState.Stopped) return;
                state = WorkerThreadState.Stopped;
            }
            waitHandle.Set();
            stoppingWaitHandle.WaitOne();
        }

        public void Reset()
        {
            lock (stateLocker) if (state != WorkerThreadState.Stopped)
throw new WorkThreadException("Reseting workThread before
stopping."); ;
            DisposeTimers();
            lock (queueLocker)
            {
                queue.Clear();
            }
        }

        public void Add(IJob job)
        {
            lock (stateLocker)
            {
                if (state == WorkerThreadState.Stopped) return;
            }
            lock (queueLocker)
            {
                queue.Enqueue(job);
            }
            lock (stateLocker)
            {
                if (state == WorkerThreadState.Started) waitHandle.Set
();
            }
        }

        public void AddPeriodic(IJob job, int intervalTime)
        {
            Timer timer = new Timer(new TimerCallback(InternalAdd),
job, intervalTime, intervalTime);
            lock (timerList)
            {
                timerList.Add(timer);
            }
        }

        private void DisposeTimers()
        {
            lock (timerList)
            {
                foreach (Timer timer in timerList)
                {
                    timer.Dispose();
                }
                timerList.Clear();
            }
        }

        private void Thread_Main()
        {
            while (!TerminatedWorkerThread)
            {
                IJob job = null;
                lock (stateLocker)
                {
                    if (state == WorkerThreadState.Started)
                    {
                        lock (queueLocker)
                        {
                            if (queue.Count >0)
                                job = queue.Dequeue();

                            if (queue.Count == 0)
                                waitHandle.Reset();
                        }
                    }
                }
                if (job != null)
                {
                    job. Execute();
                }
                waitHandle.WaitOne();

                lock (stateLocker)
                {
                    if (state == WorkerThreadState.Stopped)
                    {
                        stoppingWaitHandle.Set();
                    }
                    else
                    {
                        continue;
                    }
                }
                reStartWaitHandle.WaitOne();
            }
        }

                /* Timer callback */
        private void InternalAdd(object obj)
        {
            lock (stateLocker)
            {
                if (state == WorkerThreadState.Stopped) return;
            }
            BeginAddWork(obj as IJob);
        }

        public void Dispose()
        {
            Stop();
            Dispose(true);
            GC.SuppressFinalize(true);
        }

        protected virtual void Dispose(bool disposeManagedResources)
        {
            if (!(disposed))
            {
                if (disposeManagedResources)
                {
                    /* In stopped state at this point- now exit
background thread loop */
                    this.TerminatedWorkerThread = true;
                    reStartWaitHandle.Set();

                    /* Release managed resources */
                    DisposeTimers();
                    waitHandle.Close();
                    stoppingWaitHandle.Close();
                    reStartWaitHandle.Close();
                }
            }
            this.disposed = true;
        }
    }

}

On small mistake in the previous post:

public void Start()
{
lock (stateLocker)
{
if (state == WorkerThreadState.Created)
{
state = WorkerThreadState.Running;
thread = new Thread(new ThreadStart(this.Thread_Main));
thread.Name = threadName;
thread.Start();
}
else if (state == WorkerThreadState.Stopped)
{
state = WorkerThreadState.Running;
stoppingWaitHandle.Reset(); // Added this line!!!
reStartWaitHandle.Set();
}
}
}
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Top