首頁 >Java >java教程 >Netty 線程模型的實例詳解

Netty 線程模型的實例詳解

零下一度
零下一度原創
2017-07-03 11:33:003711瀏覽

Netty 執行緒模型

Netty的執行緒模型主要是基於React,因為考慮到應用程式場景的不同所以演化出多種版本。

單執行緒模式

即接收服務請求以及執行IO操作都由一個執行緒來完成,由於採用的是IO多路復用這類無阻塞IO操作,所以在請求量不大的情況下單執行緒模式也是可以解決一部分場景問題的。

單接收多工作執行緒模式

當請求量增加後,原有的一個執行緒處理所有IO運算變得越來越無法支撐對應的效能指標,所以提到了一個工作線程池的概念,此時接收服務請求還是一個線程,接收請求的線程收到請求後會委託給後面的工作線程池,從線程池中取得一個線程去執行用戶請求。

多接收多工作執行緒模式

當請求量進一步增大後,單一的接收服務請求的執行緒無法處理所有客戶端的連接,所以將接收服務請求的也擴展成執行緒池,由多個執行緒同時負責接收客戶端的連線。

RPC 業務執行緒

上面提到的都是Netty自身的執行緒模型,伴隨著請求量的成長而不斷發展出來的最佳化策略。而RPC請求對應用系統來講最主要還是業務邏輯的處理,而這類業務有可能是計算密集型的也有可以是IO密集型,像大多數應用都伴隨著數據庫操作,redis或者是連接其它的網路服務等。如果業務請求中有這類耗時的IO操作,建議將處理業務請求的任務指派給獨立的執行緒池,否則可能會阻塞netty自身的執行緒。

接收請求執行緒與工作執行緒分工

  • #接收請求執行緒主要負責建立鏈路,然後將請求委派給工作線程

  • 工作線程負責編碼解碼讀取IO等操作

##方案實作

目前我實作的RPC是採用多接收多工作執行緒模式,在服務端是這樣綁定連接埠的:

public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {throw new RpcException(e);
            }
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
boosGroup就是一組用來接收服務請求的

workerGroup就是一組具體負責IO操作的

增加業務線程只需要將handle的操作進一步委派給線程池即可,這裡為了擴展所以需要定義接口:

定義線程池介面

public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}
實現固定大小線程池

#參考了dubbo線程池

@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                            });
                }
            }
        }return executor;
    }
}


小插曲:
記的有一次一朋友突然問java 線程池中的那個coreSize是什麼意思?我頓時短路了,因為平常也不怎麼寫多線程,想到平常用的比較多的資料庫線程池,裡面的參數倒是印像比較深,但就是想不起來有個coreSize。後來才又仔細看了下線程池的一些參數。現在藉這個機會又可以多再看看,以免再短路。

執行緒池工廠

###當有多個執行緒池實作時,透過執行緒池名稱來動態選擇執行緒池。 ######
@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
    }
}
######修改ChannelHandle的channelRead0方法#######將方法體包裝成Task交給執行緒池去執行。 ######
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });
}
######問題######目前缺乏壓測,所以暫時沒有明確的數據比較。 ###

以上是Netty 線程模型的實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn