Help, THREADS problem (producers and consumers) !

L

Leonardo Hyppolito

Hello!

I am trying to implement a program that uses Threads. I chose the producers
and consumers scenario. The producers put a "product" (which is an int
number) in a shared storage place of one position. The consumers must
consume all the products that are put in the storage place.

The user of the program enters how many products it will be produced, and
also, how many producers and consumers (each one in a separate thread) there
will be.

My program is not working ok... When I choose a high number of producers
(let's say 5), and a low number of consumers (let's say 2), it doesn't work
well.. I can't spot where the problem is.

Here is the code... Thanks in advance for any help !

Leo Hyppolito

---

using System;
using System.Threading;
namespace TesteProdCons
{
// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// represents the product (a delay in miliseconds)
// -1 is "empty"
private int p;
// actual product number
private int n;

// number of products (max) to be produced
private int nMax;
public int Product
{
get
{
lock(this)
{
// if it's empty, wait
if(p == -1)
{
Console.WriteLine("CONSUMER [" + Thread.CurrentThread.Name + "] has to
wait (storage is empty)");
Monitor.Wait(this);
}
// read the product
int ret = p;

// empty the buffer (so that it can receive a new product)
p = -1;
// signal the next pending thread to move on
Monitor.Pulse(this);

// return the product value
return ret;
}
}
set
{
lock(this)
{
// if it has a product, wait
if(p != -1)
{
Console.WriteLine("PRODUCER [" + Thread.CurrentThread.Name + "] has to
wait (storage is full)");
Monitor.Wait(this);
}
// store the product
p = value;
// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n."
+ Number + ": " + p);

// increment the actual product number
Number = Number + 1;
// signal the next pending thread to move on
Monitor.Pulse(this);
}
}
}
public int Number
{
get {return n;}
set {n = value;}
}
public int MaxNumber
{
get
{
lock(this)
{
return nMax;
}
}
set {nMax = value;}
}
// constructor
public Storage(int i)
{
p = -1;
Number = 1;
MaxNumber = i;
}
}
public class Producer
{
// a reference to the storage
private Storage a;
// seed for the random
private int seed;
// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);

// store the product
a.Product = prod;

// rest for a while...
Thread.Sleep(500);
}
}
}
public class Consumer
{
// a reference to the storage
Storage a;
// constructor
public Consumer(Storage arm)
{
a = arm;
}
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;
// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);

Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);
}
}
}
class Class1
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Storage a = new Storage(numProducts);
Producer[] prod = new Producer[numProducers];
Consumer[] cons = new Consumer[numConsumers];
Thread[] tProd = new Thread[numProducers];
Thread[] tCons = new Thread[numConsumers];
Random r = new Random();
// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod = new Producer(a, r.Next(999999));
tProd = new Thread(new ThreadStart(prod.produce));
tProd.Name = "p" + (i+1);
}
// create the consumers objects and threads
for(int i = 0; i < numConsumers; i++)
{
cons = new Consumer(a);
tCons = new Thread(new ThreadStart(cons.consume));
tCons.Name = "c" + (i+1);
}

// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}

Console.WriteLine("Storage, producers and consumers created!");
}
}
}
 
L

Leonardo Hyppolito

I forgot to include the program output:

The problem is:
it is producing OK, mas the consumers are NOT consuming all the products.

Here is the output:

---
Max number of products: 10
Number of producers: 3
Numero of consumers: 1
Storage, producers and consumers created!
++ [p1] produces n.1: 1576
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
++ [p2] produces n.2: 16
++ [p3] produces n.3: 1872
PRODUCER [p1] has to wait (storage is full)
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
-- [c1] consumed 1576
++ [p1] produces n.4: 1769
++ [p2] produces n.5: 687
++ [p3] produces n.6: 1117
PRODUCER [p1] has to wait (storage is full)
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
-- [c1] consumed 1872
++ [p1] produces n.7: 539
++ [p2] produces n.8: 515
++ [p3] produces n.9: 990
PRODUCER [p1] has to wait (storage is full)
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
-- [c1] consumed 1117
++ [p1] produces n.10: 1662
++ [p2] produces n.11: 288
++ [p3] produces n.12: 1049
-- [c1] consumed 990
-- [c1] consumed 1049
CONSUMER [c1] has to wait (storage is empty)



Thanks
 
G

Gianluca Varenni

There is a bug in the synchronization mechanism you are using.
Suppose 3 producers (pA pB pC ) try to set the value of Storage.Product, and
cD is a consumer:
1. pA enters the locked section, fills the slot in Storage, and exits
2. pB enters, the slot is full, so it blocks on the Wait call (the lock is
released)
3. pC enters, the slot is full, so it blocks on the Wait call (the lock is
released)
4. cD consumes the slot, pulses, so the next thread ready to execute will be
pB
5. pB continues its execution, fills the slot and pulses the monitor, so the
next thread ready to execute is pC
6. pC continues its execution, and fill the slot, BUT THE SLOT WAS FULL!!!!

The actual problem is that you are using the same synchronization object,
for marking the storage as free or full. You must use two entities.

In order to prove what I've said, add the marked line to your code, and see
what happens...

Console.WriteLine("PRODUCER [" + Thread.CurrentThread.Name + "] has to
wait (storage is full)");
Monitor.Wait(this);
}

--> if (p != -1) Console.WriteLine("You are in trouble!");
// store the product
p = value;


Have a nice day
GV
 
L

Leonardo Hyppolito

Gianluca, thanks a lot for you answer! I could understand where the problem
is.

I tried to fix it, using distinct objects for synchornization. Here is
the new code:

---
using System;

using System.Threading;

namespace TesteProdCons

{

// can hold 1 product at a time (buffer of 1 position)

public class Storage

{

private object syncProd;

private object syncCons;

// represents the product (a delay in miliseconds)

// -1 is "empty"

private int p;

// actual product number

private int n;


// number of products (max) to be produced

private int nMax;

public int Product

{

get

{

lock(syncCons)

{

// if it's empty, wait

if(p == -1)

{

Console.WriteLine("CONSUMER [" + Thread.CurrentThread.Name + "] has to wait
(storage is empty)");

Monitor.Wait(syncCons);

}

// read the product

int ret = p;


// empty the buffer (so that it can receive a new product)

p = -1;

// signal the next pending thread to move on

Monitor.Pulse(syncCons);


// return the product value

return ret;

}

}

set

{

lock(syncProd)

{

// if it has a product, wait

if(p != -1)

{

Console.WriteLine("PRODUCER [" + Thread.CurrentThread.Name + "] has to wait
(storage is full)");

Monitor.Wait(syncProd);

}

// store the product

p = value;

// write information on screen

Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + p);


// increment the actual product number

Number = Number + 1;

// signal the next pending thread to move on

Monitor.Pulse(syncProd);

}

}

}

public int Number

{

get {return n;}

set

{

lock(this)

{

n = value;

}

}

}

public int MaxNumber

{

get

{

lock(this)

{

return nMax;

}

}

set {nMax = value;}

}

// constructor

public Storage(int i)

{

p = -1;

Number = 1;

MaxNumber = i;

syncProd = new object();

syncCons = new object();

}

}

public class Producer

{

// a reference to the storage

private Storage a;

// seed for the random

private int seed;

// constructor

public Producer(Storage arm, int s)

{

a = arm;

seed = s;

}

public void produce()

{

Random r = new Random(seed);

while(a.Number <= a.MaxNumber)

{

// get a random integer (from 0 to 2 seconds)

int prod = r.Next(2001);


// store the product

a.Product = prod;


// rest for a while...

Thread.Sleep(500);

}

}

}

public class Consumer

{

// a reference to the storage

Storage a;

// constructor

public Consumer(Storage arm)

{

a = arm;

}

public void consume()

{

while(true)

{

// get the product value

int prod = a.Product;

// "consume" the product (pauses for the product's miliseconds)

Thread.Sleep(prod);


Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);

}

}

}

class Class1

{

static void Main(string[] args)

{

int numProducts;

int numProducers;

int numConsumers;

Console.Write("Max number of products: ");

numProducts = Int32.Parse(Console.ReadLine());

Console.Write("Number of producers: ");

numProducers = Int32.Parse(Console.ReadLine());

Console.Write("Numero of consumers: ");

numConsumers = Int32.Parse(Console.ReadLine());

Storage a = new Storage(numProducts);

Producer[] prod = new Producer[numProducers];

Consumer[] cons = new Consumer[numConsumers];

Thread[] tProd = new Thread[numProducers];

Thread[] tCons = new Thread[numConsumers];

Random r = new Random();

// create the producers objects and threads

for(int i = 0; i < numProducers; i++)

{

prod = new Producer(a, r.Next(999999));

tProd = new Thread(new ThreadStart(prod.produce));

tProd.Name = "p" + (i+1);

}

// create the consumers objects and threads

for(int i = 0; i < numConsumers; i++)

{

cons = new Consumer(a);

tCons = new Thread(new ThreadStart(cons.consume));

tCons.Name = "c" + (i+1);

}


// start the threads

foreach (Thread t in tProd) {t.Start();}

foreach (Thread t in tCons) {t.Start();}


Console.WriteLine("Storage, producers and consumers created!");

}

}

}

---



However, it didn't work..... Here's the output:

Max number of products: 10
Number of producers: 3
Numero of consumers: 1
Storage, producers and consumers created!
++ [p1] produces n.1: 924
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
++ [p1] produces n.2: 1169
++ [p2] produces n.3: 1503
++ [p3] produces n.4: 1483
-- [c1] consumed 924
++ [p2] produces n.5: 77
PRODUCER [p3] has to wait (storage is full)
PRODUCER [p1] has to wait (storage is full)
PRODUCER [p2] has to wait (storage is full)
-- [c1] consumed 1483
-- [c1] consumed 77
CONSUMER [c1] has to wait (storage is empty)



---



Any idea on what's wrong?

Thanks again,



Leonardo Hyppolito
 
G

Gianluca Varenni

Here is some code that works, by using two autoreset events (in practice
they act as two 1 slot semaphores).

Have a nice day
GV


using System;
using System.Threading;
namespace TesteProdCons
{
// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
private AutoResetEvent FilledEvent = new AutoResetEvent(false);
private AutoResetEvent VoidEvent = new AutoResetEvent(true);

private int p;

// actual product number
private int n;

// number of products (max) to be produced
private int nMax;
public int Product
{
get
{
FilledEvent.WaitOne();

int ret = p;

VoidEvent.Set();

return ret;
}
set
{
VoidEvent.WaitOne();

p = value;

Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + p);
Number = Number + 1;

FilledEvent.Set();
}


}
public int Number
{
get {return n;}
set {n = value;}
}
public int MaxNumber
{
get
{
// lock(this)
// {
return nMax;
// }
}
set {nMax = value;}
}
// constructor
public Storage(int i)
{
Number = 1;
MaxNumber = i;
}
}
public class Producer
{
// a reference to the storage
private Storage a;
// seed for the random
private int seed;
// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);

// store the product
a.Product = prod;

// rest for a while...
Thread.Sleep(500);
}
}
}
public class Consumer
{
// a reference to the storage
Storage a;
// constructor
public Consumer(Storage arm)
{
a = arm;
}
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;
// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);

Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);
}
}
}
class Class1
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Storage a = new Storage(numProducts);
Producer[] prod = new Producer[numProducers];
Consumer[] cons = new Consumer[numConsumers];
Thread[] tProd = new Thread[numProducers];
Thread[] tCons = new Thread[numConsumers];
Random r = new Random();
// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod = new Producer(a, r.Next(999999));
tProd = new Thread(new ThreadStart(prod.produce));
tProd.Name = "p" + (i+1);
}
// create the consumers objects and threads
for(int i = 0; i < numConsumers; i++)
{
cons = new Consumer(a);
tCons = new Thread(new ThreadStart(cons.consume));
tCons.Name = "c" + (i+1);
}

// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}

Console.WriteLine("Storage, producers and consumers created!");
}
}
}



Leonardo Hyppolito said:
Gianluca, thanks a lot for you answer! I could understand where the problem
is.

I tried to fix it, using distinct objects for synchornization. Here is
the new code:

---
using System;

using System.Threading;

namespace TesteProdCons

{

// can hold 1 product at a time (buffer of 1 position)

public class Storage

{

private object syncProd;

private object syncCons;

// represents the product (a delay in miliseconds)

// -1 is "empty"

private int p;

// actual product number

private int n;


// number of products (max) to be produced

private int nMax;

public int Product

{

get

{

lock(syncCons)

{

// if it's empty, wait

if(p == -1)

{

Console.WriteLine("CONSUMER [" + Thread.CurrentThread.Name + "] has to wait
(storage is empty)");

Monitor.Wait(syncCons);

}

// read the product

int ret = p;


// empty the buffer (so that it can receive a new product)

p = -1;

// signal the next pending thread to move on

Monitor.Pulse(syncCons);


// return the product value

return ret;

}

}

set

{

lock(syncProd)

{

// if it has a product, wait

if(p != -1)

{

Console.WriteLine("PRODUCER [" + Thread.CurrentThread.Name + "] has to wait
(storage is full)");

Monitor.Wait(syncProd);

}

// store the product

p = value;

// write information on screen

Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + p);


// increment the actual product number

Number = Number + 1;

// signal the next pending thread to move on

Monitor.Pulse(syncProd);

}

}

}

public int Number

{

get {return n;}

set

{

lock(this)

{

n = value;

}

}

}

public int MaxNumber

{

get

{

lock(this)

{

return nMax;

}

}

set {nMax = value;}

}

// constructor

public Storage(int i)

{

p = -1;

Number = 1;

MaxNumber = i;

syncProd = new object();

syncCons = new object();

}

}

public class Producer

{

// a reference to the storage

private Storage a;

// seed for the random

private int seed;

// constructor

public Producer(Storage arm, int s)

{

a = arm;

seed = s;

}

public void produce()

{

Random r = new Random(seed);

while(a.Number <= a.MaxNumber)

{

// get a random integer (from 0 to 2 seconds)

int prod = r.Next(2001);


// store the product

a.Product = prod;


// rest for a while...

Thread.Sleep(500);

}

}

}

public class Consumer

{

// a reference to the storage

Storage a;

// constructor

public Consumer(Storage arm)

{

a = arm;

}

public void consume()

{

while(true)

{

// get the product value

int prod = a.Product;

// "consume" the product (pauses for the product's miliseconds)

Thread.Sleep(prod);


Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);

}

}

}

class Class1

{

static void Main(string[] args)

{

int numProducts;

int numProducers;

int numConsumers;

Console.Write("Max number of products: ");

numProducts = Int32.Parse(Console.ReadLine());

Console.Write("Number of producers: ");

numProducers = Int32.Parse(Console.ReadLine());

Console.Write("Numero of consumers: ");

numConsumers = Int32.Parse(Console.ReadLine());

Storage a = new Storage(numProducts);

Producer[] prod = new Producer[numProducers];

Consumer[] cons = new Consumer[numConsumers];

Thread[] tProd = new Thread[numProducers];

Thread[] tCons = new Thread[numConsumers];

Random r = new Random();

// create the producers objects and threads

for(int i = 0; i < numProducers; i++)

{

prod = new Producer(a, r.Next(999999));

tProd = new Thread(new ThreadStart(prod.produce));

tProd.Name = "p" + (i+1);

}

// create the consumers objects and threads

for(int i = 0; i < numConsumers; i++)

{

cons = new Consumer(a);

tCons = new Thread(new ThreadStart(cons.consume));

tCons.Name = "c" + (i+1);

}


// start the threads

foreach (Thread t in tProd) {t.Start();}

foreach (Thread t in tCons) {t.Start();}


Console.WriteLine("Storage, producers and consumers created!");

}

}

}

---



However, it didn't work..... Here's the output:

Max number of products: 10
Number of producers: 3
Numero of consumers: 1
Storage, producers and consumers created!
++ [p1] produces n.1: 924
PRODUCER [p2] has to wait (storage is full)
PRODUCER [p3] has to wait (storage is full)
++ [p1] produces n.2: 1169
++ [p2] produces n.3: 1503
++ [p3] produces n.4: 1483
-- [c1] consumed 924
++ [p2] produces n.5: 77
PRODUCER [p3] has to wait (storage is full)
PRODUCER [p1] has to wait (storage is full)
PRODUCER [p2] has to wait (storage is full)
-- [c1] consumed 1483
-- [c1] consumed 77
CONSUMER [c1] has to wait (storage is empty)



---



Any idea on what's wrong?

Thanks again,



Leonardo Hyppolito
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Top