|
分布式数据库中间件?(3)Cobar对简单select命令的处理过程(二)
essage mm = new MySQLMessage(data); |
| 08 |
sql = mm.readString(charset); |
| 09 |
} catch (UnsupportedEncodingException e) { |
| 10 |
writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'"); |
| 13 |
if (sql == null || sql.length() == 0) { |
| 14 |
writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL"); |
| 17 |
LOGGER.debug("解析的SQL语句:"+sql); |
| 19 |
queryHandler.query(sql); |
| 21 |
writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!"); |
首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。
查询的时候Cobar控制台输出如下内容:
11:35:33,392 INFO data[4]:3
解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:
| 01 |
public void query(String sql) { |
| 03 |
ServerConnection c = this.source; |
| 04 |
if (LOGGER.isDebugEnabled()) { |
| 05 |
LOGGER.debug(new StringBuilder().append(c).append(sql).toString()); |
| 08 |
int rs = ServerParse.parse(sql); |
| 10 |
....................... |
| 11 |
case ServerParse.SELECT: |
| 13 |
SelectHandler.handle(sql, c, rs >>> 8); |
| 15 |
....................... |
首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。
如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql
c.execute(stmt, ServerParse.SELECT);
在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。
| 2 |
RouteResultset rrs = null; |
| 4 |
rrs = ServerRouter.route(schema, sql, this.charset, this); |
| 5 |
LOGGER.debug("路由计算结果:"+rrs.toString()); |
具体的路由算法也是比较复杂,以后会专门分析。
Cobar的DEBUG控制台输出路由的计算结果如下:
11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={
该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。
经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。
| 01 |
private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) { |
| 02 |
ServerConnection sc = ss.getSource(); |
| 03 |
......................... |
| 06 |
BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit); |
| 08 |
final ReentrantLock lock = MultiNodeExecutor.this.lock; |
| 11 |
switch (bin.data[0]) { |
| 12 |
case ErrorPacket.FIELD_COUNT: |
| 14 |
handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn)); |
| 16 |
case OkPacket.FIELD_COUNT: |
| 17 |
OkPacket ok = new OkPacket(); |
| 19 |
affectedRows += ok.affectedRows; |
| 21 |
if (ok.insertId > 0) { |
| 22 |
insertId = (insertId == 0) ok.insertId : Math.min(insertId, ok.insertId); |
| 25 |
handleSuccessOK(ss, rrn, autocommit, ok); |
|