Multithreaded Database access with C# on an Sql2005 and TransactionScope class (Bug or did I some mi

M

Michael Schöller

Hello,

First of all, English is not my native language, so please forgive any
mistakes in grammar and vocabulary :).

I have a big problem here. I will not use this approach anymore, but I want to
understand why it behaves the way it does ^^. I tried it with .NET 3.0, but I
think it will be the same with 2.0 and 3.5.
MSDTC is configured and working.

On the SQL2005-Server I created a Table Test (bigint, varchar(20)) filled
with
1, 'Test1'
2, 'Test2'
3, 'Test3'
4, 'Test4'
5, 'Test5'
6, 'Test6'

Executing the following code works about 1 time in 20 (the other times the
following exception appears). After deleting the added rows from the table and
executing it again, you always get the exception:
(Sorry, the exception is partially German and I don't know the exact English
counterpart, but it must be something like "Error while trying to promote the
transaction" and "There is already an open DataReader associated with this
command which must be closed first".)
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. ---> System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. --->
System.Data.SqlClient.SqlException: Diesem Befehl ist bereits ein geöffneter
DataReader zugeordnet, der zuerst geschlossen werden muss.\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest
transactionRequest, String transactionName, IsolationLevel iso,
SqlInternalTransaction internalTransaction, Boolean
isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransaction(TransactionRequest
transactionRequest, String name, IsolationLevel iso, SqlInternalTransaction
internalTransaction, Boolean isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n --- Ende der
internen Ausnahmestapelüberwachung ---\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n bei
System.Transactions.TransactionStatePSPEOperation.PSPEPromote(InternalTransaction
tx)\r\n bei
System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransaction
tx)\r\n --- Ende der internen Ausnahmestapelüberwachung ---\r\n bei
System.Transactions.TransactionStateAborted.CheckForFinishedTransaction(InternalTransaction
tx)\r\n bei
System.Transactions.TransactionStatePhase0.Promote(InternalTransaction
tx)\r\n bei System.Transactions.Transaction.Promote()\r\n bei
System.Transactions.TransactionInterop.ConvertToOletxTransaction(Transaction
transaction)\r\n bei
System.Transactions.TransactionInterop.GetExportCookie(Transaction
transaction, Byte[] whereabouts)\r\n bei
System.Data.SqlClient.SqlInternalConnection.EnlistNonNull(Transaction
tx)\r\n bei System.Data.SqlClient.SqlInternalConnection.Enlist(Transaction
tx)\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.Activate(Transaction
transaction)\r\n bei
System.Data.ProviderBase.DbConnectionInternal.ActivateConnection(Transaction
transaction)\r\n

bei System.Data.ProviderBase.DbConnectionPool.GetConnection(DbConnection
owningObject)\r\n bei
System.Data.ProviderBase.DbConnectionFactory.GetConnection(DbConnection
owningConnection)\r\n bei
System.Data.ProviderBase.DbConnectionClosed.OpenConnection(DbConnection
outerConnection, DbConnectionFactory connectionFactory)\r\n bei
System.Data.SqlClient.SqlConnection.Open()\r\n bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
120.\r\n bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state)\r\n bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state)\r\n bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

Well, personally I don't believe the part about the DataReader... here is the
code:

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion
{
    /// <summary>
    /// Test program: two threads do database work inside one transaction.
    /// Each thread receives a dependent clone of the root transaction and
    /// opens its own connection.
    /// </summary>
    class Program
    {
        // NOTE(review): credentials are embedded in source exactly as posted;
        // move them to configuration before reusing this code.
        private const string ConnectionString =
            "Data Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security Info=True;User ID=binreader;Password=readme2002";

        /// <summary>
        /// Opens the root transaction scope, starts both worker threads with a
        /// BlockCommitUntilComplete dependent clone each, and completes the scope.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
                {
                    // The stack trace reported for this program shows the second
                    // SqlConnection.Open() promoting the transaction through
                    // SqlDelegatedTransaction.Promote() and failing because a
                    // DataReader was already open. Promoting here, before any
                    // connection enlists, is intended to avoid that late promotion.
                    // NOTE(review): requires a working MSDTC; confirm on the target setup.
                    TransactionInterop.GetTransmitterPropagationToken(Transaction.Current);

                    WriteTransactionIds("RootStart");

                    Thread thread1 = new Thread(Program.worker1);
                    Thread thread2 = new Thread(Program.worker2);
                    thread1.IsBackground = true;
                    thread2.IsBackground = true;

                    thread1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    thread2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    // Alternative tried by the author:
                    //ThreadPool.QueueUserWorkItem(worker1, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    //ThreadPool.QueueUserWorkItem(worker2, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    WriteTransactionIds("RootEnd");
                    Console.WriteLine("About to complete the main thread");
                    ts.Complete();
                }

                Console.WriteLine("Transaction Completed");
            }
            catch (Exception ex)
            {
                Console.WriteLine("Top Catch");
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("Enter <Enter>");
            Console.ReadLine();
        }

        /// <summary>
        /// Thread entry point: inserts row 8, then reads the table slowly (5 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker1(object ar)
        {
            RunWorker(ar, 1, "INSERT INTO TEST VALUES(8, 'Test8')",
                TimeSpan.Zero, TimeSpan.FromSeconds(5), 9000);
        }

        /// <summary>
        /// Thread entry point: opens its connection, waits 10 s so the main thread
        /// has reached the end of its scope, inserts row 7, then reads the table (2 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker2(object ar)
        {
            RunWorker(ar, 2, "INSERT INTO TEST VALUES(7, 'Test7')",
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2), 5000);
        }

        /// <summary>Prints the local and distributed identifier of the ambient transaction.</summary>
        /// <param name="stage">Label identifying where in the program the check is made.</param>
        private static void WriteTransactionIds(string stage)
        {
            Console.WriteLine("Check Transaction ({0}): L:{1} D:{2}",
                stage,
                Transaction.Current.TransactionInformation.LocalIdentifier,
                Transaction.Current.TransactionInformation.DistributedIdentifier);
        }

        /// <summary>
        /// Shared worker body: opens a connection inside a scope on the dependent
        /// transaction, runs one INSERT and one SELECT, completes the scope and
        /// then the dependent clone.
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> passed to the thread.</param>
        /// <param name="id">Worker number used in all log output.</param>
        /// <param name="insertSql">INSERT statement to execute.</param>
        /// <param name="delayBeforeInsert">Pause between opening the connection and the INSERT.</param>
        /// <param name="delayPerRow">Pause after each row read, to keep the reader open for a while.</param>
        /// <param name="delayAfterReadMs">Pause in milliseconds after the connection is closed.</param>
        private static void RunWorker(object ar, int id, string insertSql,
            TimeSpan delayBeforeInsert, TimeSpan delayPerRow, int delayAfterReadMs)
        {
            try
            {
                DependentTransaction dtx = (DependentTransaction)ar;

                using (TransactionScope ts = new TransactionScope(dtx))
                {
                    WriteTransactionIds("Worker" + id + "Start");

                    using (SqlConnection conn = new SqlConnection(ConnectionString))
                    {
                        conn.Open();

                        if (delayBeforeInsert > TimeSpan.Zero)
                        {
                            Thread.Sleep(delayBeforeInsert);
                        }

                        using (SqlCommand insert = new SqlCommand(insertSql, conn))
                        {
                            insert.ExecuteNonQuery();
                        }

                        // using blocks close the reader even when Read() throws.
                        using (SqlCommand query = new SqlCommand("SELECT * FROM TEST", conn))
                        using (SqlDataReader r = query.ExecuteReader())
                        {
                            while (r.Read())
                            {
                                Console.WriteLine("Reader{0}: {1}, {2}", id, r.GetInt64(0), r.GetString(1));
                                Thread.Sleep(delayPerRow);
                            }
                        }
                    }

                    Thread.Sleep(delayAfterReadMs);

                    //throw new Exception("Aufzah!");

                    WriteTransactionIds("Worker" + id + "End");
                    Console.WriteLine("About to complete the worker{0} thread's transaction scope", id);
                    ts.Complete();
                }

                Console.WriteLine("Completing the dependent clone");
                dtx.Complete();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Worker{0} Catch", id);
                Console.WriteLine(ex.ToString());

                // Rethrown on a worker thread, so nothing in this program catches it.
                throw;
            }
        }
    }
}



Alternativ habe ich auch diesen Code hier versucht der eher dem enspricht
was wir Momentan verwenden. Allerdings liefert der wiederum folgende
Exception (fast immer oder eben die obere)

Alternatively, I tried the following code, which from a logical point of view
is a bit closer to what I am using at the moment. This code throws the
following exception (not every time, but mostly; the other times the
exception I showed you earlier is thrown). (BTW, funny thing: same PC, same
Studio, but now an English exception.)

"System.Data.SqlClient.SqlException: Distributed transaction completed.
Either enlist this session in a new transaction or the NULL transaction.\r\n
bei System.Data.SqlClient.SqlConnection.OnError(SqlException exception,
Boolean breakConnection)\r\n bei
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception,
Boolean breakConnection)\r\n bei
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject
stateObj)\r\n bei System.Data.SqlClient.TdsParser.Run(RunBehavior
runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream,
BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)\r\n
bei System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String
methodName, Boolean async)\r\n bei
System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult
result, String methodName, Boolean sendToPipe)\r\n bei
System.Data.SqlClient.SqlCommand.ExecuteNonQuery()\r\n bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
133.\r\n bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state)\r\n bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state)\r\n bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

The Code

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion
{
    /// <summary>
    /// Test program: two threads do database work inside one transaction, each
    /// on its own pre-opened connection that Main enlisted in the transaction.
    /// </summary>
    class Program
    {
        // NOTE(review): credentials are embedded in source exactly as posted;
        // move them to configuration before reusing this code.
        private const string ConnectionString =
            "Data Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security Info=True;User ID=binreader;Password=readme2002";

        // Opened and enlisted by Main; conn1 is used by worker1, conn2 by worker2.
        static SqlConnection conn1;

        static SqlConnection conn2;

        /// <summary>
        /// Opens both connections, enlists them in the root transaction, starts
        /// both worker threads with a BlockCommitUntilComplete dependent clone
        /// each, completes the scope and closes the connections.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                conn1 = new SqlConnection(ConnectionString);
                conn2 = new SqlConnection(ConnectionString);

                conn1.Open();
                conn2.Open();

                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
                {
                    // The connections were opened outside the scope, so they are enlisted manually.
                    conn1.EnlistTransaction(Transaction.Current);
                    conn2.EnlistTransaction(Transaction.Current);

                    WriteTransactionIds("RootStart");

                    Thread thread1 = new Thread(Program.worker1);
                    Thread thread2 = new Thread(Program.worker2);
                    thread1.IsBackground = true;
                    thread2.IsBackground = true;

                    thread1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    thread2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    // Alternative tried by the author:
                    //ThreadPool.QueueUserWorkItem(worker1, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    //ThreadPool.QueueUserWorkItem(worker2, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    WriteTransactionIds("RootEnd");
                    Console.WriteLine("About to complete the main thread");
                    ts.Complete();
                }

                conn1.Close();
                conn2.Close();

                Console.WriteLine("Transaction Completed");
            }
            catch (Exception ex)
            {
                Console.WriteLine("Top Catch");
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("Enter <Enter>");
            Console.ReadLine();
        }

        /// <summary>
        /// Thread entry point: inserts row 8 on conn1, then reads the table slowly (5 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker1(object ar)
        {
            RunWorker(ar, 1, conn1, "INSERT INTO TEST VALUES(8, 'Test8')",
                TimeSpan.Zero, TimeSpan.FromSeconds(5), 9000);
        }

        /// <summary>
        /// Thread entry point: waits 10 s so the main thread has reached the end of
        /// its scope, inserts row 7 on conn2, then reads the table (2 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker2(object ar)
        {
            RunWorker(ar, 2, conn2, "INSERT INTO TEST VALUES(7, 'Test7')",
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2), 5000);
        }

        /// <summary>Prints the local and distributed identifier of the ambient transaction.</summary>
        /// <param name="stage">Label identifying where in the program the check is made.</param>
        private static void WriteTransactionIds(string stage)
        {
            Console.WriteLine("Check Transaction ({0}): L:{1} D:{2}",
                stage,
                Transaction.Current.TransactionInformation.LocalIdentifier,
                Transaction.Current.TransactionInformation.DistributedIdentifier);
        }

        /// <summary>
        /// Shared worker body: inside a scope on the dependent transaction, runs one
        /// INSERT and one SELECT on the given connection, completes the scope and
        /// then the dependent clone. The connection is neither opened nor closed here.
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> passed to the thread.</param>
        /// <param name="id">Worker number used in all log output.</param>
        /// <param name="conn">Already-open connection the commands run on.</param>
        /// <param name="insertSql">INSERT statement to execute.</param>
        /// <param name="delayBeforeInsert">Pause before the INSERT.</param>
        /// <param name="delayPerRow">Pause after each row read, to keep the reader open for a while.</param>
        /// <param name="delayAfterReadMs">Pause in milliseconds after the reader is closed.</param>
        private static void RunWorker(object ar, int id, SqlConnection conn, string insertSql,
            TimeSpan delayBeforeInsert, TimeSpan delayPerRow, int delayAfterReadMs)
        {
            try
            {
                DependentTransaction dtx = (DependentTransaction)ar;

                using (TransactionScope ts = new TransactionScope(dtx))
                {
                    WriteTransactionIds("Worker" + id + "Start");

                    if (delayBeforeInsert > TimeSpan.Zero)
                    {
                        Thread.Sleep(delayBeforeInsert);
                    }

                    using (SqlCommand insert = new SqlCommand(insertSql, conn))
                    {
                        insert.ExecuteNonQuery();
                    }

                    // using blocks close the reader even when Read() throws.
                    using (SqlCommand query = new SqlCommand("SELECT * FROM TEST", conn))
                    using (SqlDataReader r = query.ExecuteReader())
                    {
                        while (r.Read())
                        {
                            Console.WriteLine("Reader{0}: {1}, {2}", id, r.GetInt64(0), r.GetString(1));
                            Thread.Sleep(delayPerRow);
                        }
                    }

                    Thread.Sleep(delayAfterReadMs);

                    //throw new Exception("Aufzah!");

                    WriteTransactionIds("Worker" + id + "End");
                    Console.WriteLine("About to complete the worker{0} thread's transaction scope", id);
                    ts.Complete();
                }

                Console.WriteLine("Completing the dependent clone");
                dtx.Complete();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Worker{0} Catch", id);
                Console.WriteLine(ex.ToString());

                // Rethrown on a worker thread, so nothing in this program catches it.
                throw;
            }
        }
    }
}



I have no idea why nothing there works the way I would expect.

Is there a working method to access a database with multiple threads, or
should I bury that idea?

Examples of working templates are very welcome.



With the request for help

Michael Schöller
 
S

Sheng Jiang[MVP]

Is there a reason why you have one scope for two threads?

--
Sheng Jiang
Microsoft MVP in VC++
Michael Schöller said:
Hello,

First of all english is not my natural language so please fogive me some bad
mistakes in gramatic and use of some vocables :).

I have a great problem here. Well I will not use it anymore but I want to
know why it is as it is ^^. I tried with .NET3.0 but I think it will be the
same with 2.0 and 3.5.
MSDTC is configured and working.

On the SQL2005-Server I created a Table Test (bigint, varchar(20)) filled
with
1, 'Test1'
2, 'Test2'
3, 'Test3'
4, 'Test4'
5, 'Test5'
6, 'Test6'

Execution the following Code works 1 of 20 Times (the other timed the
following Exception appears). After deleten the added Lines in the table and
Execute it again you always get the Exception:
(Sorry the Exception is partitially german and I don't know the english
counterpart but must be something like "Error while upgrading the
Transaction" and "There is already an opened DataReader assigned to this
connection" )
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. ---> System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. --->
System.Data.SqlClient.SqlException: Diesem Befehl ist bereits ein geöffneter
DataReader zugeordnet, der zuerst geschlossen werden muss.\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(Trans
actionRequest
transactionRequest, String transactionName, IsolationLevel iso,
SqlInternalTransaction internalTransaction, Boolean
isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransaction(Transactio
nRequest
transactionRequest, String name, IsolationLevel iso, SqlInternalTransaction
internalTransaction, Boolean isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n --- Ende der
internen Ausnahmestapelüberwachung ---\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n bei
System.Transactions.Tr

ansactionStatePSPEOperation.PSPEPromote(InternalTransaction tx)\r\n bei
System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransac
tion
tx)\r\n --- Ende der internen Ausnahmestapelüberwachung ---\r\n bei
System.Transactions.TransactionStateAborted.CheckForFinishedTransaction(Inte
rnalTransaction
tx)\r\n bei
System.Transactions.TransactionStatePhase0.Promote(InternalTransaction
tx)\r\n bei System.Transactions.Transaction.Promote()\r\n bei
System.Transactions.TransactionInterop.ConvertToOletxTransaction(Transaction
transaction)\r\n bei
System.Transactions.TransactionInterop.GetExportCookie(Transaction
transaction, Byte[] whereabouts)\r\n bei
System.Data.SqlClient.SqlInternalConnection.EnlistNonNull(Transaction
tx)\r\n bei System.Data.SqlClient.SqlInternalConnection.Enlist(Transaction
tx)\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.Activate(Transaction
transaction)\r\n bei
System.Data.ProviderBase.DbConnectionInternal.ActivateConnection(Transaction
transaction)\r\n

bei System.Data.ProviderBase.DbConnectionPool.GetConnection(DbConnection
owningObject)\r\n bei
System.Data.ProviderBase.DbConnectionFactory.GetConnection(DbConnection
owningConnection)\r\n bei
System.Data.ProviderBase.DbConnectionClosed.OpenConnection(DbConnection
outerConnection, DbConnectionFactory connectionFactory)\r\n bei
System.Data.SqlClient.SqlConnection.Open()\r\n bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
120.\r\n bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state)\r\n bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state)\r\n bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

Well personaly I don't belief the part with the DataReader..here is the
code

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion
{
    /// <summary>
    /// Test program: two threads do database work inside one transaction.
    /// Each thread receives a dependent clone of the root transaction and
    /// opens its own connection.
    /// </summary>
    class Program
    {
        // NOTE(review): credentials are embedded in source exactly as posted;
        // move them to configuration before reusing this code.
        private const string ConnectionString =
            "Data Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security Info=True;User ID=binreader;Password=readme2002";

        /// <summary>
        /// Opens the root transaction scope, starts both worker threads with a
        /// BlockCommitUntilComplete dependent clone each, and completes the scope.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
                {
                    // The stack trace reported for this program shows the second
                    // SqlConnection.Open() promoting the transaction through
                    // SqlDelegatedTransaction.Promote() and failing because a
                    // DataReader was already open. Promoting here, before any
                    // connection enlists, is intended to avoid that late promotion.
                    // NOTE(review): requires a working MSDTC; confirm on the target setup.
                    TransactionInterop.GetTransmitterPropagationToken(Transaction.Current);

                    WriteTransactionIds("RootStart");

                    Thread thread1 = new Thread(Program.worker1);
                    Thread thread2 = new Thread(Program.worker2);
                    thread1.IsBackground = true;
                    thread2.IsBackground = true;

                    thread1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    thread2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    // Alternative tried by the author:
                    //ThreadPool.QueueUserWorkItem(worker1, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    //ThreadPool.QueueUserWorkItem(worker2, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    WriteTransactionIds("RootEnd");
                    Console.WriteLine("About to complete the main thread");
                    ts.Complete();
                }

                Console.WriteLine("Transaction Completed");
            }
            catch (Exception ex)
            {
                Console.WriteLine("Top Catch");
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("Enter <Enter>");
            Console.ReadLine();
        }

        /// <summary>
        /// Thread entry point: inserts row 8, then reads the table slowly (5 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker1(object ar)
        {
            RunWorker(ar, 1, "INSERT INTO TEST VALUES(8, 'Test8')",
                TimeSpan.Zero, TimeSpan.FromSeconds(5), 9000);
        }

        /// <summary>
        /// Thread entry point: opens its connection, waits 10 s so the main thread
        /// has reached the end of its scope, inserts row 7, then reads the table (2 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker2(object ar)
        {
            RunWorker(ar, 2, "INSERT INTO TEST VALUES(7, 'Test7')",
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2), 5000);
        }

        /// <summary>Prints the local and distributed identifier of the ambient transaction.</summary>
        /// <param name="stage">Label identifying where in the program the check is made.</param>
        private static void WriteTransactionIds(string stage)
        {
            Console.WriteLine("Check Transaction ({0}): L:{1} D:{2}",
                stage,
                Transaction.Current.TransactionInformation.LocalIdentifier,
                Transaction.Current.TransactionInformation.DistributedIdentifier);
        }

        /// <summary>
        /// Shared worker body: opens a connection inside a scope on the dependent
        /// transaction, runs one INSERT and one SELECT, completes the scope and
        /// then the dependent clone.
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> passed to the thread.</param>
        /// <param name="id">Worker number used in all log output.</param>
        /// <param name="insertSql">INSERT statement to execute.</param>
        /// <param name="delayBeforeInsert">Pause between opening the connection and the INSERT.</param>
        /// <param name="delayPerRow">Pause after each row read, to keep the reader open for a while.</param>
        /// <param name="delayAfterReadMs">Pause in milliseconds after the connection is closed.</param>
        private static void RunWorker(object ar, int id, string insertSql,
            TimeSpan delayBeforeInsert, TimeSpan delayPerRow, int delayAfterReadMs)
        {
            try
            {
                DependentTransaction dtx = (DependentTransaction)ar;

                using (TransactionScope ts = new TransactionScope(dtx))
                {
                    WriteTransactionIds("Worker" + id + "Start");

                    using (SqlConnection conn = new SqlConnection(ConnectionString))
                    {
                        conn.Open();

                        if (delayBeforeInsert > TimeSpan.Zero)
                        {
                            Thread.Sleep(delayBeforeInsert);
                        }

                        using (SqlCommand insert = new SqlCommand(insertSql, conn))
                        {
                            insert.ExecuteNonQuery();
                        }

                        // using blocks close the reader even when Read() throws.
                        using (SqlCommand query = new SqlCommand("SELECT * FROM TEST", conn))
                        using (SqlDataReader r = query.ExecuteReader())
                        {
                            while (r.Read())
                            {
                                Console.WriteLine("Reader{0}: {1}, {2}", id, r.GetInt64(0), r.GetString(1));
                                Thread.Sleep(delayPerRow);
                            }
                        }
                    }

                    Thread.Sleep(delayAfterReadMs);

                    //throw new Exception("Aufzah!");

                    WriteTransactionIds("Worker" + id + "End");
                    Console.WriteLine("About to complete the worker{0} thread's transaction scope", id);
                    ts.Complete();
                }

                Console.WriteLine("Completing the dependent clone");
                dtx.Complete();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Worker{0} Catch", id);
                Console.WriteLine(ex.ToString());

                // Rethrown on a worker thread, so nothing in this program catches it.
                throw;
            }
        }
    }
}



Alternativ habe ich auch diesen Code hier versucht der eher dem enspricht
was wir Momentan verwenden. Allerdings liefert der wiederum folgende
Exception (fast immer oder eben die obere)

Alternativ I tried out the following code, that was in a locical pouint of
few a little bit more like the one I am using at the moment. This code
throws the following execption (not every time but mostly the other time the
exception I showed you earlyer was thrown) (BTW funny thing same PC same
Studio but now an english exception.)

"System.Data.SqlClient.SqlException: Distributed transaction completed.
Either enlist this session in a new transaction or the NULL transaction.\r\n
bei System.Data.SqlClient.SqlConnection.OnError(SqlException exception,
Boolean breakConnection)\r\n bei
System.Data.SqlClient.SqlInternalConnection.OnError(SqlException exception,
Boolean breakConnection)\r\n bei
System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObjec
t
stateObj)\r\n bei System.Data.SqlClient.TdsParser.Run(RunBehavior
runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream,
BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj)\r\n
bei System.Data.SqlClient.SqlCommand.RunExecuteNonQueryTds(String
methodName, Boolean async)\r\n bei
System.Data.SqlClient.SqlCommand.InternalExecuteNonQuery(DbAsyncResult
result, String methodName, Boolean sendToPipe)\r\n bei
System.Data.SqlClient.SqlCommand.ExecuteNonQuery()\r\n bei
TestTransaktion.Program.worker2(Object ar) in D:\\DATEN\\SchoellerM\\Visual
St

udio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
133.\r\n bei System.Threading.ThreadHelper.ThreadStart_Context(Object
state)\r\n bei System.Threading.ExecutionContext.Run(ExecutionContext
executionContext, ContextCallback callback, Object state)\r\n bei
System.Threading.ThreadHelper.ThreadStart(Object obj)"

The Code

using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion
{
    /// <summary>
    /// Test program: two threads do database work inside one transaction, each
    /// on its own pre-opened connection that Main enlisted in the transaction.
    /// </summary>
    class Program
    {
        // NOTE(review): credentials are embedded in source exactly as posted;
        // move them to configuration before reusing this code.
        private const string ConnectionString =
            "Data Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security Info=True;User ID=binreader;Password=readme2002";

        // Opened and enlisted by Main; conn1 is used by worker1, conn2 by worker2.
        static SqlConnection conn1;

        static SqlConnection conn2;

        /// <summary>
        /// Opens both connections, enlists them in the root transaction, starts
        /// both worker threads with a BlockCommitUntilComplete dependent clone
        /// each, completes the scope and closes the connections.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            try
            {
                conn1 = new SqlConnection(ConnectionString);
                conn2 = new SqlConnection(ConnectionString);

                conn1.Open();
                conn2.Open();

                using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
                {
                    // The connections were opened outside the scope, so they are enlisted manually.
                    conn1.EnlistTransaction(Transaction.Current);
                    conn2.EnlistTransaction(Transaction.Current);

                    WriteTransactionIds("RootStart");

                    Thread thread1 = new Thread(Program.worker1);
                    Thread thread2 = new Thread(Program.worker2);
                    thread1.IsBackground = true;
                    thread2.IsBackground = true;

                    thread1.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    thread2.Start(Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    // Alternative tried by the author:
                    //ThreadPool.QueueUserWorkItem(worker1, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                    //ThreadPool.QueueUserWorkItem(worker2, Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));

                    WriteTransactionIds("RootEnd");
                    Console.WriteLine("About to complete the main thread");
                    ts.Complete();
                }

                conn1.Close();
                conn2.Close();

                Console.WriteLine("Transaction Completed");
            }
            catch (Exception ex)
            {
                Console.WriteLine("Top Catch");
                Console.WriteLine(ex.ToString());
            }

            Console.WriteLine("Enter <Enter>");
            Console.ReadLine();
        }

        /// <summary>
        /// Thread entry point: inserts row 8 on conn1, then reads the table slowly (5 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker1(object ar)
        {
            RunWorker(ar, 1, conn1, "INSERT INTO TEST VALUES(8, 'Test8')",
                TimeSpan.Zero, TimeSpan.FromSeconds(5), 9000);
        }

        /// <summary>
        /// Thread entry point: waits 10 s so the main thread has reached the end of
        /// its scope, inserts row 7 on conn2, then reads the table (2 s per row).
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> to work under.</param>
        static void worker2(object ar)
        {
            RunWorker(ar, 2, conn2, "INSERT INTO TEST VALUES(7, 'Test7')",
                TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2), 5000);
        }

        /// <summary>Prints the local and distributed identifier of the ambient transaction.</summary>
        /// <param name="stage">Label identifying where in the program the check is made.</param>
        private static void WriteTransactionIds(string stage)
        {
            Console.WriteLine("Check Transaction ({0}): L:{1} D:{2}",
                stage,
                Transaction.Current.TransactionInformation.LocalIdentifier,
                Transaction.Current.TransactionInformation.DistributedIdentifier);
        }

        /// <summary>
        /// Shared worker body: inside a scope on the dependent transaction, runs one
        /// INSERT and one SELECT on the given connection, completes the scope and
        /// then the dependent clone. The connection is neither opened nor closed here.
        /// </summary>
        /// <param name="ar">The <see cref="DependentTransaction"/> passed to the thread.</param>
        /// <param name="id">Worker number used in all log output.</param>
        /// <param name="conn">Already-open connection the commands run on.</param>
        /// <param name="insertSql">INSERT statement to execute.</param>
        /// <param name="delayBeforeInsert">Pause before the INSERT.</param>
        /// <param name="delayPerRow">Pause after each row read, to keep the reader open for a while.</param>
        /// <param name="delayAfterReadMs">Pause in milliseconds after the reader is closed.</param>
        private static void RunWorker(object ar, int id, SqlConnection conn, string insertSql,
            TimeSpan delayBeforeInsert, TimeSpan delayPerRow, int delayAfterReadMs)
        {
            try
            {
                DependentTransaction dtx = (DependentTransaction)ar;

                using (TransactionScope ts = new TransactionScope(dtx))
                {
                    WriteTransactionIds("Worker" + id + "Start");

                    if (delayBeforeInsert > TimeSpan.Zero)
                    {
                        Thread.Sleep(delayBeforeInsert);
                    }

                    using (SqlCommand insert = new SqlCommand(insertSql, conn))
                    {
                        insert.ExecuteNonQuery();
                    }

                    // using blocks close the reader even when Read() throws.
                    using (SqlCommand query = new SqlCommand("SELECT * FROM TEST", conn))
                    using (SqlDataReader r = query.ExecuteReader())
                    {
                        while (r.Read())
                        {
                            Console.WriteLine("Reader{0}: {1}, {2}", id, r.GetInt64(0), r.GetString(1));
                            Thread.Sleep(delayPerRow);
                        }
                    }

                    Thread.Sleep(delayAfterReadMs);

                    //throw new Exception("Aufzah!");

                    WriteTransactionIds("Worker" + id + "End");
                    Console.WriteLine("About to complete the worker{0} thread's transaction scope", id);
                    ts.Complete();
                }

                Console.WriteLine("Completing the dependent clone");
                dtx.Complete();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Worker{0} Catch", id);
                Console.WriteLine(ex.ToString());

                // Rethrown on a worker thread, so nothing in this program catches it.
                throw;
            }
        }
    }
}



I have no Idea why nothig there is working like I would expected it.

Is there an working method to access an database with multible Threads or
should I bury that idea?

Examples of working Templates are wery welcome.



With the request for help

Michael Schöller
 
M

Michael Schöller

Well yes.

This is only an Test.

The real Programm should calculate some values for Customers.

There is a method that takes an Customer-Id as parameter an calculates all
values for that Customer.

The application processes customers in blocks of 50. A transaction is opened,
the values for 50 customers are calculated, and the transaction is closed (if
everything goes right). This is done by calling the method in a loop, one
customer ID at a time.

The machine the application runs on has 8 CPUs, so I was thinking about
speeding up the application by calling the method on 10 threads at once with
different customer IDs, so that reading the data and calculating the values
happen in parallel.

The results of the calculation are taken and written to the database.

However, if an error occurs, the whole block has to be rolled back (in the
worst case all 49 correctly calculated customers; the transaction block is
then logged and marked as failed).

Data reading and calculaing is done within the loop so there is already an
open transaction around that.
 
M

Michael Schöller

OK, I changed my test to reflect my needs a bit more.
I also disabled connection pooling. The effect was that my first version is
working now (so I tried a long-running test).
But this one is still not working and I don't know why...

I always get the error (the one with the already open DataReader):
"System.Transactions.TransactionAbortedException: Die Transaktion wurde
abgebrochen. ---> System.Transactions.TransactionPromotionException: Fehler
beim Versuch, die Transaktion heraufzustufen. --->
System.Data.SqlClient.SqlException: Diesem Befehl ist bereits ein geöffneter
DataReader zugeordnet, der zuerst geschlossen werden muss.\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransactionYukon(TransactionRequest
transactionRequest, String transactionName, IsolationLevel iso,
SqlInternalTransaction internalTransaction, Boolean
isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlInternalConnectionTds.ExecuteTransaction(TransactionRequest
transactionRequest, String name, IsolationLevel iso, SqlInternalTransaction
internalTransaction, Boolean isDelegateControlRequest)\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n --- Ende der
internen Ausnahmestapelüberwachung ---\r\n bei
System.Data.SqlClient.SqlDelegatedTransaction.Promote()\r\n bei
System.Transactions.TransactionStatePSPEOperation.PSPEPromote(InternalTransaction
tx)\r\n bei
System.Transactions.TransactionStateDelegatedBase.EnterState(InternalTransaction
tx)\r\n --- Ende der internen Ausnahmestapelüberwachung ---\r\n bei
System.Transactions.TransactionStateAborted.EndCommit(InternalTransaction
tx)\r\n bei System.Transactions.CommittableTransaction.Commit()\r\n bei
System.Transactions.TransactionScope.InternalDispose()\r\n bei
System.Transactions.TransactionScope.Dispose()\r\n bei
TestTransaktion.Program.Main(String[] args) in D:\\DATEN\\SchoellerM\\Visual
Studio 2005\\Projects\\TestTransaktion\\TestTransaktion\\Program.cs:Zeile
31.")


Always get an...
using System;

using System.Collections.Generic;

using System.Text;

using System.Data.SqlClient;

using System.Transactions;

using System.Threading;

namespace TestTransaktion

{

/// <summary>
/// Console test harness: a root <see cref="TransactionScope"/> wraps a loop of
/// independent inner transactions, and each inner transaction hands dependent
/// clones to thread-pool workers that read the TEST table inside that transaction.
/// </summary>
class Program
{
    // NOTE(review): credentials are embedded in source; move them to configuration
    // before this leaves a throw-away test project.
    private const string ConnectionString =
        "Data Source=WKOEDEV01\\SQL2005;Initial Catalog=WKOBASE_CLONE;Persist Security Info=True;User ID=binreader;Password=readme2002;Pooling=false";

    // Number of inner (RequiresNew) transactions started by Main.
    private const int InnerTransactionCount = 1000;

    // Number of workers queued per inner transaction.
    private const int WorkersPerTransaction = 10;

    /// <summary>
    /// Runs the test: opens the root scope, then repeatedly opens a new inner
    /// transaction, queues the workers with blocking dependent clones, and
    /// completes the inner scope. Any exception is printed, never rethrown.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        try
        {
            using (TransactionScope ts = new TransactionScope(TransactionScopeOption.Required, TimeSpan.MaxValue))
            {
                PrintTransactionIds("RootStart");

                for (int i = 0; i < InnerTransactionCount; ++i)
                {
                    using (TransactionScope ts2 = new TransactionScope(TransactionScopeOption.RequiresNew))
                    {
                        PrintTransactionIds("InnerStart");

                        // NOTE(review): every worker opens its own connection inside the
                        // same transaction and they run concurrently. Presumably the
                        // second enlistment forces promotion to a distributed transaction
                        // while another worker still holds an open reader on the first
                        // connection, which would match the reported "open DataReader"
                        // promotion failure. Confirm before relying on this pattern.
                        for (int j = 0; j < WorkersPerTransaction; ++j)
                        {
                            // BlockCommitUntilComplete: each worker receives its own clone
                            // and is expected to call Complete() on it when it is done.
                            ThreadPool.QueueUserWorkItem(
                                worker3,
                                Transaction.Current.DependentClone(DependentCloneOption.BlockCommitUntilComplete));
                        }

                        PrintTransactionIds("InnerEnd");

                        Console.WriteLine("About to complete the Innter Transaction");
                        ts2.Complete();
                    }
                }

                PrintTransactionIds("RootEnd");

                Console.WriteLine("About to complete the main thread");
                ts.Complete();
            }

            Console.WriteLine("Transaction Completed");
        }
        catch (Exception ex)
        {
            Console.WriteLine("Top Catch");
            Console.WriteLine(ex.ToString());
        }

        Console.WriteLine("Enter <Enter>");
        Console.ReadLine();
    }

    /// <summary>
    /// Thread-pool callback: joins the transaction carried by the dependent clone,
    /// reads and prints every row of TEST (sleeping two seconds per row), then
    /// completes the scope and the clone. Any exception is printed, never rethrown.
    /// </summary>
    /// <param name="ar">The <see cref="DependentTransaction"/> created by the caller.</param>
    static void worker3(object ar)
    {
        DependentTransaction dtx = null;

        try
        {
            // Cast stays inside the try so a wrong argument is reported by the
            // catch below instead of escaping the thread-pool callback.
            dtx = (DependentTransaction)ar;

            using (TransactionScope ts = new TransactionScope(dtx))
            {
                PrintTransactionIds("Worker3Start");

                using (SqlConnection conn = new SqlConnection(ConnectionString))
                {
                    conn.Open();

                    // Command and reader are disposed even when reading throws, so a
                    // failed worker cannot leave an open reader on the connection.
                    using (SqlCommand co = new SqlCommand("SELECT * FROM TEST", conn))
                    using (SqlDataReader r = co.ExecuteReader())
                    {
                        while (r.Read())
                        {
                            Console.WriteLine("Reader3: {0}, {1}", r.GetInt64(0), r.GetString(1));
                            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(2));
                        }
                    }
                }

                PrintTransactionIds("Worker3End");

                Console.WriteLine("About to complete the worker3 thread's transaction scope");
                ts.Complete();
            }

            Console.WriteLine("Completing the dependent clone");
            dtx.Complete();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Worker3 Catch");
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // Release the clone on both the success and the failure path.
            if (dtx != null)
            {
                dtx.Dispose();
            }
        }
    }

    /// <summary>
    /// Prints the local and distributed identifiers of the ambient transaction,
    /// tagged with the given stage name.
    /// </summary>
    /// <param name="stage">Label inserted into the "Check Transaction (...)" line.</param>
    private static void PrintTransactionIds(string stage)
    {
        TransactionInformation info = Transaction.Current.TransactionInformation;
        Console.WriteLine(
            "Check Transaction ({0}): L:{1} D:{2}",
            stage,
            info.LocalIdentifier,
            info.DistributedIdentifier);
    }
}

}
 
Top