设为首页 加入收藏

TOP

Giraph源码分析(九)――Aggregators原理解析(三)
2014-11-24 07:41:51 来源: 作者: 【 】 浏览:4
Tags:Giraph 源码 分析 Aggregators 原理 解析
uest数目waitingOnPermits. 接受一个Request

  /**
   * Require more permits. This will increase the number of times permits
   * were required. Doesn't wait for permits to become available.
   *
   * @param permits Number of permits to require
   * @param taskId Task id which required permits
   */
  public synchronized void requirePermits(long permits, int taskId) {
    arrivedTaskIds.add(taskId);
    waitingOnPermits += permits;
    notifyAll();
  }

\ 接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。 \

3. 在Vertex.compute()方法中,每个Worker聚集自身的值。计算完成后,调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其他所有Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。finishSuperstep方法如下:

 /**
   * Send aggregators to their owners and in the end to the master
   *
   * @param requestProcessor Request processor for aggregators
   */
  public void finishSuperstep(
      WorkerAggregatorRequestProcessor requestProcessor) {
    OwnerAggregatorServerData ownerAggregatorData =
        serviceWorker.getServerData().getOwnerAggregatorData();
    // First send partial aggregated values to their owners and determine
    // which aggregators belong to this worker
    for (Map.Entry> entry :
        currentAggregatorMap.entrySet()) {
        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
            entry.getValue().getAggregatedValue());
        if (!sent) {
          // If it's my aggregator, add it directly
          ownerAggregatorData.aggregate(entry.getKey(),
              entry.getValue().getAggregatedValue());
        }
    }
    // Flush
    requestProcessor.flush();
    // Wait to receive partial aggregated values from all other workers
    Iterable> myAggregators =
        ownerAggregatorData.getMyAggregatorValuesWhenReady(
            getOtherWorkerIdsSet());

    // Send final aggregated values to master
    AggregatedValueOutputStream aggregatorOutput =
        new AggregatedValueOutputStream();
    for (Map.Entry entry : myAggregators) {
        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
            entry.getValue());
        if (currentSize > maxBytesPerAggregatorRequest) {
          requestProcessor.sendAggregatedValuesToMaster(
              aggregatorOutput.flush());
        }   
    }
    requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
    // Wait for master to receive aggregated values before proceeding
    serviceWorker.getWorkerClient().waitAllRequests();
    ownerAggregatorData.reset();
  }

调用关系如下:

\

4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法,收集聚集器的值。方法内容如下:

  public void prepareSuperstep(MasterClient masterClient) {

    // 收集上次超级步的聚集值,为master compute 做准备
    for (AggregatorWrapper aggregator : aggregatorMap.values()) {
	// 如果是 Persistent Aggregator,则累加
	if (aggregator.isPersistent()) {
        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
      }
      aggregator.setPreviousAggregatedValue(
          aggregator.getCurrentAggregatedValue());
      aggregator.resetCurrentAggregator();
      progressable.progress();
    }
  }
然后调用MasterCompute.compute()方法(可能会修改聚集器的值),在该方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代。

备注:Job迭代结束条件有三,满足其一就行:
1) 达到最大迭代次数
2) 没有活跃顶点且没有消息在传递
3) 终止MasterCompute计算

总结:为解决在多个Aggr

首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇mongodb学习笔记--杂项与补充 下一篇机房收费系统――存储过程的运用

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·About - Redis (2025-12-26 08:20:56)
·Redis: A Comprehens (2025-12-26 08:20:53)
·Redis - The Real-ti (2025-12-26 08:20:50)
·Bash 脚本教程——Li (2025-12-26 07:53:35)
·实战篇!Linux shell (2025-12-26 07:53:32)