ホームページ >Java >&#&チュートリアル >Java での mina の詳細な紹介
Apache mina Serverは、ネットワーク通信アプリケーションフレームワークです。つまり、主にTCP/IP、UDP/IPプロトコルスタックに基づく通信フレームワークです(もちろん、Javaオブジェクトシリアル化サービスと仮想マシンパイプラインも提供できます)通信サービス等)、ミナ
これは、高性能で拡張性の高いネットワーク通信アプリケーションを迅速に開発するのに役立ちます。Mina は、イベント駆動型の非同期操作のプログラミング モデルを提供します (Mina の非同期 IO は、デフォルトで基礎的なサポートとして Java NIO を使用します)。ミナ
主に 1.x と 2.x の 2 つのブランチがあり、ここでは最新バージョン 2.0 について説明します。Mina 1.x を使用している場合は、一部の機能が適用されない可能性があります。このドキュメントを学習するには、JAVA IO、JAVA NIO、JAVAsocket、JAVA スレッド、および同時実行ライブラリ (java.util.concurrent.*) に関する知識が必要です。また、Mina は、ネットワーク通信のサーバー側とクライアント側のカプセル化も提供します。Mina は、Netcom 通信構造全体の中で次のような位置にあります。Mina の API は、実際のネットワーク通信をアプリケーションから分離していることがわかります。気にする必要があるのは、送受信したいデータとビジネス ロジックだけです。同様に、どちらの結末であっても、ミナ
実行プロセスは次のとおりです:
(1.) IoService: このインターフェイスはスレッド上でのソケットの確立を担当し、独自のセレクターを持ち、接続が確立されているかどうかを監視します。
(2.) IoProcessor: このインターフェースは別のスレッド上にあり、チャネル上で読み書きするデータがあるかどうかをチェックする責任があります。つまり、独自のセレクターもあります。これは、私たちが実行したときと同じです。 JAVA NIO コーディングを使用する 1 つの違いは、通常、JAVA NIO コーディングではすべてセレクターを使用することです。つまり、2 つの機能インターフェイス IoService と IoProcessor を区別しません。さらに、IoProcessor は、IoService に登録されているフィルターを呼び出し、フィルター チェーンの後に IoHandler を呼び出す役割を果たします。
(3.) IoFilter: このインターフェイスは、一連のインターセプターを定義します。これらのインターセプターには、ログ出力、ブラックリスト フィルタリング、データ エンコード (書き込み方向) およびデコード (読み取り方向) などの機能が含まれます。これは、Mina を使用するときに最も重要であり、重点を置くことです。
(4.) IoHandler: このインターフェイスは、データが送受信されるビジネス ロジックの作成を担当します。
1. シンプルな TCPServer:
(1.) ステップ 1: IoService を作成する
上記の実行プロセスに従って、最初に IoService 自体がサーバーとクライアントの両方であることを作成する必要があります。 IoAcceptor はプロトコルに依存しないため、TCPServer を記述したいので、IoAcceptor を使用して NioSocketAcceptor を実装します。 ServerSocketChannel クラス。もちろん、Apache の APR ライブラリを使用する場合は、AprSocketAcceptor を TCPServer として使用することを選択できます。
伝説によれば、Apache APR ライブラリのパフォーマンスは、JVM に付属するネイティブ ライブラリよりもはるかに高いです。その後、IoProcessor が作成され、指定された IoService によって内部的に呼び出されるので、気にする必要はありません。
IoAcceptor acceptor=new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10); acceptor.bind(new InetSocketAddress(9123));
このコードでは、サーバー側で TCP/IP NIO ベースのソケットを初期化し、IoSessionConfig を呼び出してデータ読み取り用のバッファー サイズを設定します。これにより、読み取りチャネルと書き込みチャネルは内部での操作なしでアイドル状態に入ります。 10秒。
(2.) ステップ 2: フィルターを作成します
ここでは、最も単純な文字列送信を扱います。Mina は、文字列をエンコードおよびデコードするための TextLineCodecFactory コーデック ファクトリを提供しました。
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory( <span style="white-space:pre"> </span>Charset.forName("UTF-8"), <span style="white-space:pre"> </span>LineDelimeter.WINDOWS.getValue(), <span style="white-space:pre"> </span>LineDelimiter. WINDOWS.getValue())) );
ソケットがバインドされた後はこれらの準備を行うことができないため、このコードは acceptor.bind() メソッドの前に実行する必要があります。ここでは、コーデックがどのように機能するかを知る必要はありません。これについては後で説明します。ここでは、送信するデータに改行文字が付いていることだけを知っていればよいため、Mina に付属の改行コーデック ファクトリを使用します。 。
(3.) ステップ 3: IoHandler を作成する
这里我们只是简单的打印Client 传说过来的数据。
public class MyIoHandler extends IoHandlerAdapter { // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。 private final static Logger log = LoggerFactory .getLogger(MyIoHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); log.info("The message received is [" + str + "]"); if (str.endsWith("quit")) { session.close(true); return; } } }
然后我们把这个IoHandler 注册到IoService:
acceptor.setHandler(new MyIoHandler());
当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。
2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。
(1.) 第一步:编写IoService并注册过滤器
public class MyClient { main方法: IoConnector connector=new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter( new TextLineCodecFactory( Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue() ) ) ); connector.connect(new InetSocketAddress("localhost", 9123)); }
(2.) 第三步:编写IoHandler
public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory .getLogger(ClientHandler.class); private final String values; public ClientHandler(String values) { this.values = values; } @Override public void sessionOpened(IoSession session) { session.write(values); } }
注册IoHandler:
connector.setHandler(new ClientHandler("你好!\r\n 大家好!"));
然后我们运行MyClient,你会发现MyServer 输出如下语句:
The message received is [你好!]
The message received is [大家好!]
我们看到服务端是按照收到两条消息输出的,因为我们用的编解码器是以换行符判断数据是否读取完毕的。
3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Mapfbef8c8cec0afa8bb72a3f5a4575a361 getManagedSessions():
这个方法获取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
这个方法用于获取IoSession 的配置对象,通过IoSessionConfig 对象可以设置Socket 连接的一些选项。
(2.)IoAcceptor:
这个接口是TCPServer 的接口,主要增加了void bind()监听端口、void unbind()解除对套接字的监听等方法。这里与传统的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次调用bind()方法(或者在一个方法中传入多个SocketAddress 参数)同时监听多个端口。
3.)IoConnector:
这个接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于与Server 端建立连接,第二个参数如果不传递则使用本地的一个随机端口访问Server 端。这个方法是异步执行的,同样的,也可以同时连接多个服务端。
(4.)IoSession:
このインターフェイスは、サーバー側とクライアント側の間の接続を表すために使用され、IoAcceptor.accept() のときにインスタンスを返します。
このインターフェースには、以下の一般的に使用されるメソッドがあります:
A. WriteFuture write(Object message):
このメソッドはデータの書き込みに使用され、操作は非同期です。
B. CloseFuture close(boolean instant):
このメソッドは IoSession を閉じるために使用されます。この操作も非同期です。パラメータとして true を指定すると、すべての書き込み操作がフラッシュされた後に閉じられます。
C. オブジェクト setAttribute(Object key, Object value):
このメソッドは、HttpSession の setAttrbute() メソッドと同様に、セッション中に使用できるいくつかの属性を追加するために使用されます。 IoSession は内部で同期された HashMap を使用して、追加したカスタム属性を保存します。
D. SocketAddress getRemoteAddress():
このメソッドは、リモート接続のソケット アドレスを取得します。
E. void stopWrite():
このメソッドは書き込み操作を一時停止するために使用されるため、これとペアになっている voidresumeWrite() メソッドがあります。同じことが read() メソッドにも当てはまります。
F. ReadFuture read():
このメソッドはデータの読み取りに使用されますが、この非同期読み取りメソッドを使用するには、IoSessionConfig の setUseReadOperation(true) を呼び出す必要があります。通常、このメソッドは使用しません。これは、このメソッドの内部実装がサーバー側の場合、データを BlockingQueue に保存するためです。これは、クライアント側から送信された大量のデータがサーバー上でこのように読み取られるためです。側では、メモリリークが発生する可能性がありますが、クライアントにとっては、その方が便利な場合があります。
G. IoService getService():
このメソッドは、現在のセッション オブジェクトに関連付けられた IoService インスタンスを返します。
TCP 接続の終了について:
クライアント上でもサーバー上でも、基礎となる TCP 接続を表すために IoSession が使用されていることがわかります。サーバーまたはクライアントの IoSession で close() メソッドを呼び出した後です。これは、IoSession の close() が TCP 接続チャネルを閉じるだけで、サーバー側とクライアント側は閉じないためです。サイドプログラム。サーバーとクライアントを停止するには、IoService の destroy() メソッドを呼び出す必要があります。
(5.)IoSessionConfig: このメソッドは、このセッションの構成を指定するために使用されます。一般的に使用される次のメソッドがあります。
A. void setReadBufferSize(int size):
B. void setIdleTime(IdleStatus status, int idleTime):
このメソッドは、チャネルに関連付けられた読み取り、書き込み、または読み取りおよび書き込みイベントが指定された時間内に発生しないように設定し、チャネルはアイドル状態になります。このメソッドが呼び出されると、フィルターおよび IoHandler の sessionIdle() メソッドが idleTime ごとにコールバックされます。
C. void setWriteTimeout(int time):
このメソッドは、書き込み操作のタイムアウトを設定します。
D. void setUseReadOperation(boolean useReadOperation):
このメソッドは、IoSession の read() メソッドが使用可能かどうかを設定します。デフォルトは false です。
(6.)IoHandler:
このインターフェイスは、上記のサンプルコードからわかるように、データの読み取りとデータの送信は基本的にこのインターフェイス内で完了します。 IoService の 1 つのインスタンス (IoHandler インスタンスが IoService に挿入されていない場合、例外がスローされます)。次のメソッドがあります:
A. void sessionCreated(IoSession session):
このメソッドは、Session オブジェクトが作成されるときに呼び出されます。 TCP 接続の場合、接続が受け入れられたときに呼び出されますが、この時点では TCP 接続は確立されていないことに注意してください。このメソッドは文字通りの意味、つまり接続オブジェクト IoSession の作成時にのみ表されます。メソッドがコールバックされます。 UDP の場合、UDP はコネクションレスであるため、データ パケットの受信時にこのメソッドが呼び出されます。
B. void sessionOpened(IoSession session):
このメソッドは、接続が開かれたときに呼び出されます。このメソッドは、常に sessionCreated() メソッドの後に呼び出されます。 TCP の場合、接続が確立された後に呼び出され、認証操作の実行やデータの送信などが可能になります。 UDP の場合、このメソッドは sessionCreated() と変わりませんが、その直後に実行されます。時々データを送信する場合、sessionCreated() メソッドは初回のみ呼び出されますが、sessionOpened() メソッドは毎回呼び出されます。
C. void sessionClosed(IoSession session):
TCP の場合、このメソッドは接続が閉じられたときに呼び出されます。 UDP の場合、このメソッドは IoSession の close() メソッドが呼び出されたときにのみ破棄されます。
D. void sessionIdle(IoSession session, IdleStatus status):
このメソッドは、IoSession チャネルがアイドル状態になると呼び出されます。UDP プロトコルの場合、このメソッドは呼び出されません。
E. voidExceptionCaught(IoSession session, Throwable Cause):
このメソッドは、プログラムまたはミナ自体で例外が発生したときにコールバックします。通常、IoSession はここで閉じられます。
F. void messageReceived(IoSession session, Object message):
メッセージを受信するときに呼び出されるメソッド、つまり、メッセージを受信するために使用されるメソッドは、プロトコル コーデックを使用する場合に使用されます。必要な型にキャストできます。通常、上記の例と同様にプロトコル コーデックを使用します。これは、プロトコル コーデックが
TextLineCodecFactory であるため、メッセージを強制的に String 型に変換できるためです。
G. void messageSent(IoSession session, Object message):
このメソッドは、メッセージが正常に送信されたときに呼び出されます。メッセージが正常に送信された後は、このメソッドを使用してメッセージを送信できないことを意味します。メッセージ。
メッセージ送信のタイミング:
メッセージの送信は、sessionOpened() メソッドと messageReceived() メソッドの IoSession.write() メソッドを呼び出すことで完了する必要があります。 sessionOpened() メソッドでは TCP 接続が実際にオープンされており、同様に messageReceived() メソッドでも TCP 接続がオープンされていますが、この 2 つのタイミングは異なります。 sessionOpened() メソッドは、TCP 接続の確立後、データを受信する前に送信されます。messageReceived() メソッドは、データを受信した後に送信されます。受信したコンテンツの内容に基づいて、送信するデータの種類を決定できます。 。このインターフェイスにはメソッドが多すぎるため、通常は、関心のあるメソッドをカバーするためにアダプター モード IoHandlerAdapter が使用されます。
(7.)IoBuffer:
このインターフェイスは、JAVA NIO の ByteBuffer のカプセル化です。これは主に、ByteBuffer が基本データ型の読み取りおよび書き込み操作のみを提供し、文字列などのオブジェクト型の読み取りおよび書き込みを提供しないためです。 . メソッドを使用すると便利です。また、ByteBuffer を可変長にしたい場合は非常に面倒です。 IoBuffer の可変長実装は StringBuffer に似ています。 IoBuffer は、ByteBuffer と同様、スレッドセーフではありません。このセクションの内容が不明瞭な場合は、java.nio.ByteBuffer インターフェースを参照してください。このインターフェースには、以下の一般的に使用されるメソッドがあります:
A. static IoBuffer assign(int Capacity, boolean useDirectBuffer): このメソッドは、SimpleBufferAllocator を通じてインスタンスを内部的に作成し、2 番目のパラメーターは初期化容量を指定します。ダイレクト バッファまたは JAVA メモリ ヒープ キャッシュ領域。デフォルトは false です。
B. void free():
一部の IoBufferAllocator 実装で再利用できるようにバッファを解放します。一般に、パフォーマンスを向上させたい場合を除き、このメソッドを呼び出す必要はありません (ただし、その効果は明らかではない場合があります)。
C. IoBuffer setAutoExpand(boolean autoExpand):
このメソッドは、容量を自動的に拡張するように IoBuffer を設定します。これは、前述した可変長機能がデフォルトでは有効になっていないことがわかります。
D. IoBuffer setAutoShrink(boolean autoShrink):
このメソッドは、compact() メソッドの呼び出し後に未使用スペースの一部をカットできるように、IoBuffer を自動的に縮小するように設定します。このメソッドが呼び出されない場合、または false に設定されている場合は、shrink() メソッドを呼び出してスペースを手動で縮小することもできます。
このメソッドは、JAVA のデフォルトはビッグ エンディアンであり、C++ およびその他の言語は一般的にリトル エンディアンです。
F. IoBuffer asReadOnlyBuffer():
このメソッドは、IoBuffer を読み取り専用に設定します。
G. Boolean prefixedDataAvailable(int prefixLength, int maxDataLength):
このメソッドは、データの最初の 1、2、および 4 バイトがデータの長さを表す場合に使用されます。
maxDataLength は読み取られる最大バイト数を表します。返される結果は、式
remaining()-prefixLength>=maxDataLength によって異なります。これは、合計データ、つまり長さを表すバイト数です。残りのバイト数は、読み取られる予定のバイト数以上です。
H. String getPrefixedString(int prefixLength,CharsetDecoder decoder):
上記のメソッドが true を返した場合、このメソッドは長さを示すバイトの後にデータの読み取りを開始します。2 つのメソッドの prefixLength の値は保持される必要があることに注意してください。同じです。
GとHの2つのメソッドは、後述するPrefixedStringDecoderの内部実装で使用されます。
IoBuffer の残りのメソッドは ByteBuffer に似ていますが、次のようないくつかの便利な操作メソッドが追加されています:
IoBuffer putString(String value, CharsetEncoder encoder) は、指定されたエンコーディング メソッド、InputStream asInputStream() メソッドで文字列を便利に保存できます。 IoBuffer から入力ストリームへのデータなど。
(8.)IoFuture: Mina の多くの操作では、戻り値が XXXFuture であることがわかります。実際、このような戻り値を見ると、このメソッドはすべて IoFuture のサブクラスです。命令は非同期で実行され、主なサブクラスは ConnectFuture、CloseFuture、ReadFuture、および WriteFuture です。このインターフェースのほとんどの操作は、await()、awaitUninterruptibly() などの java.util.concurrent.Future インターフェースに似ています。一般に、非同期実行の結果が返されるのを待つために awaitUninterruptibly() メソッドを使用することがよくあります。 。このインターフェイスには、次の一般的に使用されるメソッドがあります:
A. IoFuture addListener(IoFutureListener6b3d0130bba23ae47fe2b8e8cddf0195listener):
このメソッドは、非同期実行の結果が返されるときに、コールバック メソッドoperationComplete(IoFuture future)を追加するために使用されます。つまり、これは awaitUninterruptibly() メソッドの代わりに非同期実行結果を待つ別のメソッドです。その利点は、ブロッキングが発生しないことです。
B. IoFuture RemoveListener(IoFutureListener6b3d0130bba23ae47fe2b8e8cddf0195 リスナー):
このメソッドは、指定されたリスナーを削除するために使用されます。
C. IoSession getSession():
このメソッドは現在の IoSession を返します。たとえば、クライアントで connect() メソッドを呼び出してサーバーにアクセスする場合、これは実際には非同期実行メソッドです。つまり、connect() メソッドを呼び出した直後に戻り、次のコードを実行します。つながっています
接成功。那么如果我想在连接成功之后执行一些事情(譬如:获取连接成功后的IoSession对象),该怎么办呢?按照上面的说明,你有如下两种办法:
第一种:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); // 等待是否连接成功,相当于是转异步执行为同步执行。 future.awaitUninterruptibly(); // 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session 可能会无法获取。 session = future.getSession();
第二种:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); future.addListener(new IoFutureListener<ConnectFuture>() { @Override public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*************");
为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出***。我们执行代码之后,你会发现首先输出***(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。
4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可
以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
LoggingFilter lf = new LoggingFilter(); lf.setSessionOpenedLogLevel(LogLevel.ERROR); acceptor.getFilterChain().addLast("logger", lf);
这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。
5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder
实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. Lista9b6c012ae4ce6620825d53af49989c5 getAll():
这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter
接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:
(1.)init()在首次添加到链中的时候被调用,但你必须将这个IoFilter 用
ReferenceCountingFilter 包装起来,否则init()方法永远不会被调用。
(2.)onPreAdd()在调用添加到链中的方法时被调用,但此时还未真正的加入到链。
(3.)onPostAdd()在调用添加到链中的方法后被调,如果在这个方法中有异常抛出,则过滤器会立即被移除,同时destroy()方法也会被调用(前提是使用ReferenceCountingFilter包装)。
(4.)onPreRemove()在从链中移除之前调用。
(5.)onPostRemove()在从链中移除之后调用。
(6.)destory()在从链中移除时被调用,使用方法与init()要求相同。
无论是哪个方法,要注意必须在实现时调用参数nextFilter 的同名方法,否则,过滤器链的执行将被中断,IoHandler 中的同名方法一样也不会被执行,这就相当于Servlet 中的Filter 必须调用filterChain.doFilter(request,response)才能继续前进是一样的道理。
示例:
public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }
我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:
acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter()));
这里我们将MyIoFilter 用ReferenceCountingFilter 包装起来,这样你可以看到init()、destroy()方法调用。我们启动客户端访问,然后关闭客户端,你会看到执行顺序如下所示:
init onPreAdd onPostAdd sessionCreated sessionOpened messageReceived filterClose sessionClosed onPreRemove onPostRemove destroy。
IoHandler 的对应方法会跟在上面的对应方法之后执行,这也就是说从横向(单独的看一个过滤器中的所有方法的执行顺序)上看,每个过滤器的执行顺序是上面所示的顺序;从纵向(方法链的调用)上看,如果有filter1、filter2 两个过滤器,sessionCreated()方法的执行顺序如下所示:
filter1-sessionCreated filter2-sessionCreated IoHandler-sessionCreated。
这里你要注意init、onPreAdd、onPostAdd 三个方法并不是在Server 启动时调用的,而是IoSession 对象创建之前调用的,也就是说IoFilterChain.addXXX()方法仅仅负责初始化过滤器并注册过滤器,但并不调用任何方法,包括init()初始化方法也是在IoProcessor 开始工作的时候被调用。IoFilter 是单例的,那么init()方法是否只被执行一次呢?这个是不一定的,因为IoFilter是被IoProcessor 调用的,而每个IoService 通常是关联多个IoProcessor,所以IoFilter的init()方法是在每个IoProcessor
线程上只执行一次。关于Mina 的线程问题,我们后面会详细讨论,这里你只需要清楚,init()与destroy()的调用次数与IoProceesor 的个数有关,假如一个IoService 关联了3 个IoProcessor,有五个并发的客户端请求,那么你会看到三次init()方法被调用,以后将不再会调用。Mina中自带的过滤器:
过滤器 说明
BlacklistFilter 设置一些IP 地址为黑名单,不允许访问。
BufferedWriteFilter 设置输出时像BufferedOutputStream 一样进行缓冲。
CompressionFilter 设置在输入、输出流时启用JZlib 压缩。
ConnectionThrottleFilter 这个过滤器指定同一个IP 地址(不含端口号)上的请求在多长的毫秒值内可以有一个请求,如果小于指定的时间间隔就有连续两个请求,那么第二个请求将被忽略(IoSession.close())。正如Throttle 的名字一样,调节访问的频率这个过滤器最好放在过滤器链的前面。
FileRegionWriteFilter 如果你想使用File 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture 或者在
messageSent() 方法中关闭File 所关联的FileChannel 通道。
StreamWriteFilter 如果你想使用InputStream 对象进行输出,请使用这个过滤器。要注意,你需要使用WriteFuture或者在messageSent()方法中关闭File 所关联的
FileChannel 通道。NoopFilter 这个过滤器什么也不做,如果你想测试过滤器链是否起作用,可以用它来测试。
ProfilerTimerFilter 这个过滤器用于检测每个事件方法执行的时间,所以最好放在过滤器链的前面。
ProxyFilter 这个过滤器在客户端使用ProxyConnector 作为实现时,会自动加入到过滤器链中,用于完成代理功能。
RequestResponseFilter 暂不知晓。
SessionAttributeInitializingFilter このフィルターは、初期化情報を配置するために、通常フィルターの前に配置される IoSession にいくつかの属性 (マップ) を配置します。
MdcInjectionFilter はログ出力に対して MDC 操作を実行します。LOG4J の MDC および NDC ドキュメントを参照できます。
WriteRequestFilter CompressionFilter および RequestResponseFilter の基本クラス。書き込みリクエストのフィルターをラップするために使用されます。
前の LoggingFilger ログ フィルターなど、いくつかのフィルターもありますが、これらについては各セクションで詳しく説明しますが、ここではリストされていません。
6. プロトコル コーデック:
前述したように、ネットワーク上で送信されるデータはバイナリ データ (バイト) であるため、Mina を使用する際に最も注意する必要があるのはプロトコル コーデックです。プログラム内で直面するのは JAVA オブジェクトであり、データを送信するときに JAVA オブジェクトをバイナリ データにエンコードし、データを受信するときにバイナリ データを JAVA オブジェクトにデコードする必要があります (これは、JAVA オブジェクトのシリアル化または逆シリアル化ではありません)。もの)。 mina のプロトコル コーデックは、ProtocolCodecFilter というフィルターを通じて構築されます。このフィルターの構築方法には、TextLineCodecFactory を先頭から登録する ProtocolCodecFactory が必要です。
コードが見られます。
ProtocolCodecFactory には次の 2 つのメソッドがあります:
public Interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
したがって、ProtocolCodecFactory の構築には ProtocolEncoder が必要です。 tocolDecoder 例が 2 つあります。 JAVA オブジェクトとバイナリ データの間で変換する方法を知りたいかもしれません。これは特定の通信プロトコルに依存します。つまり、サーバーとクライアントは、ネットワーク上で送信されるデータの形式について合意する必要があります。たとえば、最初のバイトはデータ長を表し、2 番目のバイトはデータ タイプを表します。以下は実データ (テキスト、画像など) で、最初のバイトで指定された指定の長さのデータを読み取るまで、長さに応じて 3 バイト目から逆方向に読み取ることができます。
簡単に言えば、HTTP プロトコルは、ブラウザと Web サーバーの間で合意された通信プロトコルであり、両者は指定されたプロトコルに従ってデータをエンコードおよびデコードします。より直感的に言えば、これまで使用してきた TextLine コーデックは、ネットワーク経由で渡されるデータを読み取るときに、どのバイトに ASCII 10 文字と 13 文字 (/r、/n) が格納されているかを検出するというものです。バイトは文字列とみなされます (デフォルトでは UTF-8 エンコーディングが使用されます)。上で述べたのは、さまざまなプロトコルは実際には 7 層のネットワーク構造におけるアプリケーション層プロトコルであり、Mina のプロトコル コーデックを使用すると、独自のプロトコル コーデックを実装できるようになります。アプリケーション層プロトコルスタック。
(6-1.) 簡単なコーデックの例:
以下に、通信事業者の SMS プロトコルをシミュレートするコーデックの実装を示します。
M sip:wap.fetion. .cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
这里的第一行表示状态行,一般表示协议的名字、版本号等,第二行表示短信的发送号码,第三行表示短信接收的号码,第四行表示短信的字节数,最后的内容就是短信的内容。上面的每一行的末尾使用ASC II 的10(/n)作为换行符,因为这是纯文本数据,协议要
求双方使用UTF-8 对字符串编解码。实际上如果你熟悉HTTP 协议,上面的这个精简的短信协议和HTTP 协议的组成是非常像的,第一行是状态行,中间的是消息报头,最后面的是消息正文。在解析这个短信协议之前,你需要知晓TCP 的一个事项,那就是数据的发送没有规模性,所谓的规模性就是作为数据的接收端,不知道到底什么时候数据算是读取完毕,所以应用层协议在制定的时候,必须指定数据读取的截至点。一般来说,有如下三种方式设置数据读取的长度:
(1.)使用分隔符,譬如:TextLine 编解码器。你可以使用/r、/n、NUL 这些ASC II 中的特殊的字符来告诉数据接收端,你只要遇见分隔符,就表示数据读完了,不用在那里傻等着不知道还有没有数据没读完啊?我可不可以开始把已经读取到的字节解码为指定的数据类型了啊?
(2.)定长的字节数,这种方式是使用长度固定的数据发送,一般适用于指令发送,譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表示关闭等等。
(3.)在数据中的某个位置使用一个长度域,表示数据的长度,这种处理方式最为灵活,上面的短信协议中的那个L 就是短信文字的字节数,其实HTTP 协议的消息报头中的Content-Length 也是表示消息正文的长度,这样数据的接收端就知道我到底读到多长的
字节数就表示不用再读取数据了。相比较解码(字节转为JAVA 对象,也叫做拆包)来说,编码(JAVA 对象转为字节,也叫做打包)就很简单了,你只需要把JAVA 对象转为指定格式的字节流,write()就可以了。下面我们开始对上面的短信协议进行编解码处理。
第一步,协议对象:
public class SmsObject { private String sender;// 短信发送者 private String receiver;// 短信接受者 private String message;// 短信内容 public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
第二步,编码器:
在Mina 中编写编码器可以实现ProtocolEncoder,其中有encode()、dispose()两个方法需要实现。这里的dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我们并不关心,所以通常我们直接继承适配器ProtocolEncoderAdapter。
public class CmccSipcEncoder extends ProtocolEncoderAdapter { private final Charset charset; public CmccSipcEncoder(Charset charset) { this.charset = charset; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = sms.getSender(); String receiver = sms.getReceiver(); String smsContent = sms.getMessage(); buffer.putString(statusLine + '/n', ce); buffer.putString("S: " + sender + '/n', ce); buffer.putString("R: " + receiver + '/n', ce); buffer .putString("L: " + (smsContent.getBytes(charset).length) + "/n", ce); buffer.putString(smsContent, ce); buffer.flip(); out.write(buffer); } }
这里我们依据传入的字符集类型对message 对象进行编码,编码的方式就是按照短信协议拼装字符串到IoBuffer 缓冲区,然后调用ProtocolEncoderOutput 的write()方法输出字节流。这里要注意生成短信内容长度时的红色代码,我们使用String 类与Byte[]类型之间的转换方法获得转为字节流后的字节数。
解码器的编写有以下几个步骤:
A. 将 encode()方法中的message 对象强制转换为指定的对象类型;
B. 创建IoBuffer 缓冲区对象,并设置为自动扩展;
C. 将转换后的message 对象中的各个部分按照指定的应用层协议进行组装,并put()到IoBuffer 缓冲区;
D. 当你组装数据完毕之后,调用flip()方法,为输出做好准备,切记在write()方法之前,要调用IoBuffer 的flip()方法,否则缓冲区的position 的后面是没有数据可以用来输出的,你必须调用flip()方法将position 移至0,limit 移至刚才的position。这个flip()方法的含义请参看java.nio.ByteBuffer。
E. 最后调用ProtocolEncoderOutput 的write()方法输出IoBuffer 缓冲区实例。
第三步,解码器:
在Mina 中编写解码器,可以实现ProtocolDecoder 接口,其中有decode()、finishDecode()、dispose()三个方法。这里的finishDecode()方法可以用于处理在IoSession 关闭时剩余的未读取数据,一般这个方法并不会被使用到,除非协议中未定义任何标识数据什么时候截止的约定,譬如:Http 响应的Content-Length 未设定,那么在你认为读取完数据后,关闭TCP连接(IoSession 的关闭)后,就可以调用这个方法处理剩余的数据,当然你也可以忽略调剩余的数据。同样的,一般情况下,我们只需要继承适配器ProtocolDecoderAdapter,关注decode()方法即可。但前面说过解码器相对编码器来说,最麻烦的是数据发送过来的规模,以聊天室为例,一个TCP
连接建立之后,那么隔一段时间就会有聊天内容发送过来,也就是decode()方法会被往复调用,这样处理起来就会非常麻烦。那么Mina 中幸好提供了CumulativeProtocolDecoder类,从名字上可以看出累积性的协议解码器,也就是说只要有数据发送过来,这个类就会去读取数据,然后累积到内部的IoBuffer 缓冲区,但是具体的拆包(把累积到缓冲区的数据解码为JAVA 对象)交由子类的doDecode()方法完成,实际上CumulativeProtocolDecoder就是在decode()反复的调用暴漏给子类实现的doDecode()方法。
具体执行过程如下所示:
A. 你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder
会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。
public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); CharsetDecoder cd = charset.newDecoder(); int matchCount = 0; String statusLine = "", sender = "", receiver = "", length = "", sms = ""; int i = 1; while (in.hasRemaining()) { byte b = in.get(); buffer.put(b); if (b == 10 && i < 5) { matchCount++; if (i == 1) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); } if (i == 2) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() -1); matchCount = 0; buffer.clear(); } if (i == 3) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() 1); matchCount = 0; buffer.clear(); } if (i == 4) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() -1); matchCount = 0; buffer.clear(); } i++; } else if (i == 5) { matchCount++; if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); i++; break; } } else { matchCount++; } } SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); return false; } }
我们的这个短信协议解码器使用/n(ASCII 的10 字符)作为分解点,一个字节一个字节的读取,那么第一次发现/n 的字节位置之前的部分,必然就是短信协议的状态行,依次类推,你就可以解析出来发送者、接受者、短信内容长度。然后我们在解析短信内容时,使用获取到的长度进行读取。全部读取完毕之后, 然后构造SmsObject 短信对象, 使用ProtocolDecoderOutput
的write()方法输出,最后返回false,也就是本次数据全部读取完毕,告知CumulativeProtocolDecoder 在本次数据读取中不需要再调用doDecode()方法了。这里需要注意的是两个状态变量i、matchCount,i 用于记录解析到了短信协议中的哪一行(/n),matchCount 记录在当前行中读取到了哪一个字节。状态变量在解码器中经常被使用,我们这里的情况比较简单,因为我们假定短信发送是在一次数据发送中完成的,所以状态变量的使用也比较简单。假如数据的发送被拆成了多次(譬如:短信协议的短信内容、消息报头被拆成了两次数据发送),那么上面的代码势必就会存在问题,因为当第二次调用doDecode()方法时,状态变量i、matchCount
势必会被重置,也就是原来的状态值并没有被保存。那么我们如何解决状态保存的问题呢?答案就是将状态变量保存在IoSession 中或者是Decoder 实例自身,但推荐使用前者,因为虽然Decoder 是单例的,其中的实例变量保存的状态在Decoder 实例销毁前始终保持,但Mina 并不保证每次调用doDecode()方法时都是同一个线程(这也就是说第一次调用doDecode()是IoProcessor-1 线程,第二次有可能就是IoProcessor-2 线程),这就会产生多线程中的实例变量的可视性(Visibility,具体请参考JAVA
的多线程知识)问题。IoSession中使用一个同步的HashMap 保存对象,所以你不需要担心多线程带来的问题。使用IoSession 保存解码器的状态变量通常的写法如下所示:
A. 在解码器中定义私有的内部类Context,然后将需要保存的状态变量定义在Context 中存储。
B. 在解码器中定义方法获取这个Context 的实例,这个方法的实现要优先从IoSession 中获取Context。
具体代码示例如下所示:
// 上下文作为保存状态的内部类的名字,意思很明显,就是让状态跟随上下文,在整个调用过程中都可以被保持。
public class XXXDecoder extends CumulativeProtocolDecoder{ private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context" ); public Context getContext(IoSession session){ Context ctx=(Context)session.getAttribute(CONTEXT); if(ctx==null){ ctx=new Context(); session.setAttribute(CONTEXT,ctx); } } private class Context { //状态变量 } }
注意这里我们使用了Mina 自带的AttributeKey 类来定义保存在IoSession 中的对象的键值,这样可以有效的防止键值重复。另外,要注意在全部处理完毕之后,状态要复位,譬如:聊天室中的一条消息读取完毕之后,状态变量要变为初始值,以便下次处理时重新使用。
第四步,编解码工厂:
public class CmccSipcCodecFactory implements ProtocolCodecFactory { private final CmccSipcEncoder encoder; private final CmccSipcDecoder decoder; public CmccSipcCodecFactory() { this(Charset.defaultCharset()); } public CmccSipcCodecFactory(Charset charSet) { this.encoder = new CmccSipcEncoder(charSet); this.decoder = new CmccSipcDecoder(charSet); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; } }
实际上这个工厂类就是包装了编码器、解码器,通过接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码处理。
第五步,运行示例:
下面我们修改最一开始的示例中的MyServer、MyClient 的代码,如下所示:
acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset .forName("UTF-8")))); connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new CmccSipcCodecFactory( Charset.forName("UTF-8")))); 然后我们在ClientHandler 中发送一条短信: public void sessionOpened(IoSession session) { SmsObject sms = new SmsObject(); sms.setSender("15801012253"); sms.setReceiver("18869693235"); sms.setMessage("你好!Hello World!"); session.write(sms); }
最后我们在MyIoHandler 中接收这条短信息:
public void messageReceived(IoSession session, Object message) throws Exception { SmsObject sms = (SmsObject) message; log.info("The message received is [" + sms.getMessage() + "]"); }
你会看到Server 端的控制台输出如下信息:
The message received is [你好!Hello World!]
(6-2.)复杂的解码器:
下面我们讲解一下如何在解码器中保存状态变量,也就是真正的实现上面所说的Context。
我们假设这样一种情况,有两条短信:
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
M sip:wap.fetion.com.cn SIP-C/2.0
S: 1580101xxxx
R: 1889020xxxx
L: 21
Hello World!
他们按照上面的颜色标识发送,也就是说红色部分、蓝色部分、绿色部分分别发送(调用三次IoSession.write()方法),那么如果你还用上面的CmccSipcDecoder,将无法工作,因为第一次数据流(红色部分)发送过取时,数据是不完整的,无法解析出一条短信息,当二次数据流(蓝色部分)发送过去时,已经可以解析出第一条短信息了,但是第二条短信还是不完整的,需要等待第三次数据流(绿色部分)的发送。注意:由于模拟数据发送的规模性问题很麻烦,所以这里采用了这种极端的例子说明问题,虽不具有典型性,但很能说明问题,这就足够了,所以不要追究这种发送消息是否在真实环境中存在,更不要追究其合理性。
CmccSispcDecoder 类改为如下的写法:
public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); CharsetDecoder cd = charset.newDecoder(); int matchCount = ctx.getMatchCount(); int line = ctx.getLine(); IoBuffer buffer = ctx.innerBuffer; String statusLine = ctx.getStatusLine(), sender = ctx.getSender(), receiver = ctx.getReceiver(), length = ctx.getLength(), sms = ctx.getSms(); while (in.hasRemaining()) { byte b = in.get(); matchCount++; buffer.put(b); if (line < 4 && b == 10) { if (line == 0) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); ctx.setStatusLine(statusLine); } if (line == 1) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() - 1); matchCount = 0; buffer.clear(); ctx.setSender(sender); } if (line == 2) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() - 1); matchCount = 0; buffer.clear(); ctx.setReceiver(receiver); } if (line == 3) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() - 1); matchCount = 0; buffer.clear(); ctx.setLength(length); } line++; } else if (line == 4) { if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); ctx.setSms(sms); // 由于下面的break,这里需要调用else外面的两行代码 ctx.setMatchCount(matchCount); ctx.setLine(line); break; } } ctx.setMatchCount(matchCount); ctx.setLine(line); } if (ctx.getLine() == 4 && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx .getMatchCount()) { SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); ctx.reset(); return true; } else { return false; } } private Context getContext(IoSession session) { Context context = (Context) session.getAttribute(CONTEXT); if (context == null){ context = new Context(); session.setAttribute(CONTEXT, context); } return context; } private class Context { private final IoBuffer innerBuffer; private String statusLine = ""; private String sender = ""; private String receiver = ""; private String length = ""; private String sms = ""; public Context() { innerBuffer = IoBuffer.allocate(100).setAutoExpand(true); } private int matchCount = 0; private int line = 0; public int getMatchCount() { return matchCount; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } public int getLine() { return line; } public void setLine(int line) { this.line = line; } public String getStatusLine() { return statusLine; } public void setStatusLine(String statusLine) { this.statusLine = statusLine; } public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getLength() { return length; } public void setLength(String length) { this.length = length; } public String getSms() { return sms; } public void setSms(String sms) { this.sms = sms; } public void reset() { this.innerBuffer.clear(); this.matchCount = 0; this.line = 0; this.statusLine = ""; this.sender = ""; this.receiver = ""; this.length = ""; this.sms = ""; } } }
这里我们做了如下的几步操作:
(1.) 所有记录状态的变量移到了Context 内部类中,包括记录读到短信协议的哪一行的line。每一行读取了多少个字节的matchCount,还有记录解析好的状态行、发送者、接受者、短信内容、累积数据的innerBuffer 等。这样就可以在数据不能完全解码,等待下一次doDecode()方法的调用时,还能承接上一次调用的数据。
(2.) 在 doDecode()方法中主要的变化是各种状态变量首先是从Context 中获取,然后操作之后,将最新的值setXXX()到Context 中保存。
(3.) 这里注意doDecode()方法最后的判断,当认为不够解码为一条短信息时,返回false,也就是在本次数据流解码中不要再调用doDecode()方法;当认为已经解码出一条短信息时,输出短消息,然后重置所有的状态变量,返回true,也就是如果本次数据流解码中还有没解码完的数据,继续调用doDecode()方法。下面我们对客户端稍加改造,来模拟上面的红、蓝、绿三次发送聊天短信息的情况:
MyClient:
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); future.awaitUninterruptibly(); session = future.getSession(); for (int i = 0; i < 3; i++) { SmsObject sms = new SmsObject(); session.write(sms); System.out.println("****************" + i); }
这里我们为了方便演示,不在IoHandler 中发送消息,而是直接在MyClient 中发送,你要注意的是三次发送都要使用同一个IoSession,否则就不是从同一个通道发送过去的了。
CmccSipcEncoder:
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = "15801012253"; String receiver = "15866332698"; String smsContent = "你好!Hello World!"; IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); buffer.putString(statusLine + '/n', ce); buffer.putString("S: " + sender + '/n', ce); buffer.putString("R: " + receiver + '/n', ce); buffer.flip(); out.write(buffer); IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true); buffer2.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer2.putString(smsContent, ce); buffer2.putString(statusLine + '/n', ce); buffer2.flip(); out.write(buffer2); IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true); buffer3.putString("S: " + sender + '/n', ce); buffer3.putString("R: " + receiver + '/n', ce); buffer3.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer3.putString(smsContent, ce); buffer3.putString(statusLine + '/n', ce); buffer3.flip(); out.write(buffer3); }
上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短信息输出。
Mina中自带的解码器:
解码器 说明
CumulativeProtocolDecoder 累积性解码器,上面我们重点说明了这个解码器的用法。
SynchronizedProtocolDecoder 这个解码器用于将任何一个解码器包装为一个线程安全的解码器,用于解决上面说的每次执行decode()方法时可能线程不是上一次的线程的问题,但这样会在高并发时,大大降低系统的性能。
TextLineDecoder 按照文本的换行符( Windows:/r/n 、Linux:/n、Mac:/r)解码数据。
PrefixedStringDecoder 这个类继承自CumulativeProtocolDecoder类,用于读取数据最前端的1、2、4 个字节表示后面的数据长度的数据。譬如:一个段数据的前两个字节表示后面的真实数据的长度,那么你就可以用这个方法进行解码。
(6-3.)多路分离的解码器:
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,固定使用一个解码器,那么该如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包来完成这种多路分离(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:上面的短信协议进行扩展,可以依据状态行来判断使用1.0 版本的短信协议解码器还是2.0版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个int 类型的数字,还有一个char 类型的符号。
(2.) 如果符号是+,服务端就是用1 号解码器,对两个数字相加,然后把结果返回给客户端。
(3.) 如果符号是-,服务端就使用2 号解码器,将两个数字变为相反数,然后相加,把结果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义Client 端、Server 端发送、接收的数据对象。
B. 使用Demux 编写编码器是实现MessageEncoder8742468051c85b06f0a0af9e3e506b5c接口,T 是你要编码的数据对象,这个MessageEncoder 会在DemuxingProtocolEncoder 中调用。
C. 使用Demux 编写编码器是实现MessageDecoder 接口,这个MessageDecoder 会在DemuxingProtocolDecoder 中调用。
D. 在 DemuxingProtocolCodecFactory 中调用addMessageEncoder()、addMessageDecoder()方法组装编解码器。
MessageEncoder的接口如下所示:
public interface MessageEncoder<T> { void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception; }
你注意到消息编码器接口与在ProtocolEncoder 中没什么不同,区别就是Object message被泛型具体化了类型,你不需要手动的类型转换了。
MessageDecoder的接口如下所示:
public interface MessageDecoder { static MessageDecoderResult OK = MessageDecoderResult.OK; static MessageDecoderResult NEED_DATA = MessageDecoderResult.NEED_DATA; static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK; MessageDecoderResult decodable(IoSession session, IoBuffer in); MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception; }
(1.)decodable()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示这个解码器不适合解码数据,然后检查其它解码器,如果都不满足会抛异常;
B. MessageDecoderResult.NEED_DATA:表示当前的读入的数据不够判断是否能够使用这个解码器解码,然后再次调用decodable()方法检查其它解码器,如果都是NEED_DATA,则等待下次输入;
C. MessageDecoderResult.OK: 表示这个解码器可以解码读入的数据, 然后则调用MessageDecoder 的decode()方法。这里注意decodable()方法对参数IoBuffer in 的任何操作在方法结束之后,都会复原,也就是你不必担心在调用decode()方法时,position 已经不在缓冲区的起始位置。这个方法相当于是预读取,用于判断是否是可用的解码器。
(2.)decode()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示解码失败,会抛异常;
B. MessageDecoderResult.NEED_DATA:表示数据不够,需要读到新的数据后,再次调用decode()方法。
C. MessageDecoderResult.OK:表示解码成功。
代码演示:
(1.)客户端发送的数据对象:
public class SendMessage { private int i = 0; private int j = 0; private char symbol = '+'; public char getSymbol() { return symbol; } public void setSymbol(char symbol) { this.symbol = symbol; } public int getI() { return i; } public void setI(int i) { this.i = i; } public int getJ() { return j; } public void setJ(int j) { this.j = j; } }
(2.)服务端发送的返回结果对象:
public class ResultMessage { private int result = 0; public int getResult() { return result; } public void setResult(int result) { this.result = result; } }
(3.)客户端使用的SendMessage的编码器:
public class SendMessageEncoder implements MessageEncoder<SendMessage> { @Override public void encode(IoSession session, SendMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); buffer.putChar(message.getSymbol()); buffer.putInt(message.getI()); buffer.putInt(message.getJ()); buffer.flip(); out.write(buffer); } }
这里我们的SendMessage、ResultMessage 中的字段都是用长度固定的基本数据类型,这样IoBuffer 就不需要自动扩展了,提高性能。按照一个char、两个int 计算,这里的IoBuffer只需要10 个字节的长度就可以了。
(4.)服务端使用的SendMessage的1号解码器:
public class SendMessageDecoderPositive implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '+') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(in.getInt()); sm.setJ(in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
因为客户端发送的SendMessage 的前两个字节(char)就是符号位,所以我们在decodable()方法中对此条件进行了判断,之后读到两个字节,并且这两个字节表示的字符是+时,才认为这个解码器可用。
(5.)服务端使用的SendMessage的2号解码器:
public class SendMessageDecoderNegative implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '-') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(-in.getInt()); sm.setJ(-in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(6.)服务端使用的ResultMessage的编码器:
public class ResultMessageEncoder implements MessageEncoder<ResultMessage> { @Override public void encode(IoSession session, ResultMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(4); buffer.putInt(message.getResult()); buffer.flip(); out.write(buffer); } }
(7.)客户端使用的ResultMessage的解码器:
public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 4) return MessageDecoderResult.NEED_DATA; else if (in.remaining() == 4) return MessageDecoderResult.OK; else return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { ResultMessage rm = new ResultMessage(); rm.setResult(in.getInt()); out.write(rm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
(8.)组装这些编解码器的工厂:
public class MathProtocolCodecFactory extends DemuxingProtocolCodecFactory { public MathProtocolCodecFactory(boolean server) { if (server) { super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(SendMessageDecoderPositive.class); super.addMessageDecoder(SendMessageDecoderNegative.class); } else { super .addMessageEncoder(SendMessage.class, SendMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
这个工厂类我们使用了构造方法的一个布尔类型的参数,以便其可以在Server 端、Client端同时使用。我们以Server 端为例,你可以看到调用两次addMessageDecoder()方法添加了1 号、2 号解码器,其实DemuxingProtocolDecoder 内部在维护了一个MessageDecoder数组,用于保存添加的所有的消息解码器,每次decode()的时候就调用每个MessageDecoder的decodable()方法逐个检查,只要发现一个MessageDecoder
不是对应的解码器,就从数组中移除,直到找到合适的MessageDecoder,如果最后发现数组为空,就表示没找到对应的MessageDecoder,最后抛出异常。
(9.)Server端:
public class Server { public static void main(String[] args) throws Exception { IoAcceptor acceptor = new NioSocketAcceptor(); LoggingFilter lf = new LoggingFilter(); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5); acceptor.getFilterChain().addLast("logger", lf); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(true))); acceptor.setHandler(new ServerHandler()); acceptor.bind(new InetSocketAddress(9123)); } }
(10.)Server端使用的IoHandler:
public class ServerHandler extends IoHandlerAdapter { private final static Logger log = LoggerFactory .getLogger(ServerHandler.class); @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { SendMessage sm = (SendMessage) message; log.info("The message received is [ " + sm.getI() + " " + sm.getSymbol() + " " + sm.getJ() + " ]"); ResultMessage rm = new ResultMessage(); rm.setResult(sm.getI() + sm.getJ()); session.write(rm); } }
(11.)Client端:
public class Client { public static void main(String[] args) throws Throwable { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(false))); connector.setHandler(new ClientHandler()); connector.connect(new InetSocketAddress("localhost", 9123)); } }
(12.)Client端的IoHandler:
public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory .getLogger(ClientHandler.class); @Override public void sessionOpened(IoSession session) throws Exception { SendMessage sm = new SendMessage(); sm.setI(100); sm.setJ(99); sm.setSymbol('+'); session.write(sm); } @Override public void messageReceived(IoSession session, Object message) { ResultMessage rs = (ResultMessage) message; LOGGER.info(String.valueOf(rs.getResult())); } }
你尝试改变(12.)中的红色代码中的正负号,会看到服务端使用了两个不同的解码器对其进行处理。
7.线程模型配置:
Mina 中的很多执行环节都使用了多线程机制,用于提高性能。Mina 中默认在三个地方使用了线程:
(1.) IoAcceptor:
这个地方用于接受客户端的连接建立,每监听一个端口(每调用一次bind()方法),都启用一个线程,这个数字我们不能改变。这个线程监听某个端口是否有请求到来,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(2.) IoConnector:
这个地方用于与服务端建立连接,每连接一个服务端(每调用一次connect()方法),就启用一个线程,我们不能改变。同样的,这个线程监听是否有连接被建立,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(3.) IoProcessor:
这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的Selector
的原因。那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的IoProcessor 的数量并不是越多越好。
この IoProcessor の数は次のように調整できます:
IoAcceptor acceptor=new NioSocketAcceptor(5);
IoConnector Connector=new NioSocketConnector(5);
これにより、IoProcessor プールの数が 5 に変更されます。つまり、処理できます。 5 つの読み取りおよび書き込み操作を同時に実行します。前に、Mina のデコーダは Decoder 自体ではなく IoSession を使用して状態変数を保存していると述べたことを思い出してください。これは、Mina が同じ IoProcessor が毎回 doDecode() メソッドを実行することを保証していないためでしょうか。実際、この問題の根本原因は、IoSession がアイドル状態になる (データの読み取りが行われない) たびに、IoProcessor がプールにリサイクルされ、IoSession がビジー状態になることです。再びアイドル状態ステータスから、IoProcessor
IoProcessor インスタンスは再び割り当てられますが、現時点では、それがまだ最後のビジー状態の IoProcessor であるという保証はありません。 IoAcceptor と IoConnector にもコンストラクターがあることがわかります。 java.util.concurrent.Executor クラスをスレッド プール オブジェクトとして指定できます。では、このスレッド プール オブジェクトは何に使用されるのでしょうか。実際、(1.) と (2.) で、TCP 接続が確立されているかどうかを監視するために使用されるスレッドを作成するために使用されます。デフォルトでは、Executors.newCachedThreadPool() メソッドを使用して Executor インスタンスが作成されます。これは無制限のスレッド プールです (詳細については、JAVA を参照してください)。
同時実行ライブラリ)。この Executor インスタンスは変更せず、組み込みのものを使用してください。そうしないと、特定のアクセス レベルでパフォーマンスが突然低下するなど、説明できない問題が発生する可能性があります。無制限のスレッド プールは作成されたソケットと同じ数のスレッドを割り当てるため、Executor でスレッド プールを作成する他の方法に変更して制限付きスレッド プールを作成すると、一部のリクエストが時間内に応答されなくなり、何らかの疑問が生じます。
Mina のワークフローの完全な概要を説明します。
(1.) IoService インスタンスが作成されると、IoService に関連付けられた IoProcessor プールとスレッド プールも作成されます。
(2.) IoService がスイートを確立するとき、インターフェース (IoAcceptor の binding() または IoConnector の connect() メソッドが呼び出される)、IoService はスレッド プールからスレッドを取り出し、ソケット ポートをリッスンします
(3.) IoService がソケットをリッスンするとき 接続要求が行われるとき、 IoSession オブジェクトが確立され、IoProcessor インスタンスが IoProcessor プールから取り出され、このセッション チャネルでフィルターと IoHandler が実行されます。
(4.) この IoSession チャネルがアイドル状態になるか閉じられると、IoProcessor はリサイクルされます。上記は、Mina のデフォルトのスレッド動作モードなので、ここで説明するのは、IoProcessor のマルチスレッド動作モードを構成する方法です。 IoProcessor はセッション上のすべてのフィルターと IoHandler の実行、つまり IO 読み取りおよび書き込み操作を担当するため、シングルスレッドの作業メソッドです (つまり、順番に 1 つずつ実行されます)。イベント メソッド (たとえば、sessionIdle()、sessionOpened() など) を別のスレッド (つまり、IoProcessor ではないスレッド) で実行する場合は、ここで ExecutorFilter フィルターを使用する必要があります。 IoProcessor のコンストラクターの 1 つのパラメーターは java.util.concurrent.Executor であることがわかります。これにより、IoProcessor によって呼び出されるフィルターと IoHandler の一部のイベント メソッドが、スレッド プールに割り当てられたスレッド上で独立して実行され、IoProcessor 上では実行されなくなります。
それが存在するスレッド。
例:
acceptor.getFilterChain().addLast("exceutor", new ExecutorFilter());
我们看到是用这个功能,简单的一行代码就可以了。那么ExecutorFilter 还有许多重载的构造方法,这些重载的有参构造方法,参数主要用于指定如下信息:
(1.) 指定线程池的属性信息,譬如:核心大小、最大大小、等待队列的性质等。你特别要关注的是ExecutorFilter 内部默认使用的是OrderedThreadPoolExecutor 作为线程池的实现,从名字上可以看出是保证各个事件在多线程执行中的顺序(譬如:各个事件方
法的执行是排他的,也就是不可能出现两个事件方法被同时执行;messageReceived()总是在sessionClosed() 方法之前执行), 这是因为多线程的执行是异步的, 如果没有OrderedThreadPoolExecutor 来保证IoHandler 中的方法的调用顺序,可能会出现严重的问题。但是如果你的代码确实没有依赖于IoHandler 中的事件方法的执行顺序,那么你可以使用UnorderedThreadPoolExecutor 作为线程池的实现。因此,你也最好不要改变默认的Executor
实现,否则,事件的执行顺序就会混乱,譬如:messageReceived()、messageSent()方法被同时执行。
(2.) 哪些事件方法被关注,也就哪些事件方法用这个线程池执行。线程池可以异步执行的事件类型是位于IoEventType 中的九个枚举值中除了SESSION_CREATED 之外的其余八个,这说明Session 建立的事件只能与IoProcessor 在同一个线程上执行。
public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, }
默认情况下,没有配置关注的事件类型,有如下六个事件方法会被自动使用线程池异步执行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其实ExecutorFilter 的工作机制很简单,就是在调用下一个过滤器的事件方法时,把其交给Executor 的execute(Runnable runnable)方法来执行,其实你自己在IoHandler 或者某个过滤器的事件方法中开启一个线程,也可以完成同样的功能,只不过这样做,你就失去了程序的可配置性,线程调用的代码也会完全耦合在代码中。但要注意的是绝对不能开启线程让其执行sessionCreated()方法。如果你真的打算使用这个ExecutorFilter,那么最好想清楚它该放在过滤器链的哪个位置,针对哪些事件做异步处理机制。一般ExecutorFilter
都是要放在ProtocolCodecFilter 过滤器的后面,也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。
以上がJava での mina の詳細な紹介の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。