Getty是我為了學習 Java NIO 所寫的 NIO 框架,實作過程中參考了 Netty 的設計,同時使用 Groovy 來實作。雖然只是玩具,但是麻雀雖小,五臟俱全,在實現過程中,不僅熟悉了 NIO 的使用,還借鑒了很多 Netty 的設計思想,提升了自己的編碼和設計能力。
至於為什麼要用Groovy 來寫,因為我剛學了Groovy,剛好拿來練手,加上Groovy 是相容Java 的,所以只是語法上的差別,底層實作還是基於Java API的。
Getty 的核心程式碼行數不超過 500 行,一方面得益於 Groovy 簡潔的語法,另一方面是因為我只實現了核心的邏輯,最複雜的其實是解碼器實現。鷹架容易搭,摩天大樓哪有那麼容易蓋,但用來學習 NIO 足以。
Getty 使用的是Reactor 多執行緒模型
有專門一個NIO 執行緒- Acceptor 執行緒用於監聽服務端,接收客戶端的TCP 連線請求,然後將連線指派給工作執行緒,由工作執行緒來監聽寫入事件。
網路 IO 操作-讀/寫等由多個工作執行緒負責,由這些工作執行緒負責訊息的讀取、解碼、編碼和發送。
1 個工作線程可以同時處理N條鏈路,但是 1 個鏈路只對應 1 個工作線程,防止並發操作問題。
整個服務端的流程處理,建立於事件機制上。在[接受連線->讀->業務處理->寫->關閉連線]這個過程中,觸發器將觸發對應事件,由事件處理器對對應事件分別回應,完成伺服器端的業務處理。
onRead
:當客戶端發來數據,並已被工作執行緒正確讀取時,觸發該事件 。此事件通知各事件處理器可以對客戶端發送的資料進行實際處理了。
onWrite
:當客戶端可以開始接受服務端發送資料時觸發該事件,透過該事件,我們可以向客戶端發送回應資料。 (目前的實作中並未使用寫入事件)
onClosed
:當客戶端與伺服器斷開連線時觸發該事件。
在這個模型中,事件採用廣播方式,也就是所有註冊的事件處理器都能獲得事件通知。這樣可以將不同性質的業務處理,分別用不同的處理器實現,使每個處理器的功能盡可能單一。
如下圖:整個事件模型由監聽器、事件適配器、事件觸發器(HandlerChain,PipeLine)、事件處理器組成。
ServerListener
:這是事件介面,定義需監聽的伺服器事件
interface ServerListener extends Serializable{ /** * 可读事件回调 * @param request */ void onRead(ctx) /** * 可写事件回调 * @param request * @param response */ void onWrite(ctx) /** * 连接关闭回调 * @param request */ void onClosed(ctx) }
EventAdapter
:對Serverlistener 介面實作一個適配器(EventAdapter),這樣的好處是最終的事件處理器可以只處理所關心的事件。
class EventAdapter implements ServerListener { //下个处理器的引用 protected next void onRead(Object ctx) { } void onWrite(Object ctx) { } void onClosed(Object ctx) { } }
Not<a href="http://www.php.cn/wiki/109.html" target="_blank">if</a>ier
:用於在適當的時候透過觸發伺服器事件,通知在冊的事件處理器對事件做出響應。
interface Notifier extends Serializable{ /** * 触发所有可读事件回调 */ void fireOnRead(ctx) /** * 触发所有可写事件回调 */ void fireOnWrite(ctx) /** * 触发所有连接关闭事件回调 */ void fireOnClosed(ctx) }
HandlerChain
:實作了Notifier
接口,維持有序的事件處理器鏈條,每次從第一個處理器開始觸發。
class HandlerChain implements Notifier{ EventAdapter head EventAdapter tail /** * 添加处理器到执行链的最后 * @param handler */ void addLast(handler) { if (tail != null) { tail.next = handler tail = tail.next } else { head = handler tail = head } } void fireOnRead(ctx) { head.onRead(ctx) } void fireOnWrite(ctx) { head.onWrite(ctx) } void fireOnClosed(ctx) { head.onClosed(ctx) } }
PipeLine
:實作了Notifier
接口,作為事件總線,維持一個事件鏈的清單。
class PipeLine implements Notifier{ static logger = LoggerFactory.getLogger(PipeLine.name) //监听器队列 def listOfChain = [] PipeLine(){} /** * 添加监听器到监听队列中 * @param chain */ void addChain(chain) { synchronized (listOfChain) { if (!listOfChain.contains(chain)) { listOfChain.add(chain) } } } /** * 触发所有可读事件回调 */ void fireOnRead(ctx) { logger.debug("fireOnRead") listOfChain.each { chain -> chain.fireOnRead(ctx) } } /** * 触发所有可写事件回调 */ void fireOnWrite(ctx) { listOfChain.each { chain -> chain.fireOnWrite(ctx) } } /** * 触发所有连接关闭事件回调 */ void fireOnClosed(ctx) { listOfChain.each { chain -> chain.fireOnClosed(ctx) } } }
#程式設計模型
#事件處理採用職責鏈模式,每個處理器處理完資料之後會決定是否繼續下一個處理器。如果處理器不將任務交給執行緒池處理,那麼整個處理流程就在同一個執行緒中處理。而且每個連線都有單獨的PipeLine
,工作執行緒可以在多個連線上下文切換,但是一個連線上下文只會被一個執行緒處理。
连接上下文ConnectionCtx
class ConnectionCtx { /**socket连接*/ SocketChannel channel /**用于携带额外参数*/ Object attachment /**处理当前连接的工作线程*/ Worker worker /**连接超时时间*/ Long timeout /**每个连接拥有自己的pipeline*/ PipeLine pipeLine }
NioServer
主线程负责监听端口,持有工作线程的引用(使用轮转法分配连接),每次有连接到来时,将连接放入工作线程的连接队列,并唤醒线程selector.wakeup()
(线程可能阻塞在selector
上)。
class NioServer extends Thread { /**服务端的套接字通道*/ ServerSocketChannel ssc /**选择器*/ Selector selector /**事件总线*/ PipeLine pipeLine /**工作线程列表*/ def workers = [] /**当前工作线程索引*/ int index }
工作线程,负责注册server传递过来的socket连接。主要监听读事件,管理socket,处理写操作。
class Worker extends Thread { /**选择器*/ Selector selector /**读缓冲区*/ ByteBuffer buffer /**主线程分配的连接队列*/ def queue = [] /**存储按超时时间从小到大的连接*/ TreeMap<Long, ConnectionCtx> ctxTreeMap void run() { while (true) { selector.select() //注册主线程发送过来的连接 registerCtx() //关闭超时的连接 closeTimeoutCtx() //处理事件 dispatchEvent() } } }
我实现了一系列处理HTTP
请求的处理器,具体实现看代码。
LineBasedDecoder
:行解码器,按行解析数据
HttpRequestDecoder
:HTTP请求解析,目前只支持GET请求
HttpRequestHandler
:Http 请求处理器,目前只支持GET方法
HttpResponseHandler
:Http响应处理器
下面是写在test
中的例子
class WebServerTest { static void main(args) { def pipeLine = new PipeLine() def readChain = new HandlerChain() readChain.addLast(new LineBasedDecoder()) readChain.addLast(new HttpRequestDecoder()) readChain.addLast(new HttpRequestHandler()) readChain.addLast(new HttpResponseHandler()) def closeChain = new HandlerChain() closeChain.addLast(new ClosedHandler()) pipeLine.addChain(readChain) pipeLine.addChain(closeChain) NioServer nioServer = new NioServer(pipeLine) nioServer.start() } }
另外,还可以使用配置文件getty.properties
设置程序的运行参数。
#用于拼接消息时使用的二进制数组的缓存区 common_buffer_size=1024 #工作线程读取tcp数据的缓存大小 worker_rcv_buffer_size=1024 #监听的端口 port=4399 #工作线程的数量 worker_num=1 #连接超时自动断开时间 timeout=900 #根目录 root=.
Getty是我造的第二个小轮子,第一个是RedisHttpSession。都说不要重复造轮子。这话我是认同的,但是掌握一门技术最好的方法就是实践,在没有合适项目可以使用新技术的时候,造一个简单的轮子是不错的实践手段。
Getty 的缺点或者说还可以优化的点:
线程的使用直接用了Thread
类,看起来有点low。等以后水平提升了再来抽象一下。
目前只有读事件是异步的,写事件是同步的。未来将写事件也改为异步的。
以上是Getty-實作Java NIO框架設計的詳細解的詳細內容。更多資訊請關注PHP中文網其他相關文章!