Joining threads controlled by a Semaphore?

D

DC

Hi,

to use a specified amount of threads to carry out a certain step I
typically use Semaphores. I then have to be able to determine when all
the threads have done their work.

I have attached a code snipped of how I do this. I would be happy if
someone could review this, since I lately had some problems possibly
caused by this approach. It seems that sometimes the synchronization
point will never be reached, although I am quiet confident that all
threads were able to do what they were supposed to do and release the
Semaphore.

I am especially worried about this code:

// Is this the way to wait until all threads have finished?
for (int k = 0; k < maxthreads; k++)
workSemaphore.WaitOne();

Looks clumsy to me, but it was the only simple way I found to join the
threads.

TIA for any comments!

Regards
DC



using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadingTest
{
class Program
{
static void Main(string[] args)
{
int maxthreads = 100;

Semaphore workSemaphore = new Semaphore(maxthreads, maxthreads);

for (int i = 0; i < 1000; i++)
{
ThreadData td = new ThreadData();
td.DataSemaphore = workSemaphore;
td.IntData = i;

Thread t = new Thread(new ParameterizedThreadStart(DoWork));
t.Start(td);
}

// Is this the way to wait until all threads have finished?
for (int k = 0; k < maxthreads; k++)
workSemaphore.WaitOne();

Console.WriteLine("Done.");
Console.ReadLine();
}

public static void DoWork(object data)
{
try
{
((ThreadData)data).DataSemaphore.WaitOne();
Console.Write("{0, 3}-", ((ThreadData)data).IntData);

// wait 0 - 2000 ms
Random ran = new Random();
Thread.Sleep((int)ran.NextDouble() * 2000);
}
finally
{
((ThreadData)data).DataSemaphore.Release();
}
}

public class ThreadData
{
public Semaphore DataSemaphore{ get; set; }
public int IntData { get; set; }
}
}
}
 
S

Stefan L

Hi DC,

when analyzing your code it looks like it will always pass the
joining-point, but there's no guarantee that all threads will finish
until then (thus, it's not really joining...). This is because your loop
is acquiring all semaphores and terminates when it's done, never
releasing them again. If it succeeds doing this before all threads are
done: rienne va plus... Main holds all semaphores, threads are blocked
infinitely.

What exactly are you trying to reach? You are creating 1000(!!) threads
at a time and try to block 900 of them?!?

Maybe you should use WaitHandle's and call WaitHandle.WaitAll(). There'a
also a Thread.Join method that might help you here.

HTH,
Stefan
 
D

DC

Hi DC,

when analyzing your code it looks like it will always pass the
joining-point, but there's no guarantee that all threads will finish
until then (thus, it's not really joining...). This is because your loop
is acquiring all semaphores and terminates when it's done, never
releasing them again. If it succeeds doing this before all threads are
done: rienne va plus... Main holds all semaphores, threads are blocked
infinitely.

What exactly are you trying to reach? You are creating 1000(!!) threads
at a time and try to block 900 of them?!?

Maybe you should use WaitHandle's and call WaitHandle.WaitAll(). There'a
also a Thread.Join method that might help you here.

HTH,
    Stefan

DC schrieb:


to use a specified amount of threads to carry out a certain step I
typically use Semaphores. I then have to be able to determine when all
the threads have done their work.
I have attached a code snipped of how I do this. I would be happy if
someone could review this, since I lately had some problems possibly
caused by this approach. It seems that sometimes the synchronization
point will never be reached, although I am quiet confident that all
threads were able to do what they were supposed to do and release the
Semaphore.
I am especially worried about this code:
                   // Is this the way to wait untilall threads have finished?
                   for (int k = 0; k < maxthreads; k++)
                           workSemaphore.WaitOne();
Looks clumsy to me, but it was the only simple way I found to join the
threads.
TIA for any comments!

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadingTest
{
   class Program
   {
           static void Main(string[] args)
           {
                   int maxthreads = 100;
                   Semaphore workSemaphore = new Semaphore(maxthreads, maxthreads);
                   for (int i = 0; i < 1000; i++)
                   {
                           ThreadData td = new ThreadData();
                           td.DataSemaphore= workSemaphore;
                           td.IntData = i;
                           Thread t = newThread(new ParameterizedThreadStart(DoWork));
                           t.Start(td);
                   }
                   // Is this the way to wait untilall threads have finished?
                   for (int k = 0; k < maxthreads; k++)
                           workSemaphore.WaitOne();
                   Console.WriteLine("Done.");
                   Console.ReadLine();
           }
           public static void DoWork(object data)
           {
                   try
                   {
                           ((ThreadData)data).DataSemaphore.WaitOne();
                           Console.Write("{0, 3}-", ((ThreadData)data).IntData);
                           // wait 0 - 2000ms
                           Random ran = new Random();
                           Thread.Sleep((int)ran.NextDouble() * 2000);
                   }
                   finally
                   {
                           ((ThreadData)data).DataSemaphore.Release();
                   }
           }
           public class ThreadData
           {
                   public Semaphore DataSemaphore{ get; set; }
                   public int IntData { get; set; }
           }
   }
}- Zitierten Text ausblenden -

- Zitierten Text anzeigen -

Thanks, Stefan. There is a glitch in the example (which is not in my
production code), the semaphore must be acquired before the thread is
being spawned. Corrected example below.

I am trying to execute 1000 tasks by max. 100 threads in this example.

I don't see why the joining point should be passed, since at the
joining point a semaphore will be acquired for every worker that
releases one semaphore. Joining will not start before all work has
been delegated to threads.

Regards
DC

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadingTest
{
class Program
{
static void Main(string[] args)
{
int maxthreads = 100;

Semaphore workSemaphore = new Semaphore(maxthreads, maxthreads);

for (int i = 0; i < 1000; i++)
{
workSemaphore.WaitOne();

ThreadData td = new ThreadData();
td.DataSemaphore = workSemaphore;
td.IntData = i;

Thread t = new Thread(new ParameterizedThreadStart(DoWork));
t.Start(td);
}

// Is this the way to wait until all threads have finished?
for (int k = 0; k < maxthreads; k++)
workSemaphore.WaitOne();

Console.WriteLine("Done.");
Console.ReadLine();
}

public static void DoWork(object data)
{
try
{
Console.Write("{0, 3}-", ((ThreadData)data).IntData);

// wait 0 - 2000 ms
Random ran = new Random();
Thread.Sleep((int)ran.NextDouble() * 2000);
}
finally
{
((ThreadData)data).DataSemaphore.Release();
}
}

public class ThreadData
{
public Semaphore DataSemaphore{ get; set; }
public int IntData { get; set; }
}
}
}
 
S

Stefan L

Hi DC,

if you acquire the semaphore before you start the thread all is fine,
then your code should work. At least I see no reason what should go wrong.

In what way do you experience problems in your code? As it seems that
you did create the example pretty much out of your head, maybe you
already corrected some mistakes that you have in your production code...

I ran your example and couldn't detect any problems (except the fact
that Thread.Sleep((int)ran.NextDouble() * 2000) equals Thread.Sleep(0)
which makes me believe you did not run it...).

Regards,
Stefan
 
D

DC

Hi DC,

if you acquire the semaphore before you start the thread all is fine,
then your code should work. At least I see no reason what should go wrong..

In what way do you experience problems in your code? As it seems that
you did create the example pretty much out of your head, maybe you
already corrected some mistakes that you have in your production code...

I ran your example and couldn't detect any problems (except the fact
that Thread.Sleep((int)ran.NextDouble() * 2000) equals Thread.Sleep(0)
which makes me believe you did not run it...).

Regards,
     Stefan

DC schrieb:




Thanks, Stefan. There is a glitch in the example (which is not in my
production code), the semaphore must be acquired before the thread is
being spawned. Corrected example below.
I am trying to execute 1000 tasks by max. 100 threads in this example.
I don't see why the joining point should be passed, since at the
joining point a semaphore will be acquired for every worker that
releases one semaphore. Joining will not start before all work has
been delegated to threads.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace ThreadingTest
{
   class Program
   {
           static void Main(string[] args)
           {
                   int maxthreads = 100;
                   Semaphore workSemaphore = new Semaphore(maxthreads, maxthreads);
                   for (int i = 0; i < 1000; i++)
                   {
                           workSemaphore.WaitOne();
                           ThreadData td = new ThreadData();
                           td.DataSemaphore= workSemaphore;
                           td.IntData = i;
                           Thread t = newThread(new ParameterizedThreadStart(DoWork));
                           t.Start(td);
                   }
                   // Is this the way to wait untilall threads have finished?
                   for (int k = 0; k < maxthreads; k++)
                           workSemaphore.WaitOne();
                   Console.WriteLine("Done.");
                   Console.ReadLine();
           }
           public static void DoWork(object data)
           {
                   try
                   {
                           Console.Write("{0, 3}-", ((ThreadData)data).IntData);
                           // wait 0 - 2000ms
                           Random ran = new Random();
                           Thread.Sleep((int)ran.NextDouble() * 2000);
                   }
                   finally
                   {
                           ((ThreadData)data).DataSemaphore.Release();
                   }
           }
           public class ThreadData
           {
                   public Semaphore DataSemaphore{ get; set; }
                   public int IntData { get; set; }
           }
   }
}- Zitierten Text ausblenden -

- Zitierten Text anzeigen -

Hi Stefan,

thank you for looking at this. Even worse, I actually did run the
example but did not notice that of course the sleep is being casted
away.

The problem is, that sometimes all threads seem to get their jobs done
but still the main thread blocks. But I am not sure if the whole
semaphore thing causes the problem.

Regards
Tim
 
D

DC

Your semaphore allows for 100 concurrent holders.  That means that once 
there are only 99 threads left to run, your main thread will be able to  
acquire the semaphore and proceed.

Basically, the answer is "no, a semaphore is not the appropriate mechanism  
for this purpose".

One trivial way to implement the behavior you're looking for is to only  
ever create 100 threads, and then have each thread pull work from a queue 
(protecting the queue with a lock, of course).  Each thread can exit once  
the queue is empty.  That's much more efficient than creating 1000  
different threads anyway.

Then you can use Thread.Join() on all 100 threads in a loop (similar to  
your WaitOne() loop, but using Thread.Join() instead) to make the  
controller thread block until they are all done.

Pete

Thanks, Pete. I can not use Thread.Join(), since the threads are
actually being pulled of the Threadpool.

I also got the impression that I am misusing semaphores but since it
worked (most of the time) I did not worry so much until now.

I do not really understand what you mean by this:
Your semaphore allows for 100 concurrent holders. That means that once
there are only 99 threads left to run, your main thread will be able to
acquire the semaphore and proceed.

The main thread must acquire all of the semaphores (100) until it
continues.

Regards,
DC
 
D

DC

[...]
Your semaphore allows for 100 concurrent holders.  That means that once
there are only 99 threads left to run, your main thread will be able to
acquire the semaphore and proceed.
The main thread must acquire all of the semaphores (100) until it
continues.

Sorry, you're right.  I wasn't looking closely enough at how you were  
using the semaphore.

At this point, I'm in agreement with Stefan's replies.  Until you can be  
more specific about what the code is doing that you don't expect, I don't 
see a way to answer the question, whatever it is.

Pete

Thanks, Pete. Actually it is already good information for me that
there is nothing obviously wrong with my code, so I can tackle the
problem in other areas. If the code is OK, then this is an easy to use
thread pump. I have enclosed the corrected test program which also
uses ThreadPool to match my application more closely.

Regards
DC



using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadingTest
{
class Program
{
static void Main(string[] args)
{
int maxthreads = 100;

Semaphore workSemaphore = new Semaphore(maxthreads, maxthreads);

int minwt, mincpt;
ThreadPool.GetMinThreads(out minwt, out mincpt);
ThreadPool.SetMinThreads(maxthreads, mincpt);

for (int i = 0; i < 1000; i++)
{
workSemaphore.WaitOne();

ThreadData td = new ThreadData();
td.DataSemaphore = workSemaphore;
td.IntData = i;
ThreadPool.QueueUserWorkItem(new WaitCallback(DoWork), td);
}

// Is this the way to wait until all threads have finished?
for (int k = 0; k < maxthreads; k++)
workSemaphore.WaitOne();

Console.WriteLine("Done.");
Console.ReadKey();
}

public static void DoWork(object data)
{
try
{
// wait 0 - 2000 ms, simulate work
Random ran = new Random();
Thread.Sleep((int)(ran.NextDouble() * 2000.0));

Console.Write("{0, 3}-", ((ThreadData)data).IntData);
}
finally
{
((ThreadData)data).DataSemaphore.Release();
}
}

public class ThreadData
{
public Semaphore DataSemaphore{ get; set; }
public int IntData { get; set; }
}
}
}
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Top