L
Leonardo Hyppolito
Hello,
I am trying to write a multithread program that simulates producers and
consumers. My program can have many producers and many consumers (each in a
separate thread). It has a storage place (buffer) with "n" capacity. The
user provides these parameters on startup. The buffer works with a circular
queue.
It's a console application. My implementation is working with Semaphores
to synchornize everything. I think it's synchornized OK, but the problem is:
I tell it to produce 10 products, and it produces 10 products, but the
consumers consume only 8 (they should consume all the products). I can't
find out why this happens.
Any help on this will be appreciated.
Thanks in advance!!!
Leonardo Hyppolito
here is the code:
It's also published (with syntax highlighting) here:
http://www.rafb.net/paste/results/u2946385.html
----
using System;
using System.Threading;
namespace TesteProdCons
{
public class Semaphore
{
private int counter;
// constructor
public Semaphore(int c)
{
counter = c;
}
public void p()
{
lock(this)
{
while(counter <= 0)
{
Monitor.Wait(this, Timeout.Infinite);
}
// we have control, decrement the counter
counter--;
}
}
public void v()
{
lock(this)
{
counter++;
Monitor.Pulse(this);
}
}
}
// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// semaphores used for synchronization
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore exclusive;
// prodBox is the buffer that keeps the products
private int[] prodBox;
private int readIndex;
private int writeIndex;
private int capacity;
// actual product number
private static int n;
// max number of products to be produced
private static int nMax;
// property for n
public int Number
{
get {return n;}
set {n = value;}
}
// property for nMax
public int MaxNumber
{
get {return nMax;}
set {nMax = value;}
}
// writes a string representation of the storage
public override string ToString()
{
string ret = "";
for (int i = 0; i < capacity; i++)
{
ret = ret + prodBox + " ";
}
ret = ret + "readIndex=" + readIndex + " writeIndex=" + writeIndex;
return ret;
}
// constructor
public Storage(int max, int cap)
{
n = 0;
nMax = max;
notFull = new Semaphore(cap);
notEmpty = new Semaphore(0);
exclusive = new Semaphore(1);
prodBox = new int[cap];
capacity = cap;
readIndex = 0;
writeIndex = 0;
}
public int Product
{
get
{
// wait for a nonEmpty resource
notEmpty.p();
// wait for exclusive access
exclusive.p();
// read the product
int ret = prodBox[readIndex];
Console.WriteLine(" " + Thread.CurrentThread.Name + " reads " + ret);
// set the new readIndex
readIndex = (readIndex + 1) % capacity;
// for debug
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();
// release a notFull resource
notFull.v();
return ret;
}
set
{
// wait for a notFull resource
notFull.p();
// wait for exclusive access
exclusive.p();
// if already reached maximum number of productions,
// abort the producer thread
if(n >= nMax)
{
Console.WriteLine("Aborting " + Thread.CurrentThread.Name +
" (reached max number of productions)");
Thread.CurrentThread.Abort();
}
// store the product
prodBox[writeIndex] = value;
// set the new writeIndex
writeIndex = (writeIndex + 1) % capacity;
// increment the actual product number
n = n + 1;
// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + value);
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();
// release a notEmpty resource
notEmpty.v();
}
}
}
public class Producer
{
// a reference to the storage
private Storage a;
// seed for the random
private int seed;
// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}
// entry point for the producer thread
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);
// store the product
a.Product = prod;
// rest for a while...
Thread.Sleep(200);
}
}
}
public class Consumer
{
// a reference to the storage
private Storage a;
// constructor
public Consumer(Storage arm)
{
a = arm;
}
// entry point for the consumer thread
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;
// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);
Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);
}
}
}
class TesteProdCons
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
int capacity;
// read information from the user
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Console.Write("Capacity of the storage buffer: ");
capacity = Int32.Parse(Console.ReadLine());
// a single shared storage place
Storage a = new Storage(numProducts, capacity);
// one producer object for each producer thread,
// to have different random generators
Producer[] prod = new Producer[numProducers];
Thread[] tProd = new Thread[numProducers];
// only one consumer object for all consumer threads
Consumer c = new Consumer(a);
Thread[] tCons = new Thread[numConsumers];
Random r = new Random();
// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod = new Producer(a, r.Next(999999));
tProd = new Thread(new ThreadStart(prod.produce));
tProd.Name = "p" + (i+1);
}
// create the consumers threads
for(int i = 0; i < numConsumers; i++)
{
tCons = new Thread(new ThreadStart(c.consume));
tCons.Name = "c" + (i+1);
}
// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}
Console.WriteLine("Storage, producers and consumers created!");
}
}
}
I am trying to write a multithread program that simulates producers and
consumers. My program can have many producers and many consumers (each in a
separate thread). It has a storage place (buffer) with "n" capacity. The
user provides these parameters on startup. The buffer works with a circular
queue.
It's a console application. My implementation is working with Semaphores
to synchornize everything. I think it's synchornized OK, but the problem is:
I tell it to produce 10 products, and it produces 10 products, but the
consumers consume only 8 (they should consume all the products). I can't
find out why this happens.
Any help on this will be appreciated.
Thanks in advance!!!
Leonardo Hyppolito
here is the code:
It's also published (with syntax highlighting) here:
http://www.rafb.net/paste/results/u2946385.html
----
using System;
using System.Threading;
namespace TesteProdCons
{
public class Semaphore
{
private int counter;
// constructor
public Semaphore(int c)
{
counter = c;
}
public void p()
{
lock(this)
{
while(counter <= 0)
{
Monitor.Wait(this, Timeout.Infinite);
}
// we have control, decrement the counter
counter--;
}
}
public void v()
{
lock(this)
{
counter++;
Monitor.Pulse(this);
}
}
}
// can hold 1 product at a time (buffer of 1 position)
public class Storage
{
// semaphores used for synchronization
private Semaphore notFull;
private Semaphore notEmpty;
private Semaphore exclusive;
// prodBox is the buffer that keeps the products
private int[] prodBox;
private int readIndex;
private int writeIndex;
private int capacity;
// actual product number
private static int n;
// max number of products to be produced
private static int nMax;
// property for n
public int Number
{
get {return n;}
set {n = value;}
}
// property for nMax
public int MaxNumber
{
get {return nMax;}
set {nMax = value;}
}
// writes a string representation of the storage
public override string ToString()
{
string ret = "";
for (int i = 0; i < capacity; i++)
{
ret = ret + prodBox + " ";
}
ret = ret + "readIndex=" + readIndex + " writeIndex=" + writeIndex;
return ret;
}
// constructor
public Storage(int max, int cap)
{
n = 0;
nMax = max;
notFull = new Semaphore(cap);
notEmpty = new Semaphore(0);
exclusive = new Semaphore(1);
prodBox = new int[cap];
capacity = cap;
readIndex = 0;
writeIndex = 0;
}
public int Product
{
get
{
// wait for a nonEmpty resource
notEmpty.p();
// wait for exclusive access
exclusive.p();
// read the product
int ret = prodBox[readIndex];
Console.WriteLine(" " + Thread.CurrentThread.Name + " reads " + ret);
// set the new readIndex
readIndex = (readIndex + 1) % capacity;
// for debug
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();
// release a notFull resource
notFull.v();
return ret;
}
set
{
// wait for a notFull resource
notFull.p();
// wait for exclusive access
exclusive.p();
// if already reached maximum number of productions,
// abort the producer thread
if(n >= nMax)
{
Console.WriteLine("Aborting " + Thread.CurrentThread.Name +
" (reached max number of productions)");
Thread.CurrentThread.Abort();
}
// store the product
prodBox[writeIndex] = value;
// set the new writeIndex
writeIndex = (writeIndex + 1) % capacity;
// increment the actual product number
n = n + 1;
// write information on screen
Console.WriteLine("++ [" + Thread.CurrentThread.Name + "] produces n." +
Number + ": " + value);
//Console.WriteLine(" " + Thread.CurrentThread.Name + "--> " + this);
// release the exclusive access
exclusive.v();
// release a notEmpty resource
notEmpty.v();
}
}
}
public class Producer
{
// a reference to the storage
private Storage a;
// seed for the random
private int seed;
// constructor
public Producer(Storage arm, int s)
{
a = arm;
seed = s;
}
// entry point for the producer thread
public void produce()
{
Random r = new Random(seed);
while(a.Number <= a.MaxNumber)
{
// get a random integer (from 0 to 2 seconds)
int prod = r.Next(2001);
// store the product
a.Product = prod;
// rest for a while...
Thread.Sleep(200);
}
}
}
public class Consumer
{
// a reference to the storage
private Storage a;
// constructor
public Consumer(Storage arm)
{
a = arm;
}
// entry point for the consumer thread
public void consume()
{
while(true)
{
// get the product value
int prod = a.Product;
// "consume" the product (pauses for the product's miliseconds)
Thread.Sleep(prod);
Console.WriteLine("-- [" + Thread.CurrentThread.Name + "] consumed " +
prod);
}
}
}
class TesteProdCons
{
static void Main(string[] args)
{
int numProducts;
int numProducers;
int numConsumers;
int capacity;
// read information from the user
Console.Write("Max number of products: ");
numProducts = Int32.Parse(Console.ReadLine());
Console.Write("Number of producers: ");
numProducers = Int32.Parse(Console.ReadLine());
Console.Write("Numero of consumers: ");
numConsumers = Int32.Parse(Console.ReadLine());
Console.Write("Capacity of the storage buffer: ");
capacity = Int32.Parse(Console.ReadLine());
// a single shared storage place
Storage a = new Storage(numProducts, capacity);
// one producer object for each producer thread,
// to have different random generators
Producer[] prod = new Producer[numProducers];
Thread[] tProd = new Thread[numProducers];
// only one consumer object for all consumer threads
Consumer c = new Consumer(a);
Thread[] tCons = new Thread[numConsumers];
Random r = new Random();
// create the producers objects and threads
for(int i = 0; i < numProducers; i++)
{
prod = new Producer(a, r.Next(999999));
tProd = new Thread(new ThreadStart(prod.produce));
tProd.Name = "p" + (i+1);
}
// create the consumers threads
for(int i = 0; i < numConsumers; i++)
{
tCons = new Thread(new ThreadStart(c.consume));
tCons.Name = "c" + (i+1);
}
// start the threads
foreach (Thread t in tProd) {t.Start();}
foreach (Thread t in tCons) {t.Start();}
Console.WriteLine("Storage, producers and consumers created!");
}
}
}