首頁  >  文章  >  Java  >  RPC框架的實例詳解

RPC框架的實例詳解

零下一度
零下一度原創
2017-07-17 14:07:562255瀏覽

1,背景

隨著互聯網的發展,網站應用的規模不斷擴大,常規的垂直應用架構已無法應對,分佈式服務架構以及流動運算架構勢在必行,亟需一個治理系統確保架構有條不紊的演進

單一應用架構

##當網站流量很小時,只需一個應用,將所有功能都部署在一起,以減少部署節點和成本

此時,用於簡化增刪改查工作量的 資料訪問框架(ORM) 是關鍵

垂直應用架構

當訪問量逐漸增加,單一應用增加機器帶來的加速度越來越小,將應用程式拆成互不相干的幾個應用,以提升效率

此時,用於加速前端頁面開發的 Web框架(MVC) 是關鍵

分散式服務架構

以業務線分割

#停止RPC濫用,垂直業務內優先透過本地jar調用,跨業務才採用RPC調用

正確的識別業務邏輯的歸屬,讓各個模組最大化內聚,從性能,可用性和維護性上減少耦合

每次發佈只部署部分伺服器

每個節點可依不同需求伸縮擴充

# #每個應用程式之間更新,部署,運行不影響

部署分離


團隊分離

資料分離

當垂直應用越來越多,應用程式之間互動不可避免,將核心業務抽取出來,作為獨立的服務,逐漸形成穩定的服務中心,使前端應用能更快速的響應多變的市場需求

此時,用於提高業務復用及整合的 分散式服務框架(RPC) 是關鍵

分散式服務RPC框架


流動運算架構


當服務越來越多,容量的評估,小服務資源的浪費等問題逐漸顯現,此時需增加一個調度中心基於訪問壓力實時管理叢集容量,提高叢集利用率

此時,用於提高機器利用率的資源調度和治理中心(SOA)是關鍵

Netty 線程模型

Netty的執行緒模型主要是基於React,因為考慮到應用場景的不同所以演化出多種版本。

單執行緒模式

即接收服務請求以及執行IO操作都由一個執行緒來完成,由於採用的是IO多路復用這類無阻塞IO操作,所以在請求量不大的情況下單執行緒模式也是可以解決一部分場景問題的。

單接收多工作執行緒模式

當請求量增加後,原有的一個執行緒處理所有IO運算變得越來越無法支撐對應的效能指標,所以提到了一個工作線程池的概念,此時接收服務請求還是一個線程,接收請求的線程收到請求後會委託給後面的工作線程池,從線程池中取得一個線程去執行用戶請求。

多接收多工作執行緒模式

當請求量進一步增大後,單一的接收服務請求的執行緒無法處理所有客戶端的連接,所以將接收服務請求的也擴展成執行緒池,由多個執行緒同時負責接收客戶端的連線。

RPC 業務執行緒

上面提到的都是Netty自身的執行緒模型,伴隨著請求量的成長而不斷發展出來的最佳化策略。而RPC請求對應用系統來講最主要還是業務邏輯的處理,而這類業務有可能是計算密集型的也有可以是IO密集型,像大多數應用都伴隨著數據庫操作,redis或者是連接其它的網路服務等。如果業務請求中有這類耗時的IO操作,建議將處理業務請求的任務指派給獨立的執行緒池,否則可能會阻塞netty自身的執行緒。

###

接收請求執行緒與工作執行緒分工

  • 接收請求執行緒主要負責建立鏈路,然後將請求委派給工作執行緒

  • 工作執行緒負責編碼解碼讀取IO等操作

方案實作

目前我實作的RPC是採用多接收多工作執行緒模式,在服務端是這樣綁定連接埠的:

public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {throw new RpcException(e);
            }
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

boosGroup就是一組用來接收服務請求的
workerGroup就是一組具體負責IO操作的

增加業務執行緒只需要將handle的操作進一步委派給執行緒池即可,這裡為了擴充所以需要定義介面:

定義執行緒池介面

public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}

實作固定大小執行緒池

參考了dubbo執行緒池

#
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                            });
                }
            }
        }return executor;
    }
}
##小插曲:

記的有一次一朋友突然問java 線程池中的那個coreSize是什麼意思?我頓時短路了,因為平常也不怎麼寫多線程,想到平常用的比較多的資料庫線程池,裡面的參數倒是印像比較深,但就是想不起來有個coreSize。後來才又仔細看了下線程池的一些參數。現在藉這個機會又可以多再看看,以免再短路。

執行緒池工廠

當有多個執行緒池實作時,透過執行緒池名稱來動態選擇執行緒池。

@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
    }
}
修改ChannelHandle的channelRead0方法

#將方法體包裝成Task交給執行緒池去執行。

@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });

}
問題

目前缺乏壓測,所以暫時沒有明確的數據比較。

以上是RPC框架的實例詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn