Thread question - I know something is not done right.

A

admin

OK, this is my Main. It goes through each category and starts up
4 worker threads that then ask for groups to gather from. My
problem is that when a thread finishes, its MySQL connections
stay open, so I end up with quite a few at the end. Is there a
different way that I should do this?

/// <summary>
/// Console entry point. For every category it runs a gather phase
/// (four WorkerClass threads sharing one MasterList and one Groups)
/// followed by an update phase (three Updater threads sharing that
/// same MasterList), waiting for each phase to finish before moving on.
/// </summary>
class Program
{
    static string[] categories =
    {
        "emulation", "audio", "console", "anime",
        "xxx", "tv", "pictures", "video"
    };

    /// <summary>
    /// Exits immediately when another process with the same name is
    /// already running; otherwise processes the categories one by one.
    /// </summary>
    /// <param name="args">Command-line arguments (not used).</param>
    static void Main(string[] args)
    {
        // Single-instance guard: count the processes that share our name.
        string ownName = Process.GetCurrentProcess().ProcessName;
        if (Process.GetProcessesByName(ownName).Length > 1)
        {
            // Another copy is running; leave quietly (no message is shown).
            return;
        }

        foreach (string category in categories)
        {
            MasterList master = new MasterList();
            Groups groups = new Groups(category);

            // ---- Gather phase -------------------------------------------
            // Build every worker object first, then wrap each in a thread.
            WorkerClass[] gatherers = new WorkerClass[4];
            for (int i = 0; i < gatherers.Length; i++)
            {
                gatherers[i] = new WorkerClass(master, groups);
            }

            Thread[] gatherThreads = new Thread[gatherers.Length];
            for (int i = 0; i < gatherThreads.Length; i++)
            {
                gatherThreads[i] = new Thread(new ThreadStart(gatherers[i].Start));
                gatherThreads[i].Name = "Worker" + (i + 1);
            }

            for (int i = 0; i < gatherThreads.Length; i++)
            {
                gatherThreads[i].Start();
            }

            // Wait for all of them, last-started first.
            for (int i = gatherThreads.Length - 1; i >= 0; i--)
            {
                gatherThreads[i].Join();
            }

            Console.WriteLine(master.size());
            master.SetEnumerator();

            // ---- Update phase -------------------------------------------
            Updater[] updaters = new Updater[3];
            for (int i = 0; i < updaters.Length; i++)
            {
                updaters[i] = new Updater(master, category);
            }

            Thread[] updateThreads = new Thread[updaters.Length];
            for (int i = 0; i < updateThreads.Length; i++)
            {
                updateThreads[i] = new Thread(new ThreadStart(updaters[i].insert));
            }

            foreach (Thread t in updateThreads)
            {
                t.Start();
            }

            foreach (Thread t in updateThreads)
            {
                t.Join();
            }
        }
    }
}
 
V

Vadym Stetsyak

Hello, (e-mail address removed)!

a> ok This is my main. Pretty much it goes through each category and
a> starts up 4 worker threads that then ask for groups to gether from.
a> My
a> problem is that when the thread gets done it keeps the mysql
a> connections open so I end up with quite a few at the end. Is there a
a> different way that I should do this?

Instead of using separate threads you can use the ThreadPool class, along with several
WaitHandles to indicate thread completion.

( http://msdn.microsoft.com/library/default.asp?url=/library/en-us/dndotnet/html/progthrepool.asp )

Also, it would be simpler to answer your question if you provided the source for the
WC1.Start method, or at least the part where you work with the DB.


[skipped]


--
Regards, Vadym Stetsyak
www: http://vadmyst.blogspot.com
 
A

admin

OK, I will post more code when I get home. For now, though: I have a
managed thread pool class at home that is supposed to be better and
faster than the .NET ThreadPool. Does anyone know if that is correct?
 
B

Brian Gideon

ok will post more code when I get home. For now though I have a
managed threadpool class at home that is suppose to be better and
faster than the .net threadpool. Does anyone know if that is correct.

Could you tell us more about this threadpool class? I'm usually
skeptical of anything home grown. If you're wanting something better
than the built-in ThreadPool class then take a look at the concurrency
and coordination runtime (CCR).

http://msdn.microsoft.com/msdnmag/issues/06/09/ConcurrentAffairs/default.aspx

Brian
 
A

admin

I think I got all the regions opened up so all code should be there.

// Stephen Toub
// (e-mail address removed)
//
// ManagedThreadPool.cs
// ThreadPool written in 100% managed code. Mimics the core functionality of
// the System.Threading.ThreadPool class.
//
// HISTORY:
// v1.0.1 - Disposes of items remaining in queue when the queue is emptied
// - Catches errors thrown during execution of delegates
// - Added reset to semaphore, called during empty queue
// - Catches errors when unable to dequeue delegates
// v1.0.0 - Original version
//
// August 27, 2002
// v1.0.1

// http://www.gotdotnet.com/community/usersamples/Default.aspx?query=ManagedThreadPool

#region Namespaces
using System;
using System.Threading;
using System.Collections;
#endregion

namespace Toub.Threading
{
/// <summary>
/// Implementation of Dijkstra's PV semaphore based on the Monitor class.
/// </summary>
public class Semaphore
{
    #region Member Variables
    /// <summary>The number of units currently available from this semaphore.</summary>
    private int _count;

    /// <summary>
    /// Private monitor object guarding <see cref="_count"/>. A dedicated
    /// object is used instead of <c>this</c> so that code outside the class
    /// cannot lock or pulse the same monitor by accident.
    /// </summary>
    private readonly object _gate = new object();
    #endregion

    #region Construction
    /// <summary>Initialize the semaphore as a binary semaphore (one unit).</summary>
    public Semaphore() : this(1)
    {
    }

    /// <summary>Initialize the semaphore as a counting semaphore.</summary>
    /// <param name="count">Initial number of units that can be taken out of this semaphore.</param>
    /// <exception cref="ArgumentException">Thrown if <paramref name="count"/> is less than 0.</exception>
    public Semaphore(int count)
    {
        if (count < 0)
        {
            throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
        }
        _count = count;
    }
    #endregion

    #region Synchronization Operations
    /// <summary>V the semaphore (add 1 unit to it).</summary>
    public void AddOne() { V(); }

    /// <summary>P the semaphore (take out 1 unit from it).</summary>
    public void WaitOne() { P(); }

    /// <summary>
    /// P the semaphore (take out 1 unit from it). Blocks the calling thread
    /// until a unit is available.
    /// </summary>
    public void P()
    {
        lock (_gate)
        {
            // Re-check the count in a loop after every wake-up: another
            // thread may have taken the unit first, and Reset() wakes all
            // waiters at once.
            while (_count <= 0)
            {
                Monitor.Wait(_gate, Timeout.Infinite);
            }
            _count--;
        }
    }

    /// <summary>V the semaphore (add 1 unit to it) and wake one waiting thread.</summary>
    public void V()
    {
        lock (_gate)
        {
            // Release one unit, then signal a single waiter that it is available.
            _count++;
            Monitor.Pulse(_gate);
        }
    }

    /// <summary>
    /// Resets the semaphore to the specified count. Should be used cautiously.
    /// </summary>
    /// <param name="count">The new number of available units.</param>
    public void Reset(int count)
    {
        lock (_gate)
        {
            _count = count;

            // If the reset made units available, wake the waiters so they can
            // compete for them; otherwise threads already blocked in P()
            // would keep sleeping until some later V().
            if (_count > 0)
            {
                Monitor.PulseAll(_gate);
            }
        }
    }
    #endregion
}

/// <summary>
/// Managed thread pool: a fixed set of background worker threads that run
/// queued <see cref="WaitCallback"/> delegates in FIFO order.
/// </summary>
public class ManagedThreadPool
{
    #region Constants
    /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
    private const int _maxWorkerThreads = 15;
    #endregion

    #region Member Variables
    /// <summary>
    /// Queue of all the callbacks waiting to be executed. Every access is
    /// made while holding a lock on its SyncRoot.
    /// </summary>
    static Queue _waitingCallbacks;

    /// <summary>
    /// Used to signal that a worker thread is needed for processing. Note
    /// that multiple threads may be needed simultaneously and as such we use
    /// a semaphore instead of an auto reset event.
    /// </summary>
    static Semaphore _workerThreadNeeded;

    /// <summary>
    /// List of all worker threads at the disposal of the thread pool. Kept
    /// for bookkeeping only; nothing in this class reads it back.
    /// </summary>
    static ArrayList _workerThreads;

    /// <summary>Number of threads currently executing a callback.</summary>
    static int _inUseThreads;
    #endregion

    #region Construction
    /// <summary>
    /// Initialize the thread pool: create the queue and the semaphore, then
    /// start all worker threads as background threads.
    /// </summary>
    static ManagedThreadPool()
    {
        // Create our thread stores; we handle synchronization ourselves
        // as we may run into situations where multiple operations need to
        // be atomic.
        _waitingCallbacks = new Queue();
        _workerThreads = new ArrayList();
        _inUseThreads = 0;

        // Create our "thread needed" signal; it starts with no units so
        // every worker sleeps until work is queued.
        _workerThreadNeeded = new Semaphore(0);

        // Create all of the worker threads.
        for (int i = 0; i < _maxWorkerThreads; i++)
        {
            // Create a new thread and add it to the list of threads.
            Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
            _workerThreads.Add(newThread);

            // Configure the new thread and start it. Background threads do
            // not keep the process alive once the foreground threads end.
            newThread.Name = "ManagedPoolThread #" + i.ToString();
            newThread.IsBackground = true;
            newThread.Start();
        }
    }
    #endregion

    #region Public Methods
    /// <summary>Queues a user work item to the thread pool with no state.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in
    /// the thread pool picks up the work item.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback)
    {
        QueueUserWorkItem(callback, null);
    }

    /// <summary>Queues a user work item to the thread pool.</summary>
    /// <param name="callback">
    /// A WaitCallback representing the delegate to invoke when a thread in
    /// the thread pool picks up the work item.
    /// </param>
    /// <param name="state">
    /// The object that is passed to the delegate when serviced from the
    /// thread pool.
    /// </param>
    /// <exception cref="ArgumentNullException">Thrown if <paramref name="callback"/> is null.</exception>
    public static void QueueUserWorkItem(WaitCallback callback, object state)
    {
        // Reject a null delegate up front. Otherwise the worker would hit a
        // NullReferenceException that its catch-all swallows, and the work
        // item would vanish without any indication to the caller.
        if (callback == null)
        {
            throw new ArgumentNullException("callback");
        }

        // Wrap the delegate and its state, add it to the processing queue,
        // and signal that data is waiting.
        WaitingCallback waiting = new WaitingCallback(callback, state);
        lock (_waitingCallbacks.SyncRoot)
        {
            _waitingCallbacks.Enqueue(waiting);
        }
        _workerThreadNeeded.AddOne();
    }

    /// <summary>
    /// Empties the work queue of any queued work items. State objects that
    /// implement <see cref="IDisposable"/> are disposed first. Callbacks
    /// that are already executing are not affected.
    /// </summary>
    public static void EmptyQueue()
    {
        lock (_waitingCallbacks.SyncRoot)
        {
            // Dispose each remaining state individually so that one failing
            // Dispose() does not prevent the others from being cleaned up.
            foreach (object obj in _waitingCallbacks)
            {
                IDisposable disposable = ((WaitingCallback)obj).State as IDisposable;
                if (disposable == null)
                {
                    continue;
                }

                try
                {
                    disposable.Dispose();
                }
                catch
                {
                    // Best effort: emptying the queue must not throw.
                }
            }

            // Clear all waiting items and reset the number of worker threads
            // currently needed to 0 (there is nothing for threads to do).
            _waitingCallbacks.Clear();
            _workerThreadNeeded.Reset(0);
        }
    }
    #endregion

    #region Properties
    /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
    public static int MaxThreads { get { return _maxWorkerThreads; } }

    /// <summary>Gets the number of currently active threads in the thread pool.</summary>
    public static int ActiveThreads { get { return _inUseThreads; } }

    /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
    public static int WaitingCallbacks
    {
        get
        {
            lock (_waitingCallbacks.SyncRoot)
            {
                return _waitingCallbacks.Count;
            }
        }
    }
    #endregion

    #region Thread Processing
    /// <summary>
    /// A thread worker function that processes items from the work queue.
    /// Never returns; each worker loops for the lifetime of the process.
    /// </summary>
    private static void ProcessQueuedItems()
    {
        // Process indefinitely.
        while (true)
        {
            // Get the next item in the queue. If there is nothing there, go
            // to sleep until we're woken up when a callback is waiting.
            WaitingCallback callback = null;
            while (callback == null)
            {
                // Lock on the queue so the count check and the retrieval
                // are atomic; with the lock held and Count > 0, Dequeue
                // cannot fail.
                lock (_waitingCallbacks.SyncRoot)
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        callback = (WaitingCallback)_waitingCallbacks.Dequeue();
                    }
                }

                // If we can't get one, go to sleep.
                if (callback == null)
                {
                    _workerThreadNeeded.WaitOne();
                }
            }

            // We now have a callback. Execute it. Make sure to accurately
            // record how many callbacks are currently executing.
            try
            {
                Interlocked.Increment(ref _inUseThreads);
                callback.Callback(callback.State);
            }
            catch
            {
                // Deliberately swallowed: an exception from user code must
                // not terminate the pool's worker thread.
            }
            finally
            {
                Interlocked.Decrement(ref _inUseThreads);
            }
        }
    }
    #endregion

    /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
    private class WaitingCallback
    {
        #region Member Variables
        /// <summary>Callback delegate for the callback.</summary>
        private WaitCallback _callback;

        /// <summary>State with which to call the callback delegate.</summary>
        private object _state;
        #endregion

        #region Construction
        /// <summary>Initialize the callback holding object.</summary>
        /// <param name="callback">Callback delegate for the callback.</param>
        /// <param name="state">State with which to call the callback delegate.</param>
        public WaitingCallback(WaitCallback callback, object state)
        {
            _callback = callback;
            _state = state;
        }
        #endregion

        #region Properties
        /// <summary>Gets the callback delegate for the callback.</summary>
        public WaitCallback Callback { get { return _callback; } }

        /// <summary>Gets the state with which to call the callback delegate.</summary>
        public object State { get { return _state; } }
        #endregion
    }
}
}
 
Top