设为首页 加入收藏

TOP

分布式数据库中间件?(3)Cobar对简单select命令的处理过程(二)
2014-11-24 00:04:34 来源: 作者: 【 】 浏览:36
Tags:分布式 数据库 中间件 Cobar 简单 select 命令 处理 过程
essage mm = new MySQLMessage(data);
05 mm.position(5);
06 String sql = null;
07 try {
08 sql = mm.readString(charset);
09 } catch (UnsupportedEncodingException e) {
10 writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
11 return;
12 }
13 if (sql == null || sql.length() == 0) {
14 writeErrMessage(ErrorCode.ER_NOT_ALLOWED_COMMAND, "Empty SQL");
15 return;
16 }
17 LOGGER.debug("解析的SQL语句:"+sql);
18 // 执行查询
19 queryHandler.query(sql);
20 } else {
21 writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Query unsupported!");
22 }
23 }

首先新建一个MySQLMessage对象,将数据包的索引位置定位到第6个字节位置处。然后将后面的所有的字节读取成指定编码格式的SQL语句,这里就形成了完整的SQL语句。

查询的时候Cobar控制台输出如下内容:

11:35:33,392 INFO data[4]:3

解析出SQL语句后交给queryHandler处理。该对象是在新建连接的时候设置的ServerQueryHandler类,其实现的query函数如下:

01 public void query(String sql) {
02 //这里就得到了完整的SQL语句,接收自客户端
03 ServerConnection c = this.source;
04 if (LOGGER.isDebugEnabled()) {
05 LOGGER.debug(new StringBuilder().append(c).append(sql).toString());
06 }
07 //该函数对SQL语句的语法和语义进行分析,并返回SQL语句的对于类型,执行相应的操作
08 int rs = ServerParse.parse(sql);
09 switch (rs & 0xff) {
10 .......................
11 case ServerParse.SELECT:
12 //select操作执行
13 SelectHandler.handle(sql, c, rs >>> 8);
14 break;
15 .......................
16 }
17 }

首先对SQL语句进程解析,通过parse函数对语句解析后返回语句类型的编号。

如果语句没有语法错误,则直接交给SelectHandler进行处理。如果是一般的select语句,则直接调用ServerConnection的execute执行sql

c.execute(stmt, ServerParse.SELECT);

在ServerConnection中的execute函数中需要进行路由检查,因为select的数据不一定在一个数据库中,需要按拆分的规则进行路由的检查。

1 // 路由计算
2 RouteResultset rrs = null;
3 try {
4 rrs = ServerRouter.route(schema, sql, this.charset, this);
5 LOGGER.debug("路由计算结果:"+rrs.toString());
6 }

具体的路由算法也是比较复杂,以后会专门分析。

Cobar的DEBUG控制台输出路由的计算结果如下:

11:35:33,392 DEBUG 路由计算结果:select * from tb2, route={

该条SQL语句的select内容分布在dnTset2和dnTest3中,所以要分别向这两个数据库进行查询。

经过比较复杂的资源处理最后在每个后端数据库上执行函数execute0。

01 private void execute0(RouteResultsetNode rrn, Channel c, boolean autocommit, BlockingSession ss, int flag) {
02 ServerConnection sc = ss.getSource();
03 .........................
04 try {
05 // 执行并等待返回
06 BinaryPacket bin = ((MySQLChannel) c).execute(rrn, sc, autocommit);
07 // 接收和处理数据,执行到这里就说明上面的执行已经得到执行结果的返回
08 final ReentrantLock lock = MultiNodeExecutor.this.lock;
09 lock.lock();
10 try {
11 switch (bin.data[0]) {
12 case ErrorPacket.FIELD_COUNT:
13 c.setRunning(false);
14 handleFailure(ss, rrn, new BinaryErrInfo((MySQLChannel) c, bin, sc, rrn));
15 break;
16 case OkPacket.FIELD_COUNT:
17 OkPacket ok = new OkPacket();
18 ok.read(bin);
19 affectedRows += ok.affectedRows;
20 // set lastInsertId
21 if (ok.insertId > 0) {
22 insertId = (insertId == 0) ok.insertId : Math.min(insertId, ok.insertId);
23 }
24 c.setRunning(false);
25 handleSuccessOK(ss, rrn, autocommit, ok);
首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇MongoDB中shard key的选择 下一篇海量数据检索的一些思考

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: