
C# Message Queuing Application-2

黄舟
2016-12-17 16:59:10

Within this array, the CWorker class creates implementations of the CWorkerThread class. CWorkerThread (discussed below) is an abstract class that must be inherited. The derived class defines how messages are processed:
aThreads = new ArrayList();
for (int idx = 0; idx < sfWorker.NumberThreads; idx++)
{
  WorkerThreadFormatter wfThread = new WorkerThreadFormatter();
  wfThread.ProcessName = sfWorker.ProcessName;
  wfThread.ProcessDesc = sfWorker.ProcessDesc;
  wfThread.ThreadNumber = idx;
  wfThread.InputQueue = sfWorker.InputQueue;
  wfThread.ErrorQueue = sfWorker.ErrorQueue;
  wfThread.OutputName = sfWorker.OutputName;
  // Define the worker type and insert it into the worker thread structure
  CWorkerThread wtBase;
  switch (sfWorker.ProcessType)
  {
    case WorkerFormatter.SFProcessType.ProcessRoundRobin:
      wtBase = new CWorkerThreadRoundRobin(this, wfThread);
      break;
    case WorkerFormatter.SFProcessType.ProcessAppSpecific:
      wtBase = new CWorkerThreadAppSpecific(this, wfThread);
      break;
    case WorkerFormatter.SFProcessType.ProcessAssembly:
      wtBase = new CWorkerThreadAssembly(this, wfThread);
      break;
    default:
      throw new Exception("Unknown Processing Type");
  }
  // Add the worker thread object to the array
  aThreads.Insert(idx, wtBase);
}

Once all objects have been created, you can start them by calling the Start method of each thread object:
foreach(CWorkerThread cThread in aThreads)
 cThread.Start();

The Stop, Pause and Continue methods perform similar operations in a foreach loop.

The Stop method also performs the following garbage collection operation:
GC.SuppressFinalize(this);

The Stop method is also called from the class destructor, so the object shuts down correctly even if Stop is never called explicitly. If Stop has already been called, there is no need to run the destructor. The SuppressFinalize method prevents the object's Finalize method (the actual implementation of the destructor) from being called.
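The interaction between Stop, the destructor and SuppressFinalize can be sketched as follows. This is a minimal illustration, not the article's CWorker class: the Worker type, its stopped flag and the IsStopped property are hypothetical names introduced here.

```csharp
using System;

// Hypothetical stand-in for CWorker: Stop() releases resources once,
// whether it is called explicitly or from the finalizer.
public class Worker
{
    private bool stopped;

    public bool IsStopped { get { return stopped; } }

    public void Stop()
    {
        if (stopped) return;        // repeated calls are harmless
        stopped = true;             // release threads, queues, etc. here
        GC.SuppressFinalize(this);  // cleanup is done, so skip the finalizer
    }

    // C# destructor syntax; the compiler turns this into Finalize().
    ~Worker()
    {
        Stop();
    }
}

public static class Program
{
    public static void Main()
    {
        Worker w = new Worker();
        w.Stop();
        w.Stop(); // second call does nothing
        Console.WriteLine(w.IsStopped);
    }
}
```

Guarding Stop with a flag keeps the explicit call and the finalizer path from releasing the same resources twice.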

CWorkerThread abstract class

CWorkerThread is an abstract class inherited by CWorkerThreadAppSpecific, CWorkerThreadRoundRobin and CWorkerThreadAssembly. No matter how a message is handled, most of the queue processing is the same, so the CWorkerThread class provides this functionality.

This class provides abstract methods (which must be overridden by concrete methods) to manage resources and handle messages.

The work of the class is once again accomplished through the Start, Stop, Pause and Continue methods.
The input and error queues are referenced in the Start method. In the .NET Framework, messaging is handled by the System.Messaging namespace:
// Try to open the queues and set the default read and write properties
MessageQueue mqInput = new MessageQueue(sInputQueue);
mqInput.MessageReadPropertyFilter.Body = true;
mqInput.MessageReadPropertyFilter.AppSpecific = true;
MessageQueue mqError = new MessageQueue(sErrorQueue);
// If using MSMQ COM, set the formatter to ActiveX
mqInput.Formatter = new ActiveXMessageFormatter();
mqError.Formatter = new ActiveXMessageFormatter();

Once the message queue references are defined, a thread is created for the actual processing function (called ProcessMessages). In the .NET Framework, the System.Threading namespace makes it easy to implement threading:
procMessage = new Thread(new ThreadStart(ProcessMessages));
procMessage.Start();


The ProcessMessages function is a processing loop controlled by a Boolean value. When the value is set to false, the processing loop terminates. The thread object's Stop method therefore only sets this Boolean value, joins the processing thread with the main thread, and then closes the open message queues:
// Join the service thread and processing thread
bRun = false;
procMessage.Join();
// Close the open message queues
mqInput.Close();
mqError.Close();

The Pause method only sets a Boolean value that makes the processing thread sleep for half a second:

if (bPause)
Thread.Sleep(500);

Finally, each of the Start, Stop, Pause and Continue methods calls the corresponding abstract OnStart, OnStop, OnPause or OnContinue method. These abstract methods provide hooks that let implementing classes acquire and release the resources they need.
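The lifecycle described above (a Boolean-controlled loop, Join on Stop, a half-second sleep while paused, and the On* hooks) can be sketched without any message queue. The class names WorkerThreadSketch and CountingWorker and the DoWork method are hypothetical; in the article the loop body receives and processes a message instead.

```csharp
using System;
using System.Threading;

// Hypothetical, simplified stand-in for CWorkerThread (no queues involved).
public abstract class WorkerThreadSketch
{
    private Thread procThread;
    private volatile bool bRun;
    private volatile bool bPause;

    // Hooks for derived classes to acquire and release resources.
    protected abstract void OnStart();
    protected abstract void OnStop();
    protected abstract void OnPause();
    protected abstract void OnContinue();
    // One unit of work; stands in for receiving and processing a message.
    protected abstract void DoWork();

    public void Start()
    {
        OnStart();
        bRun = true;
        procThread = new Thread(new ThreadStart(ProcessLoop));
        procThread.Start();
    }

    public void Stop()
    {
        bRun = false;       // ask the loop to end
        procThread.Join();  // wait for the processing thread to finish
        OnStop();
    }

    public void Pause() { bPause = true; OnPause(); }
    public void Continue() { OnContinue(); bPause = false; }

    private void ProcessLoop()
    {
        while (bRun)
        {
            // Assumption: no work is done while paused.
            if (bPause) { Thread.Sleep(500); continue; }
            DoWork();
        }
    }
}

public class CountingWorker : WorkerThreadSketch
{
    private volatile int count;
    public int Count { get { return count; } }

    protected override void OnStart() { }
    protected override void OnStop() { }
    protected override void OnPause() { }
    protected override void OnContinue() { }
    protected override void DoWork() { count++; Thread.Sleep(1); }
}

public static class Program
{
    public static void Main()
    {
        CountingWorker w = new CountingWorker();
        w.Start();
        Thread.Sleep(200);
        w.Stop();
        Console.WriteLine("Units of work: " + w.Count);
    }
}
```

Note that this sketch does not wait for an in-flight unit of work before calling OnPause; a real implementation that releases resources in OnPause would need to handle that case.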

The ProcessMessages loop has the following basic structure:

● Receive a Message.
● If the Message is received successfully, call the abstract ProcessMessage method.
● If Receive or ProcessMessage fails, send the Message to the error queue.

Message mInput;
try
{
  // Read from the queue and wait for 1 second
  mInput = mqInput.Receive(new TimeSpan(0,0,0,1));
}
catch (MessageQueueException mqe)
{
  // Set the message to null
  mInput = null;
  // Check the error code to see if it timed out
  if (mqe.ErrorCode != (-1072824293) ) //0xC00E001B
  {
    // If it is not a timeout, log the error and rethrow
    LogError("Error: " + mqe.Message);
    throw; // rethrow without resetting the stack trace
  }
}
if (mInput != null)
{
  // A message was received; call the abstract message processing method
  try
  {
    ProcessMessage(mInput);
  }
  // Catch errors with a known exception status
  catch (CWorkerThreadException ex)
  {
    ProcessError(mInput, ex.Terminate);
  }
  // Catch unknown exceptions and request termination
  catch
  {
    ProcessError(mInput, true);
  }
}

The ProcessError method sends the failed message to the error queue. It may also raise an exception to terminate the thread. It does so if the ProcessMessage method throws an unknown exception or a CWorkerThreadException marked as terminating.
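The article uses CWorkerThreadException and its Terminate property but does not list the class. A hedged sketch of how such an exception and the terminate decision might look is shown below; WorkerThreadException, ErrorHandlingSketch and ShouldTerminate are hypothetical names, and sending to the error queue is omitted.

```csharp
using System;

// Hypothetical reconstruction of an exception that carries a terminate flag.
public class WorkerThreadException : Exception
{
    private readonly bool terminate;

    public WorkerThreadException(string message, bool terminate) : base(message)
    {
        this.terminate = terminate;
    }

    public bool Terminate { get { return terminate; } }
}

public static class ErrorHandlingSketch
{
    // Runs one processing step and returns true when the failure
    // should stop the worker thread.
    public static bool ShouldTerminate(Action processMessage)
    {
        try
        {
            processMessage();
            return false;               // no failure
        }
        catch (WorkerThreadException ex)
        {
            return ex.Terminate;        // known failure decides for itself
        }
        catch (Exception)
        {
            return true;                // unknown failures are treated as fatal
        }
    }
}

public static class Program
{
    public static void Main()
    {
        Console.WriteLine(ErrorHandlingSketch.ShouldTerminate(
            () => { throw new WorkerThreadException("bad message", false); }));
        Console.WriteLine(ErrorHandlingSketch.ShouldTerminate(
            () => { throw new InvalidOperationException("unexpected"); }));
    }
}
```

Carrying the decision in the exception lets each derived class distinguish a bad message, which only needs to be moved aside, from a failure that makes further processing pointless.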

CWorkerThread derived classes

Any class that inherits from CWorkerThread must provide the OnStart, OnStop, OnPause, OnContinue and ProcessMessage methods. The OnStart and OnStop methods acquire and release processing resources. The OnPause and OnContinue methods allow these resources to be released temporarily and reacquired. The ProcessMessage method should process the message and raise a CWorkerThreadException when a failure occurs.

Since the CWorkerThread constructor defines runtime parameters, the derived class must call the base class constructor:

public CWorkerThreadDerived(CWorker v_cParent, WorkerThreadFormatter v_wfThread)
  : base (v_cParent, v_wfThread) {}

The derived classes provide two types of processing: sending the message to another queue, or calling a component method. The two implementations that receive and forward messages use either round-robin selection or an application offset (kept in the message's AppSpecific property) as the deciding factor in which queue to use. The configuration file in this scenario should include a list of queue paths. The implemented OnStart and OnStop methods should open and close references to these queues:
iQueues = wfThread.OutputName.Length;
mqOutput = new MessageQueue[iQueues];
for (int idx = 0; idx < iQueues; idx++)
{
  mqOutput[idx] = new MessageQueue(wfThread.OutputName[idx]);
  mqOutput[idx].Formatter = new ActiveXMessageFormatter();
}

In these scenarios, message processing is simple: send the message to the appropriate output queue. In the round-robin case, the process is:

try
{
  mqOutput[iNextQueue].Send(v_mInput);
}
catch (Exception ex)
{
  // Force a terminating exception on error
  throw new CWorkerThreadException(ex.Message, true);
}
// Calculate the next queue number
iNextQueue++;
iNextQueue %= iQueues;
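The two queue selection strategies can be compared side by side in a small, queue-free sketch. QueueSelector and both method names are hypothetical. The round-robin method mirrors the index arithmetic above; the mapping from AppSpecific to a queue index is not shown in the article, so the modulo mapping used here is only an assumption.

```csharp
using System;

public static class QueueSelector
{
    // Round robin: return the current index and advance the cursor.
    public static int NextRoundRobin(ref int next, int queueCount)
    {
        int chosen = next;
        next = (next + 1) % queueCount;
        return chosen;
    }

    // Assumption: the AppSpecific value is mapped onto a queue index
    // by taking it modulo the number of queues.
    public static int FromAppSpecific(int appSpecific, int queueCount)
    {
        int r = appSpecific % queueCount;
        return r < 0 ? r + queueCount : r; // keep the index non-negative
    }
}

public static class Program
{
    public static void Main()
    {
        int cursor = 0;
        for (int i = 0; i < 5; i++)
        {
            Console.WriteLine("round robin -> queue "
                + QueueSelector.NextRoundRobin(ref cursor, 3));
        }
        Console.WriteLine("AppSpecific 7 -> queue "
            + QueueSelector.FromAppSpecific(7, 3));
    }
}
```

Round robin spreads load evenly regardless of content, while an AppSpecific based choice lets the sender steer related messages to the same queue.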

The latter approach, calling a component with the message parameters, is more interesting. The ProcessMessage method uses the IWebMessage interface to call a .NET component. The OnStart and OnStop methods obtain and release the reference to this component.

The configuration file in this solution should contain two items: the complete class name and the location of the file where the class resides. The Process method, as defined in the IWebMessage interface, is called on the component.

To obtain an object reference, you need to use the Activator.CreateInstance method. This function requires a type from the assembly. Here the type is derived from the assembly file path and the class name.
Once the object reference is obtained, it is cast to the appropriate interface:

private IWebMessage iwmSample;
private string sFilePath, sTypeName;
// Save assembly path and type name
sFilePath = wfThread.OutputName[0];
sTypeName = wfThread.OutputName[1];
// Get a reference to the necessary object
Assembly asmSample = Assembly.LoadFrom(sFilePath);
Type typSample = asmSample.GetType(sTypeName);
object objSample = Activator.CreateInstance(typSample);
// Define the necessary interface for the object
iwmSample = (IWebMessage)objSample;
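A self-contained sketch of the same late-binding technique follows, with explicit checks for a missing type and a type that does not implement the interface. IMessageHandler, EchoHandler and HandlerLoader are hypothetical names, and the interface is only a stand-in for IWebMessage. The running assembly is used in place of Assembly.LoadFrom so that no external file is needed.

```csharp
using System;
using System.Reflection;

namespace Sketch
{
    // Hypothetical stand-in for the article's IWebMessage interface.
    public interface IMessageHandler
    {
        int Process(string label, string body, int appSpecific);
    }

    public class EchoHandler : IMessageHandler
    {
        public int Process(string label, string body, int appSpecific)
        {
            return body.Length; // trivial work: report the body length
        }
    }

    public static class HandlerLoader
    {
        public static IMessageHandler Load(Assembly assembly, string typeName)
        {
            Type type = assembly.GetType(typeName); // null when not found
            if (type == null)
                throw new ArgumentException("Type not found: " + typeName);

            // Requires a public parameterless constructor on the type.
            IMessageHandler handler =
                Activator.CreateInstance(type) as IMessageHandler;
            if (handler == null)
                throw new ArgumentException(
                    typeName + " does not implement IMessageHandler");
            return handler;
        }
    }

    public static class Program
    {
        public static void Main()
        {
            Assembly asm = Assembly.GetExecutingAssembly();
            IMessageHandler h = HandlerLoader.Load(asm, "Sketch.EchoHandler");
            Console.WriteLine(h.Process("label", "hello", 0));
        }
    }
}
```

Checking for a null type and a failed cast up front turns configuration mistakes into clear errors at start-up rather than a NullReferenceException on the first message.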

 After obtaining the object reference, the ProcessMessage method will call the
Process method on the IWebMessage interface:
WebMessageReturn wbrSample;
try
{
  // Define the parameters of method call
  string sLabel = v_mInput.Label;
  string sBody = (string)v_mInput.Body;
  int iAppSpecific = v_mInput.AppSpecific;
  // Call the method and capture the return code
  wbrSample = iwmSample.Process(sLabel, sBody, iAppSpecific);
}
catch (InvalidCastException ex)
{
  // If an error occurs in the message content, force a non-terminating exception to be issued
  throw new CWorkerThreadException(ex.Message, false);
}
catch (Exception ex)
{
  // If the assembly is called incorrectly, force a termination exception
  throw new CWorkerThreadException(ex.Message, true);
}
// If there is no error, check the return status of the object call
switch (wbrSample)
{
  case WebMessageReturn.ReturnBad:
    throw new CWorkerThreadException
      ("Unable to process message: Message marked bad", false);
  case WebMessageReturn.ReturnAbort:
    throw new CWorkerThreadException
      ("Unable to process message: Process terminating", true);
  default:
    break;
}

The provided sample component writes the message body to a database table. If a serious database error is caught, you may want to terminate the process, but here the message is simply marked as a bad message.

Since the class instance created in this example may acquire and retain expensive database resources, use the OnPause and OnContinue methods to release and reacquire object references.

Instrumentation

As in all good applications, instrumentation is used to monitor the status of the application. The .NET Framework greatly simplifies integrating event logs, performance counters, and Windows Management Instrumentation (WMI) into an application. The messaging application uses event logs and performance counters, both from the System.Diagnostics namespace.

In the ServiceBase class, event logging can be enabled automatically. In addition, the ServiceBase EventLog member supports writing to the application event log:

EventLog.WriteEntry(sMyMessage, EventLogEntryType.Information);


For applications that write to an event log other than the application log, it is easy to create and get a reference to an EventLog resource (as is done in the CWorker class) and to log an entry using the WriteEntry method:

private EventLog cLog;
string sSource = ServiceControl.ServiceControlName;
string sLog = "application";
// Check if the source exists, if not, create the source
if (!EventLog.SourceExists(sSource))
  EventLog.CreateEventSource(sSource, sLog);
// Create a log object and reference the currently defined source
cLog = new EventLog();
cLog.Source = sSource;
// Write an entry in the log to indicate successful creation
cLog.WriteEntry("Created successfully", EventLogEntryType.Information);

The .NET Framework greatly simplifies performance counters. The messaging application provides counters for each processing thread, for the worker each thread belongs to, and for the entire application. They track the total number of messages and the number of messages processed per second. To provide this functionality, you need to define a performance counter category and then increment the corresponding counter instances.

The performance counter category is defined in the service's OnStart method. The category contains two counters: the total number of messages and the number of messages processed per second:
CounterCreationData[] cdMessage = new CounterCreationData[2];
cdMessage[0] = new CounterCreationData("Messages/Total",
  "Total Messages Processed",
  PerformanceCounterType.NumberOfItems64);
cdMessage[1] = new CounterCreationData("Messages/Second",
  "Messages Processed a Second",
  PerformanceCounterType.RateOfChangePerSecond32);
PerformanceCounterCategory.Create("MSDN Message Service",
  "MSDN Message Service Counters", cdMessage);

Once the performance counter category is defined, a PerformanceCounter object is created to access the counter instance. A PerformanceCounter object requires a category, a counter name, and an optional instance name. For the worker process, the process name from the XML file is used, as follows:
// The final argument (readOnly = false) allows the counter to be written
pcMsgTotWorker = new PerformanceCounter("MSDN Message Service",
  "Messages/Total", sProcessName, false);
pcMsgSecWorker = new PerformanceCounter("MSDN Message Service",
  "Messages/Second", sProcessName, false);
pcMsgTotWorker.RawValue = 0;
pcMsgSecWorker.RawValue = 0;

To increment the counter value, just call the appropriate method:

pcMsgTotWorker.IncrementBy(1);
pcMsgSecWorker.IncrementBy(1);

Finally, when the service is terminated, the installed performance counter category should be deleted from the system:

PerformanceCounterCategory.Delete("MSDN Message Service");

Because of the way performance counters work in the .NET Framework, a special service needs to be running.
This service (PerfCounterService) provides shared memory. Counter information is written to the shared
memory and read by the performance counter system.

Installation

Before we end, let's briefly look at installation and the installation tool called installutil.exe. Since this application is a Windows service, it must be installed using installutil.exe. You therefore need a class that inherits from the Installer class in the System.Configuration.Install assembly:
// Required so that installutil.exe discovers this installer
[System.ComponentModel.RunInstaller(true)]
public class ServiceRegister: Installer
{
  private ServiceInstaller serviceInstaller;
  private ServiceProcessInstaller processInstaller;
  public ServiceRegister()
  {
    // Create the service installer
    serviceInstaller = new ServiceInstaller();
    serviceInstaller.StartType = ServiceStartMode.Manual;
    serviceInstaller.ServiceName = ServiceControl.ServiceControlName;
    serviceInstaller.DisplayName = ServiceControl.ServiceControlDesc;
    Installers.Add(serviceInstaller);
    // Create the process installer; run the service under the system account
    processInstaller = new ServiceProcessInstaller();
    processInstaller.Account = ServiceAccount.LocalSystem;
    Installers.Add(processInstaller);
  }
}

As this sample class shows, a Windows service requires one installer for the service and one for the service process, the latter defining the account under which the service runs. Other installers allow event logs, performance counters and other resources to be registered.

Summary

As you can see from this .NET Framework application example, applications that previously only Visual C++ programmers could write can now be implemented with simple object-oriented programs. Although our focus is on C#, the content described in this article also applies to Visual Basic and Managed C++. The new .NET Framework enables developers to create powerful, scalable Windows applications and services using any programming language.

The new .NET Framework not only simplifies and expands programming possibilities, but also makes it easy to incorporate often forgotten application instrumentation (such as performance counters and event log notifications) into an application. Although the application here does not use Windows Management Instrumentation (WMI), the .NET Framework supports it as well.
