Home  >  Article  >  Backend Development  >  C# message queue application -1

C# message queue application -1

黄舟
黄舟Original
2016-12-17 16:56:311385browse

Introduction

 Microsoft recently launched a new platform for generating integrated applications - Microsoft
.NET Framework. .NET The framework allows developers to quickly generate and deploy web
services and applications using any programming language. Microsoft Intermediate Language (MSIL) and real-time
(JIT) compiler enables this language-independent framework.

  At the same time as the .NET framework, there was also a new programming language C# (pronounced "C sharp").
C# is a simple, novel, object-oriented and type-safe programming language. Leveraging .NET frameworks
and C# (in addition to Microsoft? Visual Basic? and Managed C++), users
can write powerful Microsoft Windows? and web applications and services. This article
provides such a solution, which focuses on the .NET Framework and C# rather than the programming language
Words. An introduction to the C# language can be found in "C# Introduction and Overview (English)".

 The recent article "MSMQ: A Scalable, High-Availability Load Balancing Solution (English)"
Introduces a solution for scalable load balancing of high-availability message queues (MSMQ)
Solution architecture. This solution involves the development of a Windows service as a smart messaging router. Such a solution was previously available only from Microsoft Visual C++ can only be implemented by programmers, and the emergence of the .NET framework has changed this situation. You can see this in the solution
case below.

.NET Framework Application

 The solution introduced here is a Windows service used to handle several message queues;

Each queue is processed by multiple threads (receiving and processing messages). handler makes

Route messages from the destination queue list
table using round-robin techniques or application-specific values ​​(message AppSpecific properties), and use the message properties to call component methods. (The sample process also falls into this category
Situation. ) In the latter case, the requirement of the component is that it implements the given interface IWeb
Message To handle errors, the application needs to send messages that cannot be processed to the error queue.

 The Messaging application is structured similarly to the previous Active Template Library (ATL) application, it

The main difference between them is the encapsulation of the code used to manage the service and the use

of .NET Framework components. To create a Windows service, .NET Framework users simply need to create a slave Class inherited from ServiceBase
(from System.ServiceControl assembly). This is no surprise since the .NET
framework is object-oriented.

Application structure

 The main class in the application is ServiceControl, which inherits

from ServiceBase. Therefore, it must implement OnStart and OnStop method, and the optional OnPause and

OnContinue methods. In fact, the class is constructed inside the static method Main:

using System;

using System.ServicePRocess;


public class ServiceControl: ServiceBase

{

  // Main entry point for creating service objects
  public static void Main()
  {
  ServiceBase.Run(new ServiceControl());
  }

  // Constructor object that defines service parameters

  public ServiceControl()

  {
 CanPauseAndContinue = true;
 ServiceName = "MSDNMessageService";
 AutoLog = false;
  }

  protected override void OnStart(string[] args) {...}

  protected override void OnStop() {...}

  protected override void OnPause() {...}
protected override void OnContinue() {...}
}

 The ServiceControl class creates a series of CWorker objects, that is, created for each

message queue that needs to be processed An instance of the CWorker class. According to the number of threads required to process the queue in the definition, the CWorker class creates a series of CWorkerThread object. A processing thread created by the CWorkerThread

class will perform the actual service work.

 The main purpose of using the CWorker and CWorkerThread classes is to confirm the service controls Start,
Stop, Pause and Continue Order. Because these processes must be non-blocking, command operations will ultimately be executed on a background processing thread.

  CWorkerThread is an abstract class, which is divided into CWorkerThreadAppSpecific and
CWorkerThreadRoundRobin and CWorkerThreadAssembly inherit. These categories are not
Process messages in the same way. The first two classes process messages by sending them to another queue (the difference lies in the way the receiving queue path is determined), and the last class uses message attributes to

Use component method.


 The error handling within the .NET framework is based on the base class Exception. When the system raises or catches errors, these errors must originate from Classes exported in Exception. The CWorker
ThreadException class is one such implementation, by attaching extra properties (used to define
Whether the service should continue to run) to extend the base class.

 Finally, the application consists of two structures. These value types define
runtime parameters for worker processes or threads to simplify CWorker and The structure of the CWorkerThread object. Using a value type
structure (rather than a reference type class) ensures that these runtime parameters maintain values ​​(not
references).

IWebMessage interface

 One of the implementations of CWorkerThread is a class that calls component methods. This class named
CWorkerThreadAssembly uses The IWebMessage interface defines the contract between services and components.

 With the current version of Microsoft Visual Studio? Differently, C# interfaces can be explicitly defined in any

language without the need to create and compile IDL files. C# The
definition of the IWebMessage interface is as follows:
public interface IWebMessage
{
  WebMessageReturn Process(string sMessageLabel, string sMessage
 Body, int iAppSpecific);
  void Release();
}

The Process method in the ATL code is specified for processing messages. The return code of the Process method is defined as the enumeration type WebMessageReturn:


public enum WebMessageReturn

{

  ReturnGood,
  ReturnBad,
  ReturnAbort
}

 The definition of the enumeration is as follows: Good means to continue processing, Bad means to write the message to the error queue,

Abort means to terminate the processing. Release Methods provide a way for services to easily clear class instances.

Because the destructor of a class instance is only called during garbage collection, ensure that all classes that occupy
expensive resources (such as database connections) have a method that can be called before destruction,
This is a very good idea to release these resources.

Namespace

 Here is a brief introduction to namespaces. Namespaces allow applications to be organized into logical elements in both internal and external representations. All code within the service is contained in MSDNMessage

Service.Service namespace. Although the service code is contained in several files, users do not need to reference other files because they are contained in the same namespace.


Since the IWebMessage interface is included in the MSDNMessageService.Interface name
space, so thread classes that use this interface have an interface namespace.

Service class


 The purpose of the application is to monitor and process message queues. Each queue executes a different process when receiving a message. Applications are implemented as Windows services.

ServiceBase class

  As mentioned before, the basic structure of the service is the class inherited from ServiceBase. Important methods
include OnStart, OnStop, OnPause and OnContinue, each alternative method directly corresponds to a service control operation. The purpose of the OnStart method is to create a CWorker object,

And The CWorker class in turn creates a CWorkerThread object, and then creates a thread in this object that performs the service's work.

 The runtime configuration of the service (and the properties of the CWorker and CWorkerThread objects) is

maintained in an xml-based configuration file. It has the same name as the .exe file created, but
With a .cfg suffix. The configuration example is as follows:
〈?xml version="1.0"?〉
〈configuration〉
〈ProcessList〉

 〈ProcessDefinition

  ProcessName="Worker1"
 ProcessDesc="Message Worker with 2 Threads"
 ProcessType="AppSpecific"
 ProcessThreads="2"
  InputQueue=".private$test_load1"
  ErrorQueue=".private$test_error"〉
 〈OutputList〉
  〈OutputDefinition OutputName=".private$test_out11" /〉
  〈OutputDefinition OutputName=".private$test_out12" /〉
 〈/OutputList〉
 〈/ProcessDefinition〉
 〈ProcessDefinition
  ProcessName="Worker2"
  ProcessDesc="Assembly Worker with 1 Thread"
 ProcessType="Assembly"
 ProcessThreads="1"
  InputQueue=".private$test_load2"
  ErrorQueue=".private$test_error"〉
 〈OutputList〉
  OutputDefinition OutputName="C:MSDNMessageServiceMessage
  Example.dll" /〉
  OutputDefinition OutputName="MSDNMessageService.Message
 Sample.ExampleClass"/〉
 〈/OutputList〉
 〈/ProcessDefinition〉
〈/ProcessList〉
〈/configuration〉

Access to this information is managed through the Config
Manager class from the System.Configuration assembly. static The Get method returns a collection of information that will be enumerated to obtain individual properties. The settings of these property sets determine the runtime characteristics of the helper object. In addition to this
a configuration file, you should also create definitions The metafile of the XML file structure and references the
metafile located in the server machine.cfg configuration file:

〈?xml version ="1.0"?〉

〈MetaData xmlns="x-schema:CatMeta. xms"〉
  DatabaseMeta InternalName="MessageService"〉
  ServerWiring Interceptor="Core_XMLInterceptor"/〉
 〈Collection
  InternalName="Process" PublicName="ProcessList"
  PublicRowName="ProcessDefinition"
  SchemaGeneratorFlags="EMITXMLSCHEMA"〉
  Property InternalName="ProcessName" Type="String" Meta
 Flags="PRIMARYKEY" /〉
  〈Property InternalName="ProcessDesc" Type="String" /〉
   Value="RoundRobin" 〉
   〈Enum InternalName="RoundRobin" Value="0"/〉
   〈Enum InternalName="AppSpecific" Value="1"/〉
   Enum InternalName="Assembly" Value="2"/〉
  〈/Property〉
  Property InternalName="ProcessThreads" Type="Int32"
 DefaultValue="1" /〉
  Property InternalName="InputQueue" Type="String" /〉
  Property InternalName="ErrorQueue" Type="String" /〉
  Property InternalName="OutputName" Type="String" /〉
  〈QueryMeta InternalName="All" MetaFlags="ALL" /〉
  〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
 Operator="EQUAL" /〉
 〈/Collection〉
 〈Collection
 InternalName="Output" PublicName="OutputList"
  PublicRowName="OutputDefinition"
  SchemaGeneratorFlags="EMITXMLSCHEMA"〉
  Property InternalName="ProcessName" Type="String" Meta
 Flags="PRIMARYKEY" /〉
  Property InternalName="OutputName" Type="String" Meta
 Flags="PRIMARYKEY" /〉
  QueryMeta InternalName="All" MetaFlags="ALL" /〉
  〈QueryMeta InternalName="QueryByFile" CellName="__FILE"
 Operator="EQUAL" /〉
 〈/Collection〉
 〈/DatabaseMeta〉
 〈RelationMeta 
 PrimaryTable="Process" PrimaryColumns="ProcessName"
 ForeignTable="Output" ForeignColumns="ProcessName"
  MetaFlags="USECONTAINMENT"/〉
〈/MetaData〉

 Since the Service class must maintain a list of created auxiliary objects, it uses the

Hashtable collection to hold the name of the type object/ List of numeric pairs. Hashtable does not only support enumerations, but also allows querying values ​​by keywords. In the application, the XML process name
is the only keyword:
private Hashtable htWorkers = new Hashtable();
IConfigCollection cWorkers = ConfigManager.Get("ProcessList", new
AppDomainSelector());
foreach (IConfigItem ciWorker in cWorkers)
{
  WorkerFormatter sfWorker = new WorkerFormatter();
  sfWorker.ProcessName = (string)ciWorker["ProcessName"];
  sfWorker.ProcessDesc = (string)ciWorker["ProcessDesc"];
  sfWorker.NumberThreads = (int)ciWorker["ProcessThreads"];
  sfWorker.InputQueue = (string)ciWorker["InputQueue"];
  sfWorker.ErrorQueue = (string)ciWorker["ErrorQueue"];

 // 计算并定义进程类型
  switch ((int)ciWorker["ProcessType"])
  {
   case 0:
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.
     ProcessRoundRobin;
     break;
   case 1:
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.
     ProcessAppSpecific;
     break;
   case 2:
     sfWorker.ProcessType = WorkerFormatter.SFProcessType.
     ProcessAssembly;
     break;
   default:
     throw new Exception("Unknown Processing Type");
  }
  // 执行更多的工作以读取输出信息
  string sProcessName = (string)ciWorker["ProcessName"];
  if (htWorkers.ContainsKey(sProcessName))
   throw new ArgumentException("Process Name Must be Unique: "
   + sProcessName);
  htWorkers.Add(sProcessName, new CWorker(sfWorker));
}

  在这段代码中没有包含的主要信息是输出数据的获取。每一个进程定
义中都有一组相应的输出定义项。该信息是通过如下的简单查询读取的:

string sQuery = "SELECT * FROM OutputList WHERE ProcessName=" +
  sfWorker.ProcessName + " AND Selector=appdomain://";
ConfigQuery QQuery = new ConfigQuery(sQuery);
IConfigCollection cOutputs = ConfigManager.Get("OutputList",
qQuery);
int iSize = cOutputs.Count, iLoop = 0;
sfWorker.OutputName = new string[iSize];
foreach (IConfigItem ciOutput in cOutputs)
  sfWorker.OutputName[iLoop++] = (string)ciOutput["OutputName"];

  CWorkerThread 和 Cworker类都有相应的服务控制方法,根据服务控
制操作进行调用。由于 Hashtable中引用了每一个 CWorker对象,因此需
要枚举 Hashtable的内容,以调用适当的服务控制方法:
foreach (CWorker cWorker in htWorkers.Values)
  cWorker.Start();

  类似地,实现的 OnPause、OnContinue和 OnStop 方法是通过调用
CWorker 对象上的相应方法来执行操作的。

CWorker 类

  CWorker 类的主要功能是创建和管理 CWorkerThread对象。Start 、
Stop、Pause 和 Continue 方法调用相应的 CWorkerThread方法。实际的
CWorkerThread 对象是在Start 方法中创建的。与使用 Hashtable管理辅
助对象引用的 Service类相似,CWorker 使用 ArrayList(简单的动态数
组)来维护线程对象的列表。 

 以上就是C#消息队列应用程序 -1的内容,更多相关文章请关注PHP中文网(www.php.cn)! 


Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact [email protected]
Previous article:C# GDI+ Programming (1)Next article:C# GDI+ Programming (1)