Home >Backend Development >C#.Net Tutorial >C# Comprehensive Revealed - Multi-threading in Detail (Part 2)
This article mainly introduces the development of multi-threading from the basic usage of threads, the development of worker threads and I/O threads in the CLR thread pool, and the parallel operation of PLINQ.
Among them, the delegated BeginInvoke method and callback function are the most commonly used.
The I/O thread may be easily ignored by everyone. In fact, when developing a multi-threaded system, you should pay more attention to the operation of the I/O thread. Especially in ASP.NET development, more people may only pay attention to using Ajax on the client side or using UpdatePanel on the server side. In fact, rational use of I/O threads can minimize the pressure on IIS when communicating projects or file downloads.
Parallel programming is an asynchronous operation method that is strongly promoted in Framework 4.0, and it is worth learning more deeply.
I hope this article can be helpful to your study and research. Please comment on any mistakes or omissions in it.
5. I/O threads of CLR thread pool
The threads introduced in the previous section all belong to the worker threads of the CLR thread pool. In this section, I will introduce to you the I/O threads of the CLR thread pool
The I/O thread is a thread set up by .NET specifically for accessing external resources. Because accessing external resources is often affected by external factors, in order to prevent the main thread from being affected and blocked for a long time, .NET provides multiple I/O threads for multiple I/O threads. /O operations have established asynchronous methods, such as: FileStream, TCP/IP, WebRequest, WebService, etc., and the usage of each asynchronous method is very similar, starting with BeginXXX and ending with EndXXX. Here is what you need to know Explain one by one.
5.1 Asynchronous reading and writing of FileStream
You need to call the I/O thread asynchronously in FileStream. You must use the following constructor to create a FileStream object and set useAsync to true.
FileStream stream = new FileStream ( string path, FileMode mode, FileAccess access, FileShare share, int bufferSize,bool useAsync ) ;
where path is the relative path or absolute path of the file; mode determines how to open or create the file; access determines The way to access the file; share determines how the file is shared between processes; bufferSize represents the buffer size, and the default minimum value is generally 8. When asynchronous reading or writing is started, the file size is generally larger than the buffer size; userAsync represents whether to start asynchronous I/ O thread.
Note: When using the BeginRead and BeginWrite methods, it works better when performing a large amount of reading or writing, but for a small amount of reading/writing, these methods may be slower than synchronous reading because it takes a lot of time to switch between threads. .
5.1.1 Asynchronous writing
FileStream contains BeginWrite and EndWrite methods that can start the I/O thread for asynchronous writing.
public override IAsyncResult BeginWrite (byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
public override void EndWrite (IAsyncResult asyncResult)
BeginWrite return value is IAsyncResult, usage method is the same Delegated BeginInvoke method Similarly, it is best to use callback functions to avoid thread blocking. Among the last two parameters, the parameter AsyncCallback is used to bind the callback function; the parameter Object is used to pass external data. One thing to note: the callback function bound to AsyncCallback must be a non-returning method with a single IAsyncResult parameter.
In the example, the FileStream is passed as external data to the callback function, and then IAsyncResult.AsyncState is used to obtain the FileStream object in the callback function, and finally the writing is completed through FileStream.EndWrite (IAsyncResult).
class Program { static void Main(string[] args) { //把线程池的最大值设置为1000 ThreadPool.SetMaxThreads(1000, 1000); ThreadPoolMessage("Start"); //新立文件File.sour FileStream stream = new FileStream("File.sour", FileMode.OpenOrCreate, FileAccess.ReadWrite,FileShare.ReadWrite,1024,true); byte[] bytes = new byte[16384]; string message = "An operating-system ThreadId has no fixed relationship........"; bytes = Encoding.Unicode.GetBytes(message); //启动异步写入 stream.BeginWrite(bytes, 0, (int)bytes.Length,new AsyncCallback(Callback),stream); stream.Flush(); Console.ReadKey(); } static void Callback(IAsyncResult result) { //显示线程池现状 Thread.Sleep(200); ThreadPoolMessage("AsyncCallback"); //结束异步写入 FileStream stream = (FileStream)result.AsyncState; stream.EndWrite(result); stream.Close(); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ "WorkerThreads is:{2} CompletionPortThreads is :{3}", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
As you can see from the output, after using the FileStream.BeginWrite method, the system will automatically start the I/O thread in the CLR thread pool.
5.1.2 Asynchronous reading
FileStream includes BeginRead and EndRead, which can call the I/O thread asynchronously for reading.
public override IAsyncResult BeginRead (byte[] array,int offset,int numBytes, AsyncCallback userCallback,Object stateObject)
public override int EndRead(IAsyncResult asyncResult)
Its usage is similar to BeginWrite and EndWrite, AsyncCallback is used to bind Defined callback function; Object is used to pass external data. In the callback function, you only need to use IAsyncResut.AsyncState to obtain external data. The EndWrite method returns the number of bytes read from the stream.
First define the FileData class, which contains FileStream object, byte[] array and length. Then pass the FileData object as external data to the callback function. In the callback function, cast IAsyncResult.AsyncState to FileData, and then end the reading through FileStream.EndRead (IAsyncResult). Finally, compare the length. If the read length is different from the input data length, an exception will be thrown.
class Program { public class FileData { public FileStream Stream; public int Length; public byte[] ByteData; } static void Main(string[] args) { //把线程池的最大值设置为1000 ThreadPool.SetMaxThreads(1000, 1000); ThreadPoolMessage("Start"); ReadFile(); Console.ReadKey(); } static void ReadFile() { byte[] byteData=new byte[80961024]; FileStream stream = new FileStream("File1.sour", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true); //把FileStream对象,byte[]对象,长度等有关数据绑定到FileData对象中,以附带属性方式送到回调函数 FileData fileData = new FileData(); fileData.Stream = stream; fileData.Length = (int)stream.Length; fileData.ByteData = byteData; //启动异步读取 stream.BeginRead(byteData, 0, fileData.Length, new AsyncCallback(Completed), fileData); } static void Completed(IAsyncResult result) { ThreadPoolMessage("Completed"); //把AsyncResult.AsyncState转换为FileData对象,以FileStream.EndRead完成异步读取 FileData fileData = (FileData)result.AsyncState; int length=fileData.Stream.EndRead(result); fileData.Stream.Close(); //如果读取到的长度与输入长度不一致,则抛出异常 if (length != fileData.Length) throw new Exception("Stream is not complete!"); string data=Encoding.ASCII.GetString(fileData.ByteData, 0, fileData.Length); Console.WriteLine(data.Substring(2,22)); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ "WorkerThreads is:{2} CompletionPortThreads is :{3}", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
As you can see from the output, after using the FileStream.BeginRead method, the system will automatically start the I/O thread in the CLR thread pool.
5.2 异步操作TCP/IP套接字
在介绍 TCP/IP 套接字前先简单介绍一下 NetworkStream 类,它是用于网络访问的基础数据流。 NetworkStream 提供了好几个方法控制套接字数据的发送与接收, 其中BeginRead、EndRead、BeginWrite、EndWrite 能够实现异步操作,而且异步线程是来自于CLR线程池的I/O线程。
public override int ReadByte ()
public override int Read (byte[] buffer,int offset, int size)
public override void WriteByte (byte value)
public override void Write (byte[] buffer,int offset, int size)
public override IAsyncResult BeginRead (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override int EndRead(IAsyncResult result)
public override IAsyncResult BeginWrite (byte [] buffer, int offset, int size, AsyncCallback callback, Object state )
public override void EndWrite(IAsyncResult result)
若要创建 NetworkStream,必须提供已连接的 Socket。而在.NET中使用TCP/IP套接字不需要直接与Socket打交道,因为.NET把Socket的大部分操作都放在System.Net.TcpListener和System.Net.Sockets.TcpClient里面,这两个类大大地简化了Socket的操作。一般套接字对象Socket包含一个Accept()方法,此方法能产生阻塞来等待客户端的请求,而在TcpListener类里也包含了一个相似的方法 public TcpClient AcceptTcpClient()用于等待客户端的请求。此方法将会返回一个TcpClient 对象,通过 TcpClient 的 public NetworkStream GetStream()方法就能获取NetworkStream对象,控制套接字数据的发送与接收。
首先在服务器端建立默认地址127.0.0.1用于收发信息,使用此地址与端口500新建TcpListener对象,调用TcpListener.Start 侦听传入的连接请求,再使用一个死循环来监听信息。
在ChatClient类包括有接收信息与发送信息两个功能:当接收到客户端请求时,它会利用 NetworkStream.BeginRead 读取客户端信息,并在回调函数ReceiveAsyncCallback中输出信息内容,若接收到的信息的大小小于1时,它将会抛出一个异常。当信息成功接收后,再使用 NetworkStream.BeginWrite 方法回馈信息到客户端
class Program { static void Main(string[] args) { //设置CLR线程池最大线程数 ThreadPool.SetMaxThreads(1000, 1000); //默认地址为127.0.0.1 IPAddress ipAddress = IPAddress.Parse(""); TcpListener tcpListener = new TcpListener(ipAddress, 500); tcpListener.Start(); //以一个死循环来实现监听 while (true) { //调用一个ChatClient对象来实现监听 ChatClient chatClient = new ChatClient(tcpListener.AcceptTcpClient()); } } } public class ChatClient { static TcpClient tcpClient; static byte[] byteMessage; static string clientEndPoint; public ChatClient(TcpClient tcpClient1) { tcpClient = tcpClient1; byteMessage = new byte[tcpClient.ReceiveBufferSize]; //显示客户端信息 clientEndPoint = tcpClient.Client.RemoteEndPoint.ToString(); Console.WriteLine("Client's endpoint is " + clientEndPoint); //使用NetworkStream.BeginRead异步读取信息 NetworkStream networkStream = tcpClient.GetStream(); networkStream.BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, new AsyncCallback(ReceiveAsyncCallback), null); } public void ReceiveAsyncCallback(IAsyncResult iAsyncResult) { //显示CLR线程池状态 Thread.Sleep(100); ThreadPoolMessage("\nMessage is receiving"); //使用NetworkStream.EndRead结束异步读取 NetworkStream networkStreamRead = tcpClient.GetStream(); int length=networkStreamRead.EndRead(iAsyncResult); //如果接收到的数据长度少于1则抛出异常 if (length < 1) { tcpClient.GetStream().Close(); throw new Exception("Disconnection!"); } //显示接收信息 string message = Encoding.UTF8.GetString(byteMessage, 0, length); Console.WriteLine("Message:" + message); //使用NetworkStream.BeginWrite异步发送信息 byte[] sendMessage = Encoding.UTF8.GetBytes("Message is received!"); NetworkStream networkStreamWrite=tcpClient.GetStream(); networkStreamWrite.BeginWrite(sendMessage, 0, sendMessage.Length, new AsyncCallback(SendAsyncCallback), null); } //把信息转换成二进制数据,然后发送到客户端 public void SendAsyncCallback(IAsyncResult iAsyncResult) { //显示CLR线程池状态 Thread.Sleep(100); ThreadPoolMessage("\nMessage is sending"); //使用NetworkStream.EndWrite结束异步发送 tcpClient.GetStream().EndWrite(iAsyncResult); //重新监听 tcpClient.GetStream().BeginRead(byteMessage, 0, tcpClient.ReceiveBufferSize, new AsyncCallback(ReceiveAsyncCallback), null); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n " + "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
static void Main(string[] args) { //连接服务端 TcpClient tcpClient = new TcpClient("", 500); //发送信息 NetworkStream networkStream = tcpClient.GetStream(); byte[] sendMessage = Encoding.UTF8.GetBytes("Client request connection!"); networkStream.Write(sendMessage, 0, sendMessage.Length); networkStream.Flush(); //接收信息 byte[] receiveMessage=new byte[1024]; int count=networkStream.Read(receiveMessage, 0,1024); Console.WriteLine(Encoding.UTF8.GetString(receiveMessage)); Console.ReadKey(); }
5.3 异步WebRequest
System.Net.WebRequest 是 .NET 为实现访问 Internet 的 “请求/响应模型” 而开发的一个 abstract 基类, 它主要有三个子类:FtpWebRequest、HttpWebRequest、FileWebRequest。当使用WebRequest.Create(string uri)创建对象时,应用程序就可以根据请求协议判断实现类来进行操作。FileWebRequest、FtpWebRequest、HttpWebRequest 各有其作用:FileWebRequest 使用 “file://路径” 的URI方式实现对本地资源和内部文件的请求/响应、FtpWebRequest 使用FTP文件传输协议实现文件请求/响应、HttpWebRequest 用于处理HTTP的页面请求/响应。由于使用方法相类似,下面就以常用的HttpWebRequest为例子介绍一下异步WebRequest的使用方法。
在使用ASP.NET开发网站的时候,往往会忽略了HttpWebRequest的使用,因为开发都假设客户端是使用浏览器等工具去阅读页面的。但如果你对REST开发方式有所了解,那对 HttpWebRequest 就应该非常熟悉。它可以在路径参数、头文件、页面主体、Cookie 等多处地方加入请求条件,然后对回复数据进行适当处理。HttpWebRequest 包含有以下几个常用方法用于处理请求/响应:
public override Stream GetRequestStream ()
public override WebResponse GetResponse ()
public override IAsyncResult BeginGetRequestStream ( AsyncCallback callback, Object state )
public override Stream EndGetRequestStream ( IAsyncResult asyncResult )
public override IAsyncResult BeginGetResponse ( AsyncCallback callback, Object state )
public override WebResponse EndGetResponse ( IAsyncResult asyncResult )
其中BeginGetRequestStream、EndGetRequestStream 用于异步向HttpWebRequest对象写入请求信息; BeginGetResponse、EndGetResponse 用于异步发送页面请求并获取返回信息。使用异步方式操作Internet的“请求/响应”,避免主线程长期处于等待状态,而操作期间异步线程是来自CLR线程池的I/O线程。
首先为Person类加上可序列化特性,在服务器端建立Hanlder.ashx,通过Request.InputStream 获取到请求数据并把数据转化为String对象,此实例中数据是以 “Id:1” 的形式实现传送的。然后根据Id查找对应的Person对象,并把Person对象写入Response.OutStream 中返还到客户端。
在客户端先把 HttpWebRequird.Method 设置为 "post",使用异步方式通过BeginGetRequireStream获取请求数据流,然后写入请求数据 “Id:1”。再使用异步方法BeginGetResponse 获取回复数据,最后把数据反序列化为Person对象显示出来。
注意:HttpWebRequire.Method默认为get,在写入请求前必须把HttpWebRequire.Method设置为post,否则在使用BeginGetRequireStream 获取请求数据流的时候,系统就会发出 “无法发送具有此谓词类型的内容正文" 的异常。
namespace Model { [Serializable] public class Person { public int ID { get; set; } public string Name { get; set; } public int Age { get; set; } } }
public class Handler : IHttpHandler { public void ProcessRequest(HttpContext context) { //把信息转换为String,找出输入条件Id byte[] bytes=new byte[1024]; int length=context.Request.InputStream.Read(bytes,0,1024); string condition = Encoding.Default.GetString(bytes); int id = int.Parse(condition.Split(new string[] { ":" }, StringSplitOptions.RemoveEmptyEntries)[1]); //根据Id查找对应Person对象 var person = GetPersonList().Where(x => x.ID == id).First(); //所Person格式化为二进制数据写入OutputStream BinaryFormatter formatter = new BinaryFormatter(); formatter.Serialize(context.Response.OutputStream, person); } //模拟源数据 private IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); ........... return personList; } public bool IsReusable { get { return true;} } }
class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(1000, 1000); Request(); Console.ReadKey(); } static void Request() { ThreadPoolMessage("Start"); //使用WebRequest.Create方法建立HttpWebRequest对象 HttpWebRequest webRequest = (HttpWebRequest)WebRequest.Create( "http://localhost:5700/Handler.ashx"); webRequest.Method = "post"; //对写入数据的RequestStream对象进行异步请求 IAsyncResult result=webRequest.BeginGetRequestStream( new AsyncCallback(EndGetRequestStream),webRequest); } static void EndGetRequestStream(IAsyncResult result) { ThreadPoolMessage("RequestStream Complete"); //获取RequestStream HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; Stream stream=webRequest.EndGetRequestStream(result); //写入请求条件 byte[] condition = Encoding.Default.GetBytes("Id:1"); stream.Write(condition, 0, condition.Length); //异步接收回传信息 IAsyncResult responseResult = webRequest.BeginGetResponse( new AsyncCallback(EndGetResponse), webRequest); } static void EndGetResponse(IAsyncResult result) { //显出线程池现状 ThreadPoolMessage("GetResponse Complete"); //结束异步请求,获取结果 HttpWebRequest webRequest = (HttpWebRequest)result.AsyncState; WebResponse webResponse = webRequest.EndGetResponse(result); //把输出结果转化为Person对象 Stream stream = webResponse.GetResponseStream(); BinaryFormatter formatter = new BinaryFormatter(); var person=(Person)formatter.Deserialize(stream); Console.WriteLine(string.Format("Person Id:{0} Name:{1} Age:{2}", person.ID, person.Name, person.Age)); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n " + "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
5.4 异步调用WebService
相比TCP/IP套接字,在使用WebService的时候,服务器端需要更复杂的操作处理,使用时间往往会更长。为了避免客户端长期处于等待状态,在配置服务引用时选择 “生成异步操作”,系统可以自动建立异步调用的方式。
以.NET 2.0以前,系统都是使用ASMX来设计WebService,而近年来WCF可说是火热登场,下面就以WCF为例子简单介绍一下异步调用WebService的例子。
由于系统可以自动生成异步方法,使用起来非常简单,首先在服务器端建立服务ExampleService,里面包含方法Method。客户端引用此服务时,选择 “生成异步操作”。然后使用 BeginMethod 启动异步方法, 在回调函数中调用EndMethod结束异步调用。
[ServiceContract] public interface IExampleService { [OperationContract] string Method(string name); } public class ExampleService : IExampleService { public string Method(string name) { return "Hello " + name; } } class Program { static void Main(string[] args) { ServiceHost host = new ServiceHost(typeof(ExampleService)); host.Open(); Console.ReadKey(); host.Close(); } } <configuration> <system.serviceModel> <services> <service name="Example.ExampleService"> <endpoint address="" binding="wsHttpBinding" contract="Example.IExampleService"> <identity> <dns value="localhost" /> </identity> </endpoint> <endpoint address="mex" binding="mexHttpBinding" contract="IMetadataExchange" /> <host> <baseAddresses> <add baseAddress="http://localhost:7200/Example/ExampleService/" /> </baseAddresses> </host> </service> </services> </system.serviceModel> </configuration>
class Program { static void Main(string[] args) { //设置最大线程数 ThreadPool.SetMaxThreads(1000, 1000); ThreadPoolMessage("Start"); //建立服务对象,异步调用服务方法 ExampleServiceReference.ExampleServiceClient exampleService = new ExampleServiceReference.ExampleServiceClient(); exampleService.BeginMethod("Leslie",new AsyncCallback(AsyncCallbackMethod), exampleService); Console.ReadKey(); } static void AsyncCallbackMethod(IAsyncResult result) { Thread.Sleep(1000); ThreadPoolMessage("Complete"); ExampleServiceReference.ExampleServiceClient example = (ExampleServiceReference.ExampleServiceClient)result.AsyncState; string data=example.EndMethod(result); Console.WriteLine(data); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n " + "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } } <configuration> <system.serviceModel> <bindings> <wsHttpBinding> <binding name="WSHttpBinding_IExampleService" closeTimeout="00:01:00" openTimeout="00:01:00" receiveTimeout="00:10:00" sendTimeout="00:01:00" bypassProxyOnLocal="false" transactionFlow="false" hostNameComparisonMode="StrongWildcard" maxBufferPoolSize="524288" maxReceivedMessageSize="65536" messageEncoding="Text" textEncoding="utf-8" useDefaultWebProxy="true" allowCookies="false"> <readerQuotas maxDepth="32" maxStringContentLength="8192" maxArrayLength="16384" maxBytesPerRead="4096" maxNameTableCharCount="16384" /> <reliableSession ordered="true" inactivityTimeout="00:10:00" enabled="false" /> <security mode="Message"> <transport clientCredentialType="Windows" proxyCredentialType="None" realm="" /> <message clientCredentialType="Windows" negotiateServiceCredential="true" algorithmSuite="Default" /> </security> </binding> </wsHttpBinding> </bindings> <client> <endpoint address="http://localhost:7200/Example/ExampleService/" binding="wsHttpBinding" bindingConfiguration="WSHttpBinding_IExampleService" contract="ExampleServiceReference.IExampleService" name="WSHttpBinding_IExampleService"> <identity> <dns value="localhost" /> </identity> </endpoint> </client> </system.serviceModel> </configuration>
六、异步 SqlCommand
从ADO.NET 2.0开始,SqlCommand就新增了几个异步方法执行SQL命令。相对于同步执行方式,它使主线程不需要等待数据库的返回结果,在使用复杂性查询或批量插入时将有效提高主线程的效率。使用异步SqlCommand的时候,请注意把ConnectionString 的 Asynchronous Processing 设置为 true 。
public IAsyncResult BeginExecuteNonQuery (......)
public int EndExecuteNonQuery(IAsyncResult)
public IAsyncResult BeginExecuteReader(......)
public SqlDataReader EndExecuteReader(IAsyncResult)
public IAsyncResult BeginExecuteXmlReader (......)
public XmlReader EndExecuteXmlReader(IAsyncResult)
由于使用方式相似,此处就以 BeginExecuteNonQuery 为例子,介绍一下异步SqlCommand的使用。首先建立connectionString,注意把Asynchronous Processing设置为true来启动异步命令,然后把SqlCommand.CommandText设置为 WAITFOR DELAY "0:0:3" 来虚拟数据库操作。再通过BeginExecuteNonQuery启动异步操作,利用轮询方式监测操作情况。最后在操作完成后使用EndExecuteNonQuery完成异步操作。
class Program { //把Asynchronous Processing设置为true static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;“+ "Integrated Security=True;Asynchronous Processing=true"; static void Main(string[] args) { //把CLR线程池最大线程数设置为1000 ThreadPool.SetMaxThreads(1000, 1000); ThreadPoolMessage("Start"); //使用WAITFOR DELAY命令来虚拟操作 SqlConnection connection = new SqlConnection(connectionString); SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection); connection.Open(); //启动异步SqlCommand操作,利用轮询方式监测操作 IAsyncResult result = command.BeginExecuteNonQuery(); ThreadPoolMessage("BeginRead"); while (!result.AsyncWaitHandle.WaitOne(500)) Console.WriteLine("Main thread do work........"); //结束异步SqlCommand int count= command.EndExecuteNonQuery(result); ThreadPoolMessage("\nCompleted"); Console.ReadKey(); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
class Program { //把Asynchronous Processing设置为true static string connectionString = "Data Source=LESLIE-PC;Initial Catalog=Business;”+ “Integrated Security=True;Asynchronous Processing=true"; static void Main(string[] args) { //把CLR线程池最大线程数设置为1000 ThreadPool.SetMaxThreads(1000, 1000); ThreadPoolMessage("Start"); //使用WAITFOR DELAY命令来虚拟操作 SqlConnection connection = new SqlConnection(connectionString); SqlCommand command = new SqlCommand("WAITFOR DELAY '0:0:3';", connection); connection.Open(); //启动异步SqlCommand操作,并把SqlCommand对象传递到回调函数 IAsyncResult result = command.BeginExecuteNonQuery( new AsyncCallback(AsyncCallbackMethod),command); Console.ReadKey(); } static void AsyncCallbackMethod(IAsyncResult result) { Thread.Sleep(200); ThreadPoolMessage("AsyncCallback"); SqlCommand command = (SqlCommand)result.AsyncState; int count=command.EndExecuteNonQuery(result); command.Connection.Close(); } //显示线程池现状 static void ThreadPoolMessage(string data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is {1}\n "+ "WorkerThreads is:{2} CompletionPortThreads is :{3}\n", data, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
要使用多线程开发,必须非常熟悉Thread的使用,而且在开发过程中可能会面对很多未知的问题。为了简化开发,.NET 4.0 特别提供一个并行编程库System.Threading.Tasks,它可以简化并行开发,你无需直接跟线程或线程池打交道,就可以简单建立多线程应用程序。此外,.NET还提供了新的一组扩展方法PLINQ,它具有自动分析查询功能,如果并行查询能提高系统效率,则同时运行,如果查询未能从并行查询中受益,则按原顺序查询。下面将详细介绍并行操作的方式。
7.1 泛型委托
Funca8093152e673feb7aba1828c43532094是一个能接受多个参数和一个返回值的泛型委托,它能接受0个到16个输入参数, 其中 T1,T2,T3,T4......T16 代表自定的输入类型,TResult为自定义的返回值。
public delegate TResult Funcb54c2c292509147c0b54128f4eb90887()
public delegate TResult Funcd2167048053f3ba189f5f1e4cad978c8(T1 arg1)
public delegate TResult Func30dd6f973459f5160fc55050a4c63168(T1 arg1,T2 arg2)
public delegate TResult Func84a1de30089849cc11553cf7573f8f76(T1 arg1,T2 arg2,T3 arg3)
public delegate TResult Funcc9a3c0355358689bb3dfb380836f4c6b(T1 arg1,T2 arg2,T3 arg3,T4 arg4)
public delegate TResult Func59b1f11d2a5fb54f3d53120de4829070(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
public delegate void Action7e967c91083bb24e2a2d13053cb29e83()
public delegate void Actionfa585c9f3e0b0e6baae3087fd7941722(T1 arg1,T2 arg2)
public delegate void Action0bbe2edebc476b9a4174023d1bffa547(T1 arg1,T2 arg2, T3 arg3)
public delegate void Actiondd4c01aa8f44b08fc1f25c3dbf50f170(T1 arg1,T2 arg2,T3 arg3,T4 arg4,...... ,T16 arg16)
7.2 任务并行库(TPL)
System.Threading.Tasks中的类被统称为任务并行库(Task Parallel Library,TPL),TPL使用CLR线程池把工作分配到CPU,并能自动处理工作分区、线程调度、取消支持、状态管理以及其他低级别的细节操作,极大地简化了多线程的开发。
7.2.1 数据并行
数据并行的核心类就是System.Threading.Tasks.Parallel,它包含两个静态方法 Parallel.For 与 Parallel.ForEach, 使用方式与for、foreach相仿。通过这两个方法可以并行处理System.Funca8093152e673feb7aba1828c43532094、System.Actiona8093152e673feb7aba1828c43532094委托。
以下一个例子就是利用 public static ParallelLoopResult For( int from, int max, Actionbd43222e33876353aff11e13a7dc75f6) 方法对List8abf60ac54173a2785e603c7a1f95b4e进行并行查询。
class Program { static void Main(string[] args) { //设置最大线程数 ThreadPool.SetMaxThreads(1000, 1000); //并行查询 Parallel.For(0, 3,n => { Thread.Sleep(2000); //模拟查询 ThreadPoolMessage(GetPersonList()[n]); }); Console.ReadKey(); } //模拟源数据 static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); ........... return personList; } //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
public static ParallelLoopResult ForEach767cb18821eb42d0be2dbae71b5cc905( IEnumerable767cb18821eb42d0be2dbae71b5cc905 source, Actione3122bb2e828f66dc207d24dd5c51fbe action)
其中source为数据集,在Action0917abb69ff32f3cb36837b8f3ed949b委托的ParallelLoopState参数当中包含有Break()和 Stop()两个方法都可以使迭代停止。Break的使用跟传统for里面的使用方式相似,但因为处于并行处理当中,使用Break并不能保证所有运行能立即停止,在当前迭代之前的迭代会继续执行。若想立即停止操作,可以使用Stop方法,它能保证立即终止所有的操作,无论它们是处于当前迭代的之前还是之后。
class Program { static void Main(string[] args) { //设置最大线程数 ThreadPool.SetMaxThreads(1000, 1000); //并行查询 Parallel.ForEach(GetPersonList(), (person, state) => { if (person.ID == 2) state.Stop(); ThreadPoolMessage(person); }); Console.ReadKey(); } //模拟源数据 static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
public static ParallelLoopResult ForEach17bd359d0d4870e621c14ec9e4feb8fc(IEnumerable0fcfc9b8421917508e4e163d1cf0e798, Func88203973307294ba1ee175f5ae9edd9d, Func398c86853ba6dee84d8470dc78a625ec, Action88203973307294ba1ee175f5ae9edd9d)
第 三个参数是委托Funce097c43e62c92f1b4602bba77a4340da,它能对数据集的每个成员进行迭代,当中T1是数据集的成员,T2是一个ParallelLoopState对 象,它可以控制迭代的状态,T3是线程中的本地变量;
public class Order { public int ID; public float Price; } public class OrderItem { public int ID; public string Goods; public int OrderID; public float Price; public int Count; } class Program { static void Main(string[] args) { //设置最大线程数 ThreadPool.SetMaxThreads(1000, 1000); float totalPrice = 0f; //并行查询 var parallelResult = Parallel.ForEach(GetOrderList(), () => 0f, //把参数初始值设为0 (order, state, orderPrice) => { //计算单个Order的价格 orderPrice = GetOrderItem().Where(item => item.OrderID == order.ID) .Sum(item => item.Price * item.Count); order.Price = orderPrice; ThreadPoolMessage(order); return orderPrice; }, (finallyPrice) => { totalPrice += finallyPrice;//计算多个Order的总体价格 } ); while (!parallelResult.IsCompleted) Console.WriteLine("Doing Work!"); Console.WriteLine("Total Price is:" + totalPrice); Console.ReadKey(); } //虚拟数据 static IList<Order> GetOrderList() { IList<Order> orderList = new List<Order>(); Order order1 = new Order(); order1.ID = 1; orderList.Add(order1); ............ return orderList; } //虚拟数据 static IList<OrderItem> GetOrderItem() { IList<OrderItem> itemList = new List<OrderItem>(); OrderItem orderItem1 = new OrderItem(); orderItem1.ID = 1; orderItem1.Goods = "iPhone 4S"; orderItem1.Price = 6700; orderItem1.Count = 2; orderItem1.OrderID = 1; itemList.Add(orderItem1); ........... return itemList; } //显示线程池现状 static void ThreadPoolMessage(Order order) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("OrderID:{0} OrderPrice:{1}\n" + " CurrentThreadId is {2}\n WorkerThreads is:{3}" + " CompletionPortThreads is:{4}\n", order.ID, order.Price, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
7.2.2 任务并行
在TPL当中还可以使用Parallel.Invoke方法触发多个异步任务,其中 actions 中可以包含多个方法或者委托,parallelOptions用于配置Parallel类的操作。
public static void Invoke(Action[] actions )
public static void Invoke(ParallelOptions parallelOptions, Action[] actions )
class Program { static void Main(string[] args) { //设置最大线程数 ThreadPool.SetMaxThreads(1000, 1000); //任务并行 Parallel.Invoke(option, PersonMessage, ()=>ThreadPoolMessage(GetPersonList()[1]), delegate(){ ThreadPoolMessage(GetPersonList()[2]); }); Console.ReadKey(); } static void PersonMessage() { ThreadPoolMessage(GetPersonList()[0]); } //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } //模拟源数据 static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); .......... return personList; } }
7.3 Task简介
class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(1000, 1000); Task.Factory.StartNew(() => ThreadPoolMessage()); Console.ReadKey(); } //显示线程池现状 static void ThreadPoolMessage() { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("CurrentThreadId is:{0}\n" + "CurrentThread IsBackground:{1}\n" + "WorkerThreads is:{2}\nCompletionPortThreads is:{3}\n", Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsBackground.ToString(), a.ToString(), b.ToString()); Console.WriteLine(message); } }
public Task StartNew( Action action, CancellationToken cancellationToken )
<html xmlns="http://www.w3.org/1999/xhtml"> <head runat="server"> <title></title> <script type="text/C#" runat="server"> private static List<string> url=new List<string>(); protected void Page_Load(object sender, EventArgs e) { if (!Page.IsPostBack) { url.Clear(); Application["Url"] = null; } } protected void CheckBox_CheckedChanged(object sender, EventArgs e) { CheckBox checkBox = (CheckBox)sender; if (checkBox.Checked) url.Add(checkBox.Text); else url.Remove(checkBox.Text); Application["Url"]= url; } </script> </head> <body> <form id="form1" runat="server" > <div align="left"> <div align="center" style="float: left;"> <asp:Image ID="Image1" runat="server" ImageUrl="~/Images/A.jpg" /><br /> <asp:CheckBox ID="CheckBox1" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="A.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image2" runat="server" ImageUrl="~/Images/B.jpg" /><br /> <asp:CheckBox ID="CheckBox2" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="B.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image3" runat="server" ImageUrl="~/Images/C.jpg" /><br /> <asp:CheckBox ID="CheckBox3" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="C.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image4" runat="server" ImageUrl="~/Images/D.jpg" /><br /> <asp:CheckBox ID="CheckBox4" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="D.jpg" /> </div> <div align="center" style="float: left"> <asp:Image ID="Image5" runat="server" ImageUrl="~/Images/E.jpg" /><br /> <asp:CheckBox ID="CheckBox5" runat="server" AutoPostBack="True" oncheckedchanged="CheckBox_CheckedChanged" Text="E.jpg" /> </div> </div> </form> </body> </html>
Handler.ashx 处理图片的下载,它从 Application["Url"] 当中获取所选择图片的路径,并把图片转化成byte[]二进制数据。
最后把图片的二进制数据记入 OutputStream 一并输出。
public class Handler : IHttpHandler { public void ProcessRequest(HttpContext context) { //获取图片名,把图片数量写OutputStream List<String> urlList = (List<string>)context.Application["Url"]; context.Response.OutputStream.Write(BitConverter.GetBytes(urlList.Count), 0, 4); //把图片转换成二进制数据 List<string> imageList = GetImages(urlList); //把每副图片长度写入OutputStream foreach (string image in imageList) { byte[] imageByte=Convert.FromBase64String(image); context.Response.OutputStream.Write(BitConverter.GetBytes(imageByte.Length),0,4); } //把图片写入OutputStream foreach (string image in imageList) { byte[] imageByte = Convert.FromBase64String(image); context.Response.OutputStream.Write(imageByte,0,imageByte.Length); } } //获取多个图片的二进制数据 private List<string> GetImages(List<string> urlList) { List<string> imageList = new List<string>(); foreach (string url in urlList) imageList.Add(GetImage(url)); return imageList; } //获取单副图片的二进制数据 private string GetImage(string url) { string path = "E:/My Projects/Example/WebSite/Images/"+url; FileStream stream = new FileStream(path, FileMode.Open, FileAccess.Read); byte[] imgBytes = new byte[10240]; int imgLength = stream.Read(imgBytes, 0, 10240); return Convert.ToBase64String(imgBytes,0,imgLength); } public bool IsReusable { get{ return false;} } }
系统利用TaskFactory.StartNew(action,cancellationToken) 方式异步调用GetImages方法进行图片下载。
public partial class Form1 : Form { private CancellationTokenSource tokenSource = new CancellationTokenSource(); public Form1() { InitializeComponent(); ThreadPool.SetMaxThreads(1000, 1000); } private void downloadToolStripMenuItem_Click(object sender, EventArgs e) { Task.Factory.StartNew(GetImages,tokenSource.Token); } private void cancelToolStripMenuItem_Click(object sender, EventArgs e) { tokenSource.Cancel(); } private void GetImages() { //发送请求,获取输出流 WebRequest webRequest = HttpWebRequest.Create("Http://localhost:5800/Handler.ashx"); Stream responseStream=webRequest.GetResponse().GetResponseStream(); byte[] responseByte = new byte[81960]; IAsyncResult result=responseStream.BeginRead(responseByte,0,81960,null,null); int responseLength = responseStream.EndRead(result); //获取图片数量 int imageCount = BitConverter.ToInt32(responseByte, 0); //获取每副图片的长度 int[] lengths = new int[imageCount]; for (int n = 0; n < imageCount; n++) { int length = BitConverter.ToInt32(responseByte, (n + 1) * 4); lengths[n] = length; } try { //保存图片 for (int n = 0; n < imageCount; n++) { string path = string.Format("E:/My Projects/Example/Test/Images/pic{0}.jpg", n); FileStream file = new FileStream(path, FileMode.Create, FileAccess.ReadWrite); //计算字节偏移量 int offset = (imageCount + 1) * 4; for (int a = 0; a < n; a++) offset += lengths[a]; file.Write(responseByte, offset, lengths[n]); file.Flush(); //模拟操作 Thread.Sleep(1000); //检测CancellationToken变化 tokenSource.Token.ThrowIfCancellationRequested(); } } catch (OperationCanceledException ex) { MessageBox.Show("Download cancel!"); } } }
7.4 并行查询(PLINQ)
并行 LINQ (PLINQ) 是 LINQ 模式的并行实现,主要区别在于 PLINQ 尝试充分利用系统中的所有处理器。 它利用所有处理器的方法,把数据源分成片段,然后在多个处理器上对单独工作线程上的每个片段并行执行查询, 在许多情况下,并行执行意味着查询运行速度显著提高。但这并不说明所有PLINQ都会使用并行方式,当系统测试要并行查询会对系统性能造成损害时,那将自动化地使用同步执行。
7.4.1 AsParallel
通常想要实现并行查询,只需向数据源添加 AsParallel 查询操作即可。
class Program { static void Main(string[] args) { var personList=GetPersonList().AsParallel() .Where(x=>x.Age>30); Console.ReadKey(); } //模拟源数据 static IList<Person> GetPersonList() { var personList = new List<Person>(); var person1 = new Person(); person1.ID = 1; person1.Name = "Leslie"; person1.Age = 30; personList.Add(person1); ........... return personList; } }
7.4.2 AsOrdered
class Program { static void Main(string[] args) { var personList=GetPersonList().AsParallel().AsOrdered() .Where(x=>x.Age<30); Console.ReadKey(); } static IList<Person> GetPersonList() {......} }
7.4.3 WithDegreeOfParallelism
默认情况下,PLINQ 使用主机上的所有处理器,这些处理器的数量最多可达 64 个。
通过使用 WithDegreeOfParallelism(Of TSource) 方法,可以指示 PLINQ 使用不多于指定数量的处理器。
class Program { static void Main(string[] args) { var personList=GetPersonList().AsParallel().WithDegreeOfParallelism(2) .Where(x=>x.Age<30); Console.ReadKey(); } static IList<Person> GetPersonList() {.........} }
7.4.4 ForAll
class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(1000, 1000); GetPersonList().AsParallel().ForAll(person =>{ ThreadPoolMessage(person); }); Console.ReadKey(); } static IList<Person> GetPersonList() {.......} //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
7.4.5 WithCancellation
如果需要停止查询,可以使用 WithCancellation(Of TSource) 运算符并提供 CancellationToken 实例作为参数。
与第三节Task的例子相似,如果标记上的 IsCancellationRequested 属性设置为 true,则 PLINQ 将会注意到它,并停止所有线程上的处理,然后引发 OperationCanceledException。这可以保证并行查询能够立即停止。
class Program { static CancellationTokenSource tokenSource = new CancellationTokenSource(); static void Main(string[] args) { Task.Factory.StartNew(Cancel); try { GetPersonList().AsParallel().WithCancellation(tokenSource.Token) .ForAll(person => { ThreadPoolMessage(person); }); } catch (OperationCanceledException ex) { } Console.ReadKey(); } //在10~50毫秒内发出停止信号 static void Cancel() { Random random = new Random(); Thread.Sleep(random.Next(10,50)); tokenSource.Cancel(); } static IList<Person> GetPersonList() {......} //显示线程池现状 static void ThreadPoolMessage(Person person) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("Person ID:{0} Name:{1} Age:{2}\n" + " CurrentThreadId is {3}\n WorkerThreads is:{4}" + " CompletionPortThreads is :{5}\n", person.ID, person.Name, person.Age, Thread.CurrentThread.ManagedThreadId, a.ToString(), b.ToString()); Console.WriteLine(message); } }
Timer类中最常用的构造函数为 public Timer( timerCallback , object , int , int )
第二个参数是为 timerCallback 委托输入的参数对象。
class Program { static void Main(string[] args) { ThreadPool.SetMaxThreads(1000, 1000); TimerCallback callback = new TimerCallback(ThreadPoolMessage); Timer t = new Timer(callback,"Hello Jack! ", 0, 1000); Console.ReadKey(); } //显示线程池现状 static void ThreadPoolMessage(object data) { int a, b; ThreadPool.GetAvailableThreads(out a, out b); string message = string.Format("{0}\n CurrentThreadId is:{1}\n" + " CurrentThread IsBackground:{2}\n" + " WorkerThreads is:{3}\n CompletionPortThreads is:{4}\n", data + "Time now is " + DateTime.Now.ToLongTimeString(), Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsBackground.ToString(), a.ToString(), b.ToString()); Console.WriteLine(message); } }
8.2 锁
8.2.1 lock
private void Method() { lock(this) { //在此进行的操作能保证在同一时间内只有一个线程对此对象操作 } }
class Control { private object obj=new object(); public void Method() { lock(obj) {.......} } }
8.2.2 Montior
它存在 Enter, Exit 两个方法,它可以对对象进行锁定与解锁,比lock使用更灵活。
class Control { private object obj=new object(); public void Method() { Monitor.Enter(obj); try {......} catch(Excetion ex) {......} finally { Monitor.Exit(obj); } } }
8.2.3 Interlocked
Increment、Decrement 可以使参数安全地加1或减1并返回递增后的新值。
class Example { private int a=1; public void AddOne() { int newA=Interlocked.Increment(ref a); } }
public void SetData() { Interlocked.Exchange(ref a,100); }
public void CompareAndExchange() { Interlocked.CompareExchange(ref a,100,1); }