Netty的執行緒模型主要是基於React,因為考慮到應用程式場景的不同所以演化出多種版本。
即接收服務請求以及執行IO操作都由一個執行緒來完成,由於採用的是IO多路復用這類無阻塞IO操作,所以在請求量不大的情況下單執行緒模式也是可以解決一部分場景問題的。
當請求量增加後,原有的一個執行緒處理所有IO運算變得越來越無法支撐對應的效能指標,所以提到了一個工作線程池的概念,此時接收服務請求還是一個線程,接收請求的線程收到請求後會委託給後面的工作線程池,從線程池中取得一個線程去執行用戶請求。
當請求量進一步增大後,單一的接收服務請求的執行緒無法處理所有客戶端的連接,所以將接收服務請求的也擴展成執行緒池,由多個執行緒同時負責接收客戶端的連線。
上面提到的都是Netty自身的執行緒模型,伴隨著請求量的成長而不斷發展出來的最佳化策略。而RPC請求對應用系統來講最主要還是業務邏輯的處理,而這類業務有可能是計算密集型的也有可以是IO密集型,像大多數應用都伴隨著數據庫操作,redis或者是連接其它的網路服務等。如果業務請求中有這類耗時的IO操作,建議將處理業務請求的任務指派給獨立的執行緒池,否則可能會阻塞netty自身的執行緒。
接收請求執行緒與工作執行緒分工
#接收請求執行緒主要負責建立鏈路,然後將請求委派給工作線程
工作線程負責編碼解碼讀取IO等操作
public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.rpcServerInitializer) .childOption(ChannelOption.SO_KEEPALIVE,true) ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {throw new RpcException(e); } }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
boosGroup就是一組用來接收服務請求的增加業務線程只需要將handle的操作進一步委派給線程池即可,這裡為了擴展所以需要定義接口:定義線程池介面workerGroup就是一組具體負責IO操作的
public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues); }實現固定大小線程池
#參考了dubbo線程池
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) { executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //...} }); } } }return executor; } }
執行緒池工廠
###當有多個執行緒池實作時,透過執行緒池名稱來動態選擇執行緒池。 ######@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName); } }######修改ChannelHandle的channelRead0方法#######將方法體包裝成Task交給執行緒池去執行。 ######
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest)); channelHandlerContext.writeAndFlush(response); } }); }######問題######目前缺乏壓測,所以暫時沒有明確的數據比較。 ###
以上是Netty 線程模型的實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!