Home >Database >Mysql Tutorial >Mina源码阅读笔记(四)—Mina的连接IoConnector2
接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续: AbstractIoAcceptor: 001 package org.apache.mina.core.rewrite.service; 002 003 import java.io.IOException; 004 import java.net.SocketAddress; 005 import java.util.ArrayList; 0
接着Mina源码阅读笔记(四)—Mina的连接IoConnector1,,我们继续:
AbstractIoAcceptor:
001
|
package org.apache.mina.core.rewrite.service;
|
002
|
003
|
import java.io.IOException;
|
004
|
import java.net.SocketAddress;
|
005
|
import java.util.ArrayList;
|
006
|
import java.util.Collections;
|
007
|
import java.util.HashSet;
|
008
|
import java.util.List;
|
009
|
import java.util.Set;
|
010
|
import java.util.concurrent.Executor;
|
011
|
012
|
public abstract class AbstractIoAcceptor extends AbstractIoService implements
|
013
|
IoAcceptor
{
|
014
|
015
|
private final List<socketaddress>
defaultLocalAddresses = </socketaddress> new ArrayList<socketaddress>();</socketaddress>
|
016
|
017
|
private final List<socketaddress>
unmodifiableDeffaultLocalAddresses = Collections</socketaddress>
|
018
|
.unmodifiableList(defaultLocalAddresses);
|
019
|
020
|
private final Set<socketaddress>
boundAddresses = </socketaddress> new HashSet<socketaddress>();</socketaddress>
|
021
|
022
|
private boolean disconnectOnUnbind
= true ;
|
023
|
024
|
/**
这里不是很明白,为什么要用protected 而 不是private */
|
025
|
protected final Object
bindLock = new Object();
|
026
|
027
|
/**
|
028
|
*
注意这个构造方法是一定要写的,否则编译不通过:抽象类继承时候,构造方法都要写,而且必须包含super
|
029
|
*
|
030
|
*
@param param
|
031
|
*
sessionConfig
|
032
|
*
@param executor
|
033
|
*/
|
034
|
protected AbstractIoAcceptor(Object
param, Executor executor) {
|
035
|
super (param,
executor);
|
036
|
defaultLocalAddresses.add( null );
|
037
|
}
|
038
|
039
|
@Override
|
040
|
public SocketAddress
getLocalAddress() {
|
041
|
042
|
Set<socketaddress>
localAddresses = getLocalAddresses();</socketaddress>
|
043
|
if (localAddresses.isEmpty())
{
|
044
|
return null ;
|
045
|
}
|
046
|
return localAddresses.iterator().next();
|
047
|
}
|
048
|
049
|
@Override
|
050
|
public final Set<socketaddress>
getLocalAddresses() {</socketaddress>
|
051
|
Set<socketaddress>
localAddresses = </socketaddress> new HashSet<socketaddress>();</socketaddress>
|
052
|
synchronized (boundAddresses)
{
|
053
|
localAddresses.addAll(boundAddresses);
|
054
|
}
|
055
|
return localAddresses;
|
056
|
}
|
057
|
058
|
@Override
|
059
|
public void bind(SocketAddress
localAddress) throws IOException
{
|
060
|
//
TODO Auto-generated method stub
|
061
|
062
|
}
|
063
|
064
|
@Override
|
065
|
public void bind(Iterable extends SocketAddress>
localAddresses)
|
066
|
throws IOException
{
|
067
|
//
TODO isDisposing()
|
068
|
069
|
if (localAddresses
== null )
{
|
070
|
throw new IllegalArgumentException( "localAddresses" );
|
071
|
}
|
072
|
073
|
List<socketaddress>
localAddressesCopy = </socketaddress> new ArrayList<socketaddress>();</socketaddress>
|
074
|
075
|
for (SocketAddress
a : localAddresses) {
|
076
|
//
TODO check address type
|
077
|
localAddressesCopy.add(a);
|
078
|
}
|
079
|
080
|
if (localAddressesCopy.isEmpty())
{
|
081
|
throw new IllegalArgumentException( "localAddresses
is empty" );
|
082
|
}
|
083
|
084
|
boolean active
= false ;
|
085
|
086
|
synchronized (bindLock)
{
|
087
|
synchronized (boundAddresses)
{
|
088
|
if (boundAddresses.isEmpty())
{
|
089
|
active
= true ;
|
090
|
}
|
091
|
}
|
092
|
}
|
093
|
/**
implement in abstractIoService */
|
094
|
if (getHandler()
== null )
{
|
095
|
throw new IllegalArgumentException( "handler
is not set" );
|
096
|
}
|
097
|
098
|
try {
|
099
|
Set<socketaddress>
addresses = bindInternal(localAddressesCopy);</socketaddress>
|
100
|
101
|
synchronized (boundAddresses)
{
|
102
|
boundAddresses.addAll(addresses);
|
103
|
}
|
104
|
} catch (IOException
e) {
|
105
|
throw e;
|
106
|
} catch (RuntimeException
e) {
|
107
|
throw e;
|
108
|
} catch (Throwable
e) {
|
109
|
throw new RuntimeException( "Filed
ti bind" );
|
110
|
}
|
111
|
|
112
|
if (active){
|
113
|
//do
sth
|
114
|
}
|
115
|
}
|
116
|
117
|
protected abstract Set<socketaddress>
bindInternal(</socketaddress>
|
118
|
List extends SocketAddress>
localAddress) throws Exception;
|
119
|
120
|
@Override
|
121
|
public void unbind(SocketAddress
localAddress) {
|
122
|
//
TODO Auto-generated method stub
|
123
|
|
124
|
}
|
125
|
}
|
01
|
package org.apache.mina.core.rewrite.polling;
|
02
|
03
|
import java.net.SocketAddress;
|
04
|
import java.nio.channels.ServerSocketChannel;
|
05
|
import java.util.List;
|
06
|
import java.util.Set;
|
07
|
import java.util.concurrent.Executor;
|
08
|
import java.util.concurrent.Semaphore;
|
09
|
import java.util.concurrent.atomic.AtomicReference;
|
10
|
11
|
import org.apache.mina.core.rewrite.service.AbstractIoAcceptor;
|
12
|
13
|
public abstract class AbstractPollingIoAcceptor extends AbstractIoAcceptor
{
|
14
|
15
|
private final Semaphore
lock = new Semaphore( 1 );
|
16
|
17
|
private volatile boolean selectable;
|
18
|
19
|
private AtomicReference<acceptor>
acceptorRef = </acceptor> new AtomicReference<acceptor>();</acceptor>
|
20
|
21
|
/**
|
22
|
*
define the num of sockets that can wait to be accepted.
|
23
|
*/
|
24
|
protected int backlog
= 50 ;
|
25
|
26
|
/**
|
27
|
*
一样的,这个构造方法也要写
|
28
|
*
|
29
|
*
@param param
|
30
|
*
@param executor
|
31
|
*/
|
32
|
protected AbstractPollingIoAcceptor(Object
param, Executor executor) {
|
33
|
super (param,
executor);
|
34
|
//
TODO Auto-generated constructor stub
|
35
|
}
|
36
|
37
|
/**
|
38
|
*
init the polling system. will be called at construction time
|
39
|
*
|
40
|
*
@throws Exception
|
41
|
*/
|
42
|
protected abstract void init() throws Exception;
|
43
|
44
|
protected abstract void destory() throws Exception;
|
45
|
46
|
protected abstract int select() throws Exception;
|
47
|
/**这里有点儿变动*/
|
48
|
protected abstract ServerSocketChannel
open(SocketAddress localAddress) throws Exception;
|
49
|
50
|
@Override
|
51
|
protected Set<socketaddress>
bindInternal(</socketaddress>
|
52
|
List extends SocketAddress>
localAddress) throws Exception
{
|
53
|
//
...
|
54
|
try {
|
55
|
lock.acquire();
|
56
|
Thread.sleep( 10 );
|
57
|
} finally {
|
58
|
lock.release();
|
59
|
}
|
60
|
//
...
|
61
|
return null ;
|
62
|
}
|
63
|
64
|
/**
|
65
|
*
this class is called by startupAcceptor() method it's a thread accepting
|
66
|
*
incoming connections from client
|
67
|
*
|
68
|
*
@author ChenHui
|
69
|
*
|
70
|
*/
|
71
|
private class Acceptor implements Runnable
{
|
72
|
@Override
|
73
|
public void run()
{
|
74
|
assert (acceptorRef.get()
== this );
|
75
|
76
|
int nHandles
= 0 ;
|
77
|
78
|
lock.release();
|
79
|
80
|
while (selectable)
{
|
81
|
try {
|
82
|
int selected
= select();
|
83
|
84
|
//
nHandles+=registerHandles();
|
85
|
86
|
if (nHandles
== 0 )
{
|
87
|
acceptorRef.set( null );
|
88
|
//
...
|
89
|
}
|
90
|
} catch (Exception
e) {
|
91
|
92
|
}
|
93
|
}
|
94
|
}
|
95
|
}
|
96
|
}
|
001
|
package org.apache.mina.rewrite.transport.socket.nio;
|
002
|
003
|
import java.net.InetSocketAddress;
|
004
|
import java.net.ServerSocket;
|
005
|
import java.net.SocketAddress;
|
006
|
import java.nio.channels.SelectionKey;
|
007
|
import java.nio.channels.Selector;
|
008
|
import java.nio.channels.ServerSocketChannel;
|
009
|
import java.util.concurrent.Executor;
|
010
|
011
|
import org.apache.mina.core.rewrite.polling.AbstractPollingIoAcceptor;
|
012
|
import org.apache.mina.rewrite.transport.socket.SocketAcceptor;
|
013
|
014
|
public final class NioSocketAcceptor extends AbstractPollingIoAcceptor
|
015
|
implements SocketAcceptor
{
|
016
|
017
|
private volatile Selector
selector;
|
018
|
019
|
protected NioSocketAcceptor(Object
param, Executor executor) {
|
020
|
super (param,
executor);
|
021
|
//
TODO Auto-generated constructor stub
|
022
|
}
|
023
|
024
|
@Override
|
025
|
public int getManagedSessionCount()
{
|
026
|
//
TODO Auto-generated method stub
|
027
|
return 0 ;
|
028
|
}
|
029
|
030
|
/**
|
031
|
*
这个方法继承自AbstractIoAcceptor
|
032
|
*
|
033
|
*
The type NioSocketAcceptor must implement the inherited abstract method
|
034
|
*
SocketAcceptor.getLocalAddress() to override
|
035
|
*
AbstractIoAcceptor.getLocalAddress()
|
036
|
*/
|
037
|
@Override
|
038
|
public InetSocketAddress
getLocalAddress() {
|
039
|
//
TODO Auto-generated method stub
|
040
|
return null ;
|
041
|
}
|
042
|
043
|
@Override
|
044
|
public void setDefaultLocalAddress(InetSocketAddress
localAddress) {
|
045
|
//
TODO Auto-generated method stub
|
046
|
047
|
}
|
048
|
049
|
@Override
|
050
|
public boolean isReuseAddress()
{
|
051
|
//
TODO Auto-generated method stub
|
052
|
return false ;
|
053
|
}
|
054
|
055
|
@Override
|
056
|
protected void init() throws Exception
{
|
057
|
selector
= Selector.open();
|
058
|
}
|
059
|
060
|
@Override
|
061
|
protected void destory() throws Exception
{
|
062
|
if (selector
!= null )
{
|
063
|
selector.close();
|
064
|
}
|
065
|
}
|
066
|
067
|
@Override
|
068
|
protected int select() throws Exception
{
|
069
|
return selector.select();
|
070
|
}
|
071
|
072
|
@Override
|
073
|
protected void dispose0() throws Exception
{
|
074
|
//
TODO Auto-generated method stub
|
075
|
076
|
}
|
077
|
078
|
protected ServerSocketChannel
open(SocketAddress localAddress)
|
079
|
throws Exception
{
|
080
|
ServerSocketChannel
channel =ServerSocketChannel.open();
|
081
|
|
082
|
boolean success= false ;
|
083
|
|
084
|
try {
|
085
|
channel.configureBlocking( false );
|
086
|
|
087
|
ServerSocket
socket=channel.socket();
|
088
|
|
089
|
socket.setReuseAddress(isReuseAddress());
|
090
|
|
091
|
socket.bind(localAddress);
|
092
|
|
093
|
channel.register(selector,
SelectionKey.OP_ACCEPT);
|
094
|
|
095
|
success= true ;
|
096
|
} finally {
|
097
|
if (!success){
|
098
|
//close(channel);
|
099
|
}
|
100
|
}
|
101
|
return channel;
|
102
|
}
|
103
|
104
|
@Override
|
105
|
public boolean isActive()
{
|
106
|
//
TODO Auto-generated method stub
|
107
|
return false ;
|
108
|
}
|
109
|
110
|
}
|
到此为止将连接部分都写完了,在连接部分还有些零碎的东西,比如handler、pollingsession了。