Giraph Source Code Analysis (9): How Aggregators Work (Part 2)
2014-11-24 07:41:51
Tags: Giraph, source code, analysis, Aggregators, principles
Aggregator;
// Stores the aggregators of the previous superstep
private final AllAggregatorServerData allAggregatorData;

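As the fields show, ownerAggregatorData stores the aggregators that the Master sends to this Worker in the current superstep, while allAggregatorData holds the global aggregated values from the previous superstep. Both are initialized in the doRequest(ServerData serverData) method of the SendAggregatorsToOwnerRequest class: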

public void doRequest(ServerData serverData) {
    DataInput input = getDataInput();
    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
    try {
      // Number of aggregators received. CountingOutputStream keeps a counter
      // that is incremented for every aggregator object written to the output
      // stream; when the request is sent, flush() inserts this count at the
      // very beginning of the stream.
      int numAggregators = input.readInt();
      for (int i = 0; i < numAggregators; i++) {
        String aggregatorName = input.readUTF();
        String aggregatorClassName = input.readUTF();
        if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
          LongWritable count = new LongWritable(0);
          // Total number of requests the Master sends to this Worker
          count.readFields(input);
          aggregatorData.receivedRequestCountFromMaster(count.get(),
              getSenderTaskId());
        } else {
          Class<Aggregator<Writable>> aggregatorClass =
              AggregatorUtils.getAggregatorClass(aggregatorClassName);
          aggregatorData.registerAggregatorClass(aggregatorName,
              aggregatorClass);
          Writable aggregatorValue =
              aggregatorData.createAggregatorInitialValue(aggregatorName);
          aggregatorValue.readFields(input);
          // Store the received global value from the previous superstep
          // in allAggregatorData
          aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
          // ownerAggregatorData only registers the aggregator (name and class)
          serverData.getOwnerAggregatorData().registerAggregator(
              aggregatorName, aggregatorClass);
        }
      }
    } catch (IOException e) {
      throw new IllegalStateException("doRequest: " +
          "IOException occurred while processing request", e);
    }
    // One request received: decrement the outstanding-request count and add
    // the received data to the masterData list in AllAggregatorServerData
    aggregatorData.receivedRequestFromMaster(getData());
}
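The comment in doRequest() says that CountingOutputStream counts aggregators as they are written and that flush() places that count at the front of the stream. The sketch below models that layout with plain java.io streams so it runs without Giraph or Hadoop. It is a simplified illustration under stated assumptions, not Giraph's serialization code: every value is a plain long standing in for a Writable, the class AggregatorMessageSketch, its encode/decode helpers and the marker string are hypothetical, and the special count entry is carried in the same message only for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a count-prefixed aggregator message. */
public class AggregatorMessageSketch {
  // Hypothetical marker name playing the role of SPECIAL_COUNT_AGGREGATOR
  static final String SPECIAL_COUNT = "__specialCount";

  /** Writes (name, className, value) entries, then prepends the entry count. */
  static byte[] encode(Map<String, Long> aggregators, long requestCount)
      throws IOException {
    ByteArrayOutputStream body = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(body);
    int counter = 0; // counts entries, like CountingOutputStream's counter
    for (Map.Entry<String, Long> e : aggregators.entrySet()) {
      out.writeUTF(e.getKey());          // aggregator name
      out.writeUTF("LongSumAggregator"); // class name (illustrative only)
      out.writeLong(e.getValue());       // value, standing in for a Writable
      counter++;
    }
    // Special entry: how many requests the sender will send in total
    out.writeUTF(SPECIAL_COUNT);
    out.writeUTF("");
    out.writeLong(requestCount);
    counter++;
    out.flush();

    // "flush" step: put the counter in front of everything written so far
    ByteArrayOutputStream framed = new ByteArrayOutputStream();
    DataOutputStream header = new DataOutputStream(framed);
    header.writeInt(counter);
    header.write(body.toByteArray());
    header.flush();
    return framed.toByteArray();
  }

  /** Parses the message the way doRequest() does: count first, then entries. */
  static Map<String, Long> decode(byte[] message, long[] requestCountOut)
      throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
    Map<String, Long> values = new LinkedHashMap<>();
    int numAggregators = in.readInt();
    for (int i = 0; i < numAggregators; i++) {
      String name = in.readUTF();
      in.readUTF(); // class name, unused in this sketch
      long value = in.readLong();
      if (name.equals(SPECIAL_COUNT)) {
        requestCountOut[0] = value; // routed separately, not an aggregator
      } else {
        values.put(name, value);
      }
    }
    return values;
  }

  public static void main(String[] args) throws IOException {
    Map<String, Long> aggs = new LinkedHashMap<>();
    aggs.put("sum", 42L);
    aggs.put("max", 7L);
    long[] count = new long[1];
    Map<String, Long> decoded = decode(encode(aggs, 3L), count);
    System.out.println(decoded + " requests=" + count[0]);
    // prints "{sum=42, max=7} requests=3"
  }
}
```

The request count comes back through a separate channel rather than ending up among the aggregator values, which mirrors how doRequest() hands SPECIAL_COUNT_AGGREGATOR to receivedRequestCountFromMaster() instead of registering it.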

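Before it starts computing, each Worker calls BspServiceWorker.prepareSuperstep() to distribute aggregator values and to receive the aggregator values sent by the other Workers. The call chain is as follows: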

[Figure: call chain of prepareSuperstep()]
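The distribute-and-collect pattern described above can be modeled inside a single JVM. In this sketch each hypothetical Worker holds the aggregators a stand-in Master assigned to it, forwards them to every other Worker through a BlockingQueue, blocks until it has one message from each peer, and then clears its Master data. The round-robin owner assignment, the queues and all class and method names are assumptions made for illustration; real Giraph Workers exchange these values through NettyWorkerAggregatorRequestProcessor, and waiting for the Master's requests is left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical single-JVM model of aggregator distribution between Workers. */
public class AggregatorDistributionSketch {

  static class Worker {
    final int id;
    // Aggregators this Worker owns, as received from the Master
    final Map<String, Long> fromMaster = new HashMap<>();
    // Messages arriving from the other Workers
    final BlockingQueue<Map<String, Long>> inbox = new LinkedBlockingQueue<>();
    // Complete set of values after the exchange
    final Map<String, Long> allValues = new HashMap<>();

    Worker(int id) {
      this.id = id;
    }

    void prepareSuperstep(List<Worker> workers) throws InterruptedException {
      Map<String, Long> owned = new HashMap<>(fromMaster);
      // Forward the owned values to every other Worker
      for (Worker other : workers) {
        if (other != this) {
          other.inbox.put(owned);
        }
      }
      // Block until one message from each other Worker has been taken
      allValues.putAll(owned);
      for (int i = 0; i < workers.size() - 1; i++) {
        allValues.putAll(inbox.take());
      }
      // Clear the Master data for the next superstep
      fromMaster.clear();
    }
  }

  static List<Worker> run(Map<String, Long> globalValues, int numWorkers)
      throws Exception {
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(new Worker(i));
    }
    // Stand-in Master: give each aggregator to exactly one owner (round-robin)
    int next = 0;
    for (Map.Entry<String, Long> e : globalValues.entrySet()) {
      workers.get(next++ % numWorkers).fromMaster.put(e.getKey(), e.getValue());
    }
    // One thread per Worker so that every blocking take() can be satisfied
    ExecutorService pool = Executors.newFixedThreadPool(numWorkers);
    try {
      List<Future<Object>> results = new ArrayList<>();
      for (Worker w : workers) {
        results.add(pool.submit(() -> {
          w.prepareSuperstep(workers);
          return null;
        }));
      }
      for (Future<Object> f : results) {
        f.get(); // rethrows any failure from a Worker thread
      }
    } finally {
      pool.shutdown();
    }
    return workers;
  }

  public static void main(String[] args) throws Exception {
    Map<String, Long> global = new HashMap<>();
    global.put("sum", 10L);
    global.put("max", 5L);
    global.put("min", 1L);
    for (Worker w : run(global, 3)) {
      System.out.println("worker " + w.id + " sees " + w.allValues);
    }
  }
}
```

Every Worker starts with only the values it owns and ends with the full map, which is the point of splitting ownership: the Master sends each value to one Worker only, and the Workers spread it among themselves.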

BspServiceWorker.prepareSuperstep() looks like this:

@Override
public void prepareSuperstep() {
  if (getSuperstep() != INPUT_SUPERSTEP) {
    /*
     * aggregatorHandler is a WorkerAggregatorHandler (see the class hierarchy
     * of MasterAggregatorHandler shown earlier).
     * workerAggregatorRequestProcessor is declared with the
     * WorkerAggregatorRequestProcessor interface type but is actually an
     * instance of NettyWorkerAggregatorRequestProcessor; it is used to send
     * aggregator values between Workers.
     */
    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
  }
}

WorkerAggregatorHandler.prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) is as follows:

public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
    AllAggregatorServerData allAggregatorData =
        serviceWorker.getServerData().getAllAggregatorData();
    // Block until every aggregator the Master sends to this Worker has been
    // received; returns all data (aggregators) the Master sent to this Worker
    Iterable<byte[]> dataToDistribute =
        allAggregatorData.getDataFromMasterWhenReady(
            serviceWorker.getMasterInfo());

    // Forward the data (aggregators) received from the Master to all other Workers
    requestProcessor.distributeAggregators(dataToDistribute);

    // Block until the aggregators sent to this Worker by all other Workers
    // have been received, then fill previousAggregatedValueMap and
    // currentAggregatorMap
    allAggregatorData.fillNextSuperstepMapsWhenReady(
        getOtherWorkerIdsSet(), previousAggregatedValueMap,
        currentAggregatorMap);
    // Only clears the masterData list in AllAggregatorServerData, preparing
    // to receive the Master's aggregators in the next superstep
    allAggregatorData.reset();
}
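The first call above, getDataFromMasterWhenReady(), has to block until every request from the Master has arrived. Below is a minimal sketch of such a permit barrier built on synchronized, wait() and notifyAll(). requirePermits(long permits, int taskId) and arrivedTaskIds are the names used in the text; releaseOnePermit() and waitForRequiredPermits() are assumed names, and the class as a whole is a simplified illustration rather than Giraph's TaskIdsPermitBarrier.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Simplified permit barrier in the spirit of TaskIdsPermitBarrier (sketch). */
public class PermitBarrierSketch {
  /** Requests announced but not yet received (may dip below zero). */
  private long waitingOnPermits = 0;
  /** Senders whose request count has arrived. */
  private final Set<Integer> arrivedTaskIds = new HashSet<>();

  /** Called when the special count entry from taskId arrives. */
  public synchronized void requirePermits(long permits, int taskId) {
    arrivedTaskIds.add(taskId);
    waitingOnPermits += permits;
    notifyAll(); // wake any thread waiting in waitForRequiredPermits()
  }

  /** Called once for every request actually received (assumed name). */
  public synchronized void releaseOnePermit() {
    waitingOnPermits--;
    notifyAll();
  }

  /**
   * Blocks until every expected sender has announced its count and all
   * announced requests have arrived (assumed name).
   */
  public synchronized void waitForRequiredPermits(Set<Integer> expectedTaskIds)
      throws InterruptedException {
    // Loop guards against spurious wakeups and early notifications
    while (!arrivedTaskIds.containsAll(expectedTaskIds) || waitingOnPermits > 0) {
      wait(); // releases the monitor until notifyAll() is called
    }
    arrivedTaskIds.clear(); // ready for the next superstep
  }

  public static void main(String[] args) throws InterruptedException {
    PermitBarrierSketch barrier = new PermitBarrierSketch();
    int masterTaskId = 0;
    Thread network = new Thread(() -> {
      // The Master announces 2 requests; both are then delivered
      barrier.requirePermits(2, masterTaskId);
      barrier.releaseOnePermit();
      barrier.releaseOnePermit();
    });
    network.start();
    barrier.waitForRequiredPermits(Collections.singleton(masterTaskId));
    System.out.println("all requests from the Master received");
    network.join();
  }
}
```

Because the count entry and the ordinary requests can arrive in any order, waitingOnPermits may briefly go negative; the waiting thread is released only after every expected sender has announced its count and the balance has returned to zero.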
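The following explains in detail how a Worker determines that it has received every request sent by the Master; the main goal is to show how threads cooperate in a distributed environment. AllAggregatorServerData defines a field masterBarrier of type TaskIdsPermitBarrier, which decides whether all requests from the Master have been received. TaskIdsPermitBarrier relies mainly on wait(), notifyAll() and related methods for this control. When the aggregatorName read from a request equals AggregatorUtils.SPECIAL_COUNT_AGGREGATOR, requirePermits(long permits, int taskId) is called to add the sender to arrivedTaskIds and to increase the number of req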