设为首页 加入收藏

TOP

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)(一)
2015-11-21 01:45:38 来源: 作者: 【 】 浏览:0
Tags:HBase1.0.0 源码 分析 请求 处理 流程 Put 作为

1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合。如下:

    private List writeAsyncBuffer = new LinkedList<>();
    writeAsyncBuffer.add(m);

当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的操作被执行。

2.backgroundFlushCommits(boolean synchronous)执行操作缓冲池中的操作,其实他也并不是自己去处理响应的操作,而是委托给一个AsyncProcess具体进行响应操作的执行,该类是模拟一个异步处理持续请求流的类。这其中主要发生以下几个主要操作:

    ap.submit(tableName, writeAsyncBuffer, true, null, false);

    Map> actionsByServer =
        new HashMap>();
   List> retainedActions = new  ArrayList>(rows.size());

   addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup

    return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
      locationErrors, locationErrorRows, actionsByServer, pool);

总结以下这里主要干了以下几件事:
* 对Put操作进行封装,封装成Action
* 找出每个操作对应的regionServer形成ServerName - > MultiAction的键值对,然后继续submit
3. 在submitMultiActions方法里面,作者使用了一个AsyncRequestFutureImpl的实现来保存结果数据

 AsyncRequestFutureImpl ars = createAsyncRequestFuture(
      tableName, retainedActions, nonceGroup, pool, callback, results, needResults);

4.类ars中的sendMultiAction函数sendMultiAction才是最后真正的逻辑层调用的地方,完整的函数如下:

    private void sendMultiAction(Map> actionsByServer,
        int numAttempt, List> actionsForReplicaThread, boolean reuseThread) {
      // Run the last item on the same thread if we are already on a send thread.
      // We hope most of the time it will be the only item, so we can cut down on threads.
      int actionsRemaining = actionsByServer.size();
      // This iteration is by server (the HRegionLocation comparator is by server portion only).
      for (Map.Entry> e : actionsByServer.entrySet()) {
        ServerName server = e.getKey();
        MultiAction multiAction = e.getValue();
        incTaskCounters(multiAction.getRegions(), server);
        Collection runnables = getNewMultiActionRunnable(server, multiAction,
            numAttempt);
        // make sure we correctly count the number of runnables before we try to reuse the send
        // thread, in case we had to split the request into different runnables because of backoff
        if (runnables.size() > actionsRemaining) {
          actionsRemaining = runnables.size();
        }

        // run all the runnables
        for (Runnable runnable : runnables) {
          if ((--actionsRemaining == 0) && reuseThread) {
            runnable.run();
          } else {
            try {
              pool.submit(runnable);
            } catch (RejectedExecutionException ree) {
              // This should never happen. But as the pool is provided by the end user, let's secure
              //  this a little.
              decTaskCounters(multiAction.getRegions(), server);
              LOG.warn("#" + id + ", the task was rejected by the pool. This is unexpected." +
                  " Server is " + server.getServerName(), ree);
              // We're likely to fail again, but this will increment the attempt counter, so it will
              //  finish.
              receiveGlobalFailure(multiAction, server, numAttempt, ree);
            }
          }
        }
      }

      if (actionsForReplicaThread != null) {
        startWaitingForReplicaCalls(actionsForReplicaThread);
      }
    }

函数蛮长,但是做的事情的逻辑也是比较清晰的,从代码中可以看出,程序的处理逻辑是按照regionServer进行区分的,将每个操作封装成可运行的任务(SingleServerRequestRunnable),然后用现成池pool依次执行。
这里有必要将他们所构造的任务的run 函数贴出来研究以下,从run函数我们才能够看到请求的真正逻辑:

    MultiResponse res;
    MultiServerCallable callable = createCallable(server, tableName, multiAction);
    res = createCaller(callable).callWithoutRetries(callable, timeout);
    receiveMultiAction(multiAction, server, res, numAttempt);

每个任务都是通过HBase的RPC框架与服务器进行通信,并获取返回的结果。

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇Pregel系统介绍 下一篇HBase1.0.0源码分析之请求处理流..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: