//File: ProcessStream.cs
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading; //AutoEvent
using System.Timers; //ModTimer
using System.IO; //FileStream
using TableProcessorFacade; //Facade for TableProcessor API
namespace MicrosoftCorp_MSTV_Metadata
{
    /// <summary>
    /// Drives one TableProcessor API run over a file set: the API pulls buffers
    /// from the input file through <see cref="GetStreamBuf"/> and pushes buffers
    /// to the output file through <see cref="PutStreamBuf"/>.
    /// </summary>
    public class ProcessStream
    {
        //Processing flags
        Boolean done;
        Boolean pModsAllCompleted, fModsAllCompleted;
        const int SUCCESS = 0; //return callback status to API
        const int FAILURE = 1; //return callback status to API
        static TpFacade.TS_TYPE TsType = TpFacade.TS_TYPE.ActualTS;
        static TpFacade.EIT_TYPE EitType = TpFacade.EIT_TYPE.PresentFollowing;
        System.IO.FileStream fsInput;
        System.IO.FileStream fsOutput;
        //Held in static fields so the delegates handed to the API stay referenced
        //after InitProcessorFacade returns.
        static TpFacade.GetStreamBufCallback GetStreamBufCb;
        static TpFacade.PutStreamBufCallback PutStreamBufCb;
        //Progress indicators
        //Unsigned so the counter really rolls over at 65535 instead of going
        //negative at 32768.
        ushort ReadProgressRollover65535 = 0;
        ushort WriteProgressRollover65535 = 0;
        //byte, not char: char is 16 bits wide and would not roll over at 256.
        byte CheckEitTablesRollover256 = 0;
        //Threading objects
        //lock for thread sync with API processing
        //ms-help://MS.VSCC.v80/MS.MSDN.v80/MS.NETDEVFX.v20.en/cpref12/html/T_System_Threading_AutoResetEvent.htm
        static AutoResetEvent m_autoEventProcessingDone;
        static System.Timers.Timer ModTimer;

        /// <summary>
        /// Opens the input and output files of <paramref name="fSet"/>, starts the
        /// API and blocks until one of the callbacks signals that processing has
        /// finished. Both files are closed before this method returns, whether it
        /// returns normally or throws.
        /// </summary>
        /// <param name="fSet">File set supplying InputFilePath and OutputFilePath.</param>
        /// <param name="evtPair">Not used by this method.</param>
        /// <param name="EventTriplet">Not used by this method.</param>
        /// <param name="WriteStreamEventsLog">Not used by this method.</param>
        /// <param name="WritePacketsBeforeModifiedEitEvent">Not used by this method.</param>
        /// <param name="SkipBytesBeforeEitCheck">Not used by this method.</param>
        public void Process(RunParmsDataset.RunFileSetRow fSet,
            ProposedEventPair evtPair, StreamEventsDataset.EventServiceRow EventTriplet,
            bool WriteStreamEventsLog, bool WritePacketsBeforeModifiedEitEvent,
            System.Int64 SkipBytesBeforeEitCheck)
        {
            //processing flags
            done = false;
            pModsAllCompleted = false;
            fModsAllCompleted = false;

            //Disable API logging
            string apiLogFileString = String.Empty;
            bool m_EnableLogging = false;
            int m_LogLevel = 0;

            //TODO Validate input file path better.
            string InputPath = Path.GetFullPath(fSet.InputFilePath);
            string OutputPath = Path.GetFullPath(fSet.OutputFilePath);

            try
            {
                //Open files
                fsInput = File.OpenRead(InputPath);
                //File.Create truncates an existing file. File.OpenWrite would keep
                //the old length and leave stale bytes after a shorter new stream.
                fsOutput = File.Create(OutputPath);

                //Maintain reference to callback delegates to avoid garbage
                //collection following return from InitProcessorFacade.
                //"If, however, the callback function can be invoked after the call
                //returns, the managed caller must take steps to ensure that the
                //delegate remains uncollected until the callback function finishes.
                //For detailed information about preventing garbage collection, see
                //Interop Marshaling with Platform Invoke."
                //HandleRef Sample:
                //http://msdn2.microsoft.com/en-us/library/hc662t8k(VS.80).aspx
                GetStreamBufCb = GetStreamBuf;
                PutStreamBufCb = PutStreamBuf;

                //TODO Exception handling, reporting, logging.
                //Created unsignalled; PutStreamBuf (end of output or error) and the
                //error path of GetStreamBuf set it.
                m_autoEventProcessingDone = new AutoResetEvent(false);

                BeginProcessing(apiLogFileString, m_EnableLogging, m_LogLevel);
                m_autoEventProcessingDone.WaitOne();
                //Processing completed for current file set
            }
            finally
            {
                //Flush and release both files even when opening or processing failed.
                CloseStreams();
            }
        }//Process

        /// <summary>
        /// Initializes the API with the two stream callbacks and the logging settings.
        /// </summary>
        /// <param name="apiLogFileString">Log file argument passed through to the API.</param>
        /// <param name="m_EnableLogging">Logging switch passed through to the API.</param>
        /// <param name="m_LogLevel">Log level passed through to the API.</param>
        private void BeginProcessing(string apiLogFileString, bool
            m_EnableLogging, int m_LogLevel)
        {
            // initialize the API
            // NOTE(review): the boolean result is ignored, exactly as before. If it
            // reports an initialization failure and no callback ever fires, Process
            // will wait forever - confirm the meaning of the return value against
            // TpFacade and handle the failure case.
            TpFacade.InitProcessorFacade(GetStreamBufCb, PutStreamBufCb,
                m_EnableLogging, m_LogLevel, apiLogFileString);
        }//BeginProcessing

        /// <summary>
        /// Closes the input and output files (if open) and clears the fields.
        /// </summary>
        private void CloseStreams()
        {
            //Clear the fields first so a late callback sees null rather than a
            //closed stream.
            FileStream input = fsInput;
            FileStream output = fsOutput;
            fsInput = null;
            fsOutput = null;
            try
            {
                if (output != null)
                {
                    output.Close();
                }
            }
            finally
            {
                if (input != null)
                {
                    input.Close();
                }
            }
        }//CloseStreams

        /// <summary>
        /// Releases the thread blocked in <see cref="Process"/>, if an event exists.
        /// </summary>
        private static void SignalProcessingDone()
        {
            AutoResetEvent processingDone = m_autoEventProcessingDone;
            if (processingDone != null)
            {
                processingDone.Set();
            }
        }//SignalProcessingDone

        #region TableProcessor API callbacks
        /// <summary>
        /// Callback function used by the API to read a packet from the application.
        /// </summary>
        /// <param name="buf">Buffer to fill from the input file.</param>
        /// <param name="bufSize">Number of bytes requested.</param>
        /// <returns>
        /// SUCCESS when exactly TpFacade.PACKET_SIZE bytes were read; FAILURE when
        /// the read came up short or no input file is open.
        /// </returns>
        public Int32 GetStreamBuf(Byte[] buf, Int32 bufSize)
        {
            //Snapshot the field: Process clears it when it closes the file.
            FileStream input = fsInput;
            if (input == null)
            {
                done = true;
                Console.Write("Done reading input (eof).");
                Console.Out.Flush();
                return FAILURE;
            }

            try
            {
                if (input.Read(buf, 0, bufSize) != TpFacade.PACKET_SIZE)
                {
                    //A short read (including end of file) ends the input.
                    //NOTE(review): the waiter in Process is not released here; that
                    //is left to PutStreamBuf - confirm the API always makes that
                    //final call.
                    Console.Write("Done reading input (failed read).");
                    Console.Out.Flush();
                    done = true;
                    return FAILURE; //return callback status to API
                }

                Console.Write("->{0}", ReadProgressRollover65535);
                Console.Out.Flush();
                //stability - wait for API threads
                System.Threading.Thread.Sleep(200);
                //progress indicator
                if (ReadProgressRollover65535 == 0)
                {
                    Console.Write("XX->");
                    Console.Out.Flush();
                }
                //Wrap-around is intended, so keep it legal in checked builds too.
                unchecked
                {
                    ReadProgressRollover65535++;
                }
                return SUCCESS; //return callback status to API
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                done = true;
                //Release the sync lock, as PutStreamBuf does on error, so Process
                //does not block forever after a failed read.
                SignalProcessingDone();
                throw;
            }
        }//GetStreamBuf

        /// <summary>
        /// Callback function used by the API to write a packet to the application.
        /// </summary>
        /// <param name="buf">Bytes to append to the output file; null releases the
        /// thread waiting in <see cref="Process"/>.</param>
        /// <param name="bufSize">Number of bytes of <paramref name="buf"/> to write.</param>
        /// <returns>
        /// SUCCESS after the buffer was written; FAILURE when <paramref name="buf"/>
        /// is null or no output file is open.
        /// </returns>
        public Int32 PutStreamBuf(Byte[] buf, Int32 bufSize)
        {
            try
            {
                //Snapshot the field: Process clears it when it closes the file.
                FileStream output = fsOutput;
                if (buf != null && output != null)
                {
                    Console.Write("<-");
                    Console.Out.Flush();
                    output.Write(buf, 0, bufSize);
                    //TODO Stack overflow occurs regardless of this call.
                    //ModifyEit();
                    return SUCCESS; //Success == 0
                }
                //Nothing (more) to write. Release sync lock.
                SignalProcessingDone();
                return FAILURE; //stop API thread?
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
                //Processing completed. Release sync lock.
                SignalProcessingDone();
                throw;
            }
        }//PutStreamBuf

        /// <summary>
        /// While input is still being read, asks the API for the actual/present EIT
        /// table and reports on the console when one is returned. Currently not
        /// called (see the TODO in <see cref="PutStreamBuf"/>).
        /// </summary>
        private void ModifyEit()
        {
            if (done)
            {
                return;
            }

            //TODO Why allocate this here?
            byte[] buf = new byte[TpFacade.SDT_SECTION_SIZE]; //4096*100
            //Accumulate EIT event tables
            //Get the ActualPresentEit table
            if (TpFacade.GetActualPresentEitFacade(buf))
            {
                Console.Write("\nFound GetActualPresentEit\n");
                Console.Out.Flush();
            }
        }//ModifyEit
        #endregion //TableProcessor API callbacks
    }//class ProcessStream
}
// --Tom Allen