Heim  >  Artikel  >  Datenbank  >  分布式数据库中间件–(3)Cobar对简单select命令的处理过程

分布式数据库中间件–(3)Cobar对简单select命令的处理过程

WBOY
WBOYOriginal
2016-06-07 15:56:071267Durchsuche

在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。 所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过

在认证成功后Cobar会将该连接的回调处理函数由FrontendAuthenticator(前端认证处理器)设置成FrontendCommandHanler(前端命令处理器)。

所以在客户端再次向Cobar发送请求报文的时候,前端命令处理器会处理该连接。下面详细分析一下简单select语句的执行过程。

1、事件的产生

NIOReactor的R线程一直在监听selector上的每个连接的感兴趣事件是否发生,当客户端发送了一条select * from tb1,select函数会返回,然后获取到该连接SelectionKey,并且该SelectKey的兴趣事件是OP_READ。此时会调用read(NIOConnection)函数。

01 public <code class="Brush keyword">void <code class="Brush plain">run() {
02 <code class="Brush keyword">final <code class="Brush plain">Selector selector = <code class="Brush keyword">this<code class="Brush plain">.selector;
03 <code class="Brush keyword">for <code class="Brush plain">(;;) {
04 <code class="Brush plain">++reactCount;
05 <code class="Brush keyword">try <code class="Brush plain">{
06 <code class="Brush keyword">int <code class="Brush plain">res = selector.select();
07 <code class="Brush plain">LOGGER.debug(reactCount + <code class="Brush string">">>NIOReactor接受连接数:" <code class="Brush plain">+ res);
08 <code class="Brush plain">register(selector);
09 <code class="Brush plain">Set<selectionkey> keys = selector.selectedKeys();</selectionkey>
10 <code class="Brush keyword">try <code class="Brush plain">{
11 <code class="Brush keyword">for <code class="Brush plain">(SelectionKey key : keys) {
12 <code class="Brush plain">Object att = key.attachment();
13 <code class="Brush keyword">if <code class="Brush plain">(att != <code class="Brush keyword">null <code class="Brush plain">&& key.isValid()) {
14 <code class="Brush keyword">int <code class="Brush plain">readyOps = key.readyOps();
15 <code class="Brush keyword">if <code class="Brush plain">((readyOps & SelectionKey.OP_READ) != <code class="Brush value">0<code class="Brush plain">) {
16 <code class="Brush plain">LOGGER.debug(<code class="Brush string">"select读事件"<code class="Brush plain">);
17 <code class="Brush plain">read((NIOConnection) att);
18 <code class="Brush plain">..............................
19 <code class="Brush plain">}
20 <code class="Brush plain">...........................
21 <code class="Brush plain">}
22 <code class="Brush plain">} ..................
23 <code class="Brush plain">} ............
24 <code class="Brush plain">}
25 <code class="Brush plain">}

2、调用该连接的read函数进行处理

该函数在上一篇中提到过,该函数的实现在AbstractConnection中,实现从channel中读取数据到缓冲区,然后从缓冲区完整的取出整包数据交给FrontendConnection类的handle()函数处理。

该函数交给processor进行异步处理。从processor中的线程池获取一个线程来执行该任务。这里调用具体的handler来进行处理。

刚开始提到的,当认证成功后,Cobar将连接的回调处理函数设置为FrontendCommandHandler。所以这里会调用前端命令处理器的handler函数进行数据的处理。

在这里需要先了解MySQL数据包的格式:

MySQL客户端命令请求报文

MySQL客户端命令请求报文

该处理函数如下:

01 public <code class="Brush keyword">void <code class="Brush plain">handle(<code class="Brush keyword">byte<code class="Brush plain">[] data) {
02 <code class="Brush plain">LOGGER.info(<code class="Brush string">"data[4]:"<code class="Brush plain">+data[<code class="Brush value">4<code class="Brush plain">]);
03 <code class="Brush keyword">switch <code class="Brush plain">(data[<code class="Brush value">4<code class="Brush plain">]) {
04 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_INIT_DB:
05 <code class="Brush plain">commands.doInitDB();
06 <code class="Brush plain">source.initDB(data);
07 <code class="Brush keyword">break<code class="Brush plain">;
08 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_QUERY:
09 <code class="Brush plain">commands.doQuery();
10 <code class="Brush plain">source.query(data);
11 <code class="Brush keyword">break<code class="Brush plain">;
12 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_PING:
13 <code class="Brush plain">commands.doPing();
14 <code class="Brush plain">source.ping();
15 <code class="Brush keyword">break<code class="Brush plain">;
16 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_QUIT:
17 <code class="Brush plain">commands.doQuit();
18 <code class="Brush plain">source.close();
19 <code class="Brush keyword">break<code class="Brush plain">;
20 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_PROCESS_KILL:
21 <code class="Brush plain">commands.doKill();
22 <code class="Brush plain">source.kill(data);
23 <code class="Brush keyword">break<code class="Brush plain">;
24 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_PREPARE:
25 <code class="Brush plain">commands.doStmtPrepare();
26 <code class="Brush plain">source.stmtPrepare(data);
27 <code class="Brush keyword">break<code class="Brush plain">;
28 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_EXECUTE:
29 <code class="Brush plain">commands.doStmtExecute();
30 <code class="Brush plain">source.stmtExecute(data);
31 <code class="Brush keyword">break<code class="Brush plain">;
32 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_STMT_CLOSE:
33 <code class="Brush plain">commands.doStmtClose();
34 <code class="Brush plain">source.stmtClose(data);
35 <code class="Brush keyword">break<code class="Brush plain">;
36 <code class="Brush keyword">case <code class="Brush plain">MySQLPacket.COM_HEARTBEAT:
37 <code class="Brush plain">commands.doHeartbeat();
38 <code class="Brush plain">source.heartbeat(data);
39 <code class="Brush keyword">break<code class="Brush plain">;
40 <code class="Brush keyword">default<code class="Brush plain">:
41 <code class="Brush plain">commands.doOther();
42 <code class="Brush plain">source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, <code class="Brush string">"Unknown command"<code class="Brush plain">);
43 <code class="Brush plain">}
44 <code class="Brush plain">}

 

由于每个报文都有消息头,消息头固定的是4个字节,前3个字节是消息长度,后面的一个字节是报文序号,如下所示

mysql_protocol_struct

所以data[4]是第五个字节。也就是消息体的第一个字节。客户端向Cobar端发送的是命令报文,第一个字节是具体的命令。

如果是select语句,那么data[4]就是COM_QUERY,然后会调用具体连接的query成员函数,其定义在FrontendConnection类中。

01 public <code class="Brush keyword">void <code class="Brush plain">query(<code class="Brush keyword">byte<code class="Brush plain">[] data) {
02 <code class="Brush keyword">if <code class="Brush plain">(queryHandler != <code class="Brush keyword">null<code class="Brush plain">) {
03 <code class="Brush comments">// 取得语句
04 <code class="Brush plain">MySQLMessage mm = <code class="Brush keyword">new <code class="Brush plain">MySQLMessage(data);
05 <code class="Brush plain">mm.position(<code class="Brush value">5<code class="Brush plain">);
06 <code class="Brush plain">String sql = <code class="Brush keyword">null<code class="Brush plain">;
07 <code class="Brush keyword">try <code class="Brush plain">{
08 <code class="Brush plain">sql = mm.readString(charset);
09 <code class="Brush plain">} catch (UnsupportedEncodingException e) {
10 <code class="Brush plain">writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, <code class="Brush string">"Unknown charset '" <code class="Brush plain">+ charset + <code class="Brush string">"'"<code class="Brush plain">);
11 <code class="Brush keyword">return<code class="Brush plain">;
12 <code class="Brush plain">}
13 <code class="Brush keyword">if <code class="Brush plain">(sql == <code class="Brush keyword">null <code class="Brush plain">|| sql.length() == <code class="Brush value">0<code class="Brush plain">) {
14 <code class="Brush plain">writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, <code class="Brush string">"Empty SQL"<code class="Brush plain">);
15 <code class="Brush keyword">return<code class="Brush plain">;
16 <code class="Brush plain">}
17 <code class="Brush plain">LOGGER.debug(<code class="Brush string">"解析的SQL语句:"<code class="Brush plain">+sql);
18 <code class="Brush comments">// 执行查询
19 <code class="Brush plain">queryHandler.query(sql);
20 <code class="Brush plain">} else {
21 <code class="Brush plain">writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, <code class="Brush string">"Query unsupported!"<code class="Brush plain">);
22 <code class="Brush plain">}
23 <code class="Brush plain">}

首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。

查询的时候Cobar控制台输出如下内容:

11:35:33,392 INFO data[4]:3

解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:

01 public <code class="Brush keyword">void <code class="Brush plain">query(String sql) {
02 <code class="Brush comments">//这里就得到了完整的SQL语句,接收自客户端
03 <code class="Brush plain">ServerConnection c = <code class="Brush keyword">this<code class="Brush plain">.source;
04 <code class="Brush keyword">if <code class="Brush plain">(LOGGER.isDebugEnabled()) {
05 <code class="Brush plain">LOGGER.debug(<code class="Brush keyword">new <code class="Brush plain">StringBuilder().append(c).append(sql).toString());
06 <code class="Brush plain">}
07 <code class="Brush comments">//该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作
08 <code class="Brush keyword">int <code class="Brush plain">rs = ServerParse.parse(sql);
09 <code class="Brush keyword">switch <code class="Brush plain">(rs & <code class="Brush value">0xff<code class="Brush plain">) {
10 <code class="Brush plain">.......................
11 <code class="Brush keyword">case <code class="Brush plain">ServerParse.SELECT:
12 <code class="Brush comments">//select操作执行
13 <code class="Brush plain">SelectHandler.handle(sql, c, rs >>> <code class="Brush value">8<code class="Brush plain">);
14 <code class="Brush keyword">break<code class="Brush plain">;
15 <code class="Brush plain">.......................
16 <code class="Brush plain">}
17 <code class="Brush plain">}

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql

c.execute(stmt, ServerParse.SELECT);

在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。

1 // 路由计算
2 RouteResultset rrs = <code class="Brush keyword">null<code class="Brush plain">;
3 <code class="Brush keyword">try <code class="Brush plain">{
4 <code class="Brush plain">rrs = ServerRouter.route(schema, sql, <code class="Brush keyword">this<code class="Brush plain">.charset, <code class="Brush keyword">this<code class="Brush plain">);
5 <code class="Brush plain">LOGGER.debug(<code class="Brush string">"路由计算结果:"<code class="Brush plain">+rrs.toString());
6 <code class="Brush plain">}

具体的路由算法也是比较复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果如下:

11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。

01 private <code class="Brush keyword">void <code class="Brush plain">execute0(RouteResultsetNode rrn, Channel c, <code class="Brush keyword">boolean <code class="Brush plain">autocommit, BlockingSession ss, <code class="Brush keyword">int <code class="Brush plain">flag) {
02 <code class="Brush plain">ServerConnection sc = ss.getSource();
03 <code class="Brush plain">.........................
04 <code class="Brush keyword">try <code class="Brush plain">{
05 <code class="Brush comments">// 执行并等待返回
06 <code class="Brush plain">BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
07 <code class="Brush comments">// 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回
08 <code class="Brush keyword">final <code class="Brush plain">ReentrantLock lock = MultiNodeExecutor.<code class="Brush keyword">this<code class="Brush plain">.lock;
09 <code class="Brush plain">lock.lock();
10 <code class="Brush keyword">try <code class="Brush plain">{
11 <code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) {
12 <code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT:
13 <code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">);
14 <code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
15 <code class="Brush keyword">break<code class="Brush plain">;
16 <code class="Brush keyword">case <code class="Brush plain">OkPacket.FIELD_COUNT:
17 <code class="Brush plain">OkPacket ok = <code class="Brush keyword">new <code class="Brush plain">OkPacket();
18 <code class="Brush plain">ok.read(bin);
19 <code class="Brush plain">affectedRows += ok.affectedRows;
20 <code class="Brush comments">// set lastInsertId
21 <code class="Brush keyword">if <code class="Brush plain">(ok.insertId > <code class="Brush value">0<code class="Brush plain">) {
22 <code class="Brush plain">insertId = (insertId == <code class="Brush value">0<code class="Brush plain">) ? ok.insertId : Math.min(insertId, ok.insertId);
23 <code class="Brush plain">}
24 <code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">);
25 <code class="Brush plain">handleSuccessOK(ss, rrn, autocommit, ok);
26 <code class="Brush keyword">break<code class="Brush plain">;
27 <code class="Brush keyword">default<code class="Brush plain">: // HEADER|FIELDS|FIELD_EOF|ROWS|LAST_EOF
28 <code class="Brush keyword">final <code class="Brush plain">MySQLChannel mc = (MySQLChannel) c;
29 <code class="Brush keyword">if <code class="Brush plain">(fieldEOF) {
30 <code class="Brush keyword">for <code class="Brush plain">(;;) {
31 <code class="Brush plain">bin = mc.receive();
32 <code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) {
33 <code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT:
34 <code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">);
35 <code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo(mc, bin, sc, rrn));
36 <code class="Brush keyword">return<code class="Brush plain">;
37 <code class="Brush keyword">case <code class="Brush plain">EOFPacket.FIELD_COUNT:
38 <code class="Brush plain">handleRowData(rrn, c, ss);
39 <code class="Brush keyword">return<code class="Brush plain">;
40 <code class="Brush keyword">default<code class="Brush plain">:
41 <code class="Brush keyword">continue<code class="Brush plain">;
42 <code class="Brush plain">}
43 <code class="Brush plain">}
44 <code class="Brush plain">} else {
45 <code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// HEADER
46 <code class="Brush plain">List<mysqlpacket> headerList = <code class="Brush keyword">new <code class="Brush plain">LinkedList<mysqlpacket>();</mysqlpacket>
47 <code class="Brush plain">headerList.add(bin);
48 <code class="Brush keyword">for <code class="Brush plain">(;;) {
49 <code class="Brush plain">bin = mc.receive();
50 <code class="Brush keyword">switch <code class="Brush plain">(bin.data[<code class="Brush value">0<code class="Brush plain">]) {
51 <code class="Brush keyword">case <code class="Brush plain">ErrorPacket.FIELD_COUNT:
52 <code class="Brush plain">c.setRunning(<code class="Brush keyword">false<code class="Brush plain">);
53 <code class="Brush plain">handleFailure(ss, rrn, <code class="Brush keyword">new <code class="Brush plain">BinaryErrInfo(mc, bin, sc, rrn));
54 <code class="Brush keyword">return<code class="Brush plain">;
55 <code class="Brush keyword">case <code class="Brush plain">EOFPacket.FIELD_COUNT:
56 <code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// FIELD_EOF
57 <code class="Brush keyword">for <code class="Brush plain">(MySQLPacket packet : headerList) {
58 <code class="Brush plain">buffer = packet.write(buffer, sc);
59 <code class="Brush plain">}
60 <code class="Brush plain">headerList = <code class="Brush keyword">null<code class="Brush plain">;
61 <code class="Brush plain">buffer = bin.write(buffer, sc);
62 <code class="Brush plain">fieldEOF = <code class="Brush keyword">true<code class="Brush plain">;
63 <code class="Brush plain">handleRowData(rrn, c, ss);
64 <code class="Brush keyword">return<code class="Brush plain">;
65 <code class="Brush keyword">default<code class="Brush plain">:
66 <code class="Brush plain">bin.packetId = ++packetId;<code class="Brush comments">// FIELDS
67 <code class="Brush keyword">switch <code class="Brush plain">(flag) {
68 <code class="Brush keyword">case <code class="Brush plain">RouteResultset.REWRITE_FIELD:
69 <code class="Brush plain">StringBuilder fieldName = <code class="Brush keyword">new <code class="Brush plain">StringBuilder();
70 <code class="Brush plain">fieldName.append(<code class="Brush string">"Tables_in_"<code class="Brush plain">).append(ss.getSource().getSchema());
71 <code class="Brush plain">FieldPacket field = PacketUtil.getField(bin, fieldName.toString());
72 <code class="Brush plain">headerList.add(field);
73 <code class="Brush keyword">break<code class="Brush plain">;
74 <code class="Brush keyword">default<code class="Brush plain">:
75 <code class="Brush plain">headerList.add(bin);
76 <code class="Brush plain">}
77 <code class="Brush plain">}
78 <code class="Brush plain">}
79 <code class="Brush plain">}
80 <code class="Brush plain">}
81 <code class="Brush plain">} finally {
82 <code class="Brush plain">lock.unlock();
83 <code class="Brush plain">}
84 <code class="Brush plain">}//异常处理....................
85 <code class="Brush plain">}

这里真正的执行SQL语句,然后等待后端执行语句的返回数据,在成功获取后端Mysql返回的结果后,该函数返回的数据包是结果集数据包。

当客户端发起认证请求或命令请求后,服务器会返回相应的执行结果给客户端。客户端在收到响应报文后,需要首先检查第1个字节的值,来区分响应报文的类型。

响应报文类型 第1个字节取值范围
OK 响应报文 0×00
Error 响应报文 0xFF
Result Set 报文 0×01 – 0xFA
Field 报文 0×01 – 0xFA
Row Data 报文 0×01 – 0xFA
EOF 报文 0xFE

注:响应报文的第1个字节在不同类型中含义不同,比如在OK报文中,该字节并没有实际意义,值恒为0×00;而在Result Set报文中,该字节又是长度编码的二进制数据结构(Length Coded Binary)中的第1字节。

Result Set 消息分为五部分,结构如下:

结构 说明
[Result Set Header] 列数量
[Field] 列信息(多个)
[EOF] 列结束
[Row Data] 行数据(多个)
[EOF] 数据结束

函数执行完成后,返回的结果都放入LinkedList中,当读取结果完成后放入多节点执行器的缓冲区。如果buffer满了,就通过前端连接写出给客户端。

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:sql语句中单引号嵌套问题Nächster Artikel:数据库原理常见问答