Aggregator;
// Holds the aggregators of the previous superstep
private final AllAggregatorServerData allAggregatorData;
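As shown above, ownerAggregatorData stores the aggregators that the Master sends to this Worker in the current superstep, while allAggregatorData holds the global aggregated values from the previous superstep. Both are initialized in the doRequest(ServerData serverData) method of the SendAggregatorsToOwnerRequest class: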
public void doRequest(ServerData serverData) {
  DataInput input = getDataInput();
  AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
  try {
    // Number of aggregators received. CountingOutputStream keeps a counter that is
    // incremented for every aggregator object written to the output stream; when the
    // data is sent, flush() inserts this value at the front of the stream.
    int numAggregators = input.readInt();
    for (int i = 0; i < numAggregators; i++) {
      String aggregatorName = input.readUTF();
      String aggregatorClassName = input.readUTF();
      if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) {
        LongWritable count = new LongWritable(0);
        // Total number of requests the Master sends to this Worker
        count.readFields(input);
        aggregatorData.receivedRequestCountFromMaster(count.get(),
            getSenderTaskId());
      } else {
        Class<Aggregator<Writable>> aggregatorClass =
            AggregatorUtils.getAggregatorClass(aggregatorClassName);
        aggregatorData.registerAggregatorClass(aggregatorName,
            aggregatorClass);
        Writable aggregatorValue =
            aggregatorData.createAggregatorInitialValue(aggregatorName);
        aggregatorValue.readFields(input);
        // Store the received global value from the previous superstep in allAggregatorData
        aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue);
        // ownerAggregatorData only registers the aggregator; it receives no value here
        serverData.getOwnerAggregatorData().registerAggregator(
            aggregatorName, aggregatorClass);
      }
    }
  } catch (IOException e) {
    throw new IllegalStateException("doRequest: " +
        "IOException occurred while processing request", e);
  }
  // One request has been received: decrement the pending-request count and append the
  // received data to the masterData list of AllAggregatorServerData
  aggregatorData.receivedRequestFromMaster(getData());
}
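The comment at the top of doRequest() describes a count-prefixed payload: aggregators are buffered and counted, and the count is placed in front when the buffer is flushed, so the receiver can read the count first and then loop over the entries. The sketch below illustrates that framing with plain java.io streams. It is a simplified assumption, not Giraph's CountingOutputStream: the names AggregatorWireSketch and CountingWriter are hypothetical, every value is a long standing in for a Writable, and the class-name strings are placeholders.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a count-prefixed aggregator payload (not Giraph's real classes). */
public class AggregatorWireSketch {
  /** Buffers (name, className, value) entries and counts them. */
  static final class CountingWriter {
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();
    private final DataOutputStream out = new DataOutputStream(body);
    private int counter = 0;

    void write(String name, String className, long value) throws IOException {
      out.writeUTF(name);
      out.writeUTF(className);
      out.writeLong(value); // stands in for Writable.write(out)
      counter++;            // one more aggregator in the buffer
    }

    /** Returns [count][entries...]: the count is written in front of the buffered entries. */
    byte[] flush() throws IOException {
      out.flush();
      ByteArrayOutputStream framed = new ByteArrayOutputStream();
      DataOutputStream header = new DataOutputStream(framed);
      header.writeInt(counter);
      header.write(body.toByteArray());
      header.flush();
      return framed.toByteArray();
    }
  }

  /** Decodes the payload the way doRequest() walks it: count first, then each entry. */
  static Map<String, Long> read(byte[] payload) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
    int numAggregators = in.readInt();
    Map<String, Long> values = new LinkedHashMap<>();
    for (int i = 0; i < numAggregators; i++) {
      String name = in.readUTF();
      in.readUTF(); // class name; a real receiver would load and register the class here
      values.put(name, in.readLong());
    }
    return values;
  }

  public static void main(String[] args) throws IOException {
    CountingWriter w = new CountingWriter();
    w.write("sum", "SumAggregator", 42L);
    w.write("max", "MaxAggregator", 7L);
    System.out.println(read(w.flush())); // prints {sum=42, max=7}
  }
}
```

Writing the count last but emitting it first lets the sender stream entries without knowing their number in advance.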
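Before computation starts, each Worker calls the prepareSuperstep() method of BspServiceWorker to distribute aggregator values and to receive the aggregator values sent by the other Workers. The call chain is as follows:
[Figure: call chain of BspServiceWorker.prepareSuperstep()]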
The prepareSuperstep() method of BspServiceWorker:
@Override
public void prepareSuperstep() {
  if (getSuperstep() != INPUT_SUPERSTEP) {
    /*
     * aggregatorHandler is a WorkerAggregatorHandler; see the class hierarchy of
     * MasterAggregatorHandler earlier in this article.
     * workerAggregatorRequestProcessor is declared as the WorkerAggregatorRequestProcessor
     * interface but is actually a NettyWorkerAggregatorRequestProcessor instance,
     * used to send aggregator values between Workers.
     */
    aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
  }
}
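Taken together, the Master sends each aggregator to a single owner Worker, and each Worker then forwards what it received from the Master to the other Workers, so that every Worker ends up holding all values. The single-process simulation below illustrates that two-step pattern. It is a hypothetical sketch: the class AggregatorDistributionSketch and its hash-modulo owner assignment are assumptions chosen for illustration, not Giraph's actual owner-selection logic, and no networking is involved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation of owner-based aggregator distribution (no real networking). */
public class AggregatorDistributionSketch {
  /** Assumed owner assignment: hash of the aggregator name modulo the worker count. */
  static int ownerOf(String aggregatorName, int numWorkers) {
    return Math.floorMod(aggregatorName.hashCode(), numWorkers);
  }

  /** Returns, per worker, the full map of aggregated values it ends up with. */
  static List<Map<String, Long>> distribute(Map<String, Long> masterValues, int numWorkers) {
    // Step 1: the master sends each aggregator only to its owner worker.
    List<Map<String, Long>> receivedFromMaster = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      receivedFromMaster.add(new HashMap<>());
    }
    for (Map.Entry<String, Long> e : masterValues.entrySet()) {
      receivedFromMaster.get(ownerOf(e.getKey(), numWorkers)).put(e.getKey(), e.getValue());
    }
    // Step 2: every worker forwards what it got from the master to all workers
    // (and keeps it itself), playing the role of distributeAggregators().
    List<Map<String, Long>> views = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      views.add(new HashMap<>());
    }
    for (int sender = 0; sender < numWorkers; sender++) {
      for (int receiver = 0; receiver < numWorkers; receiver++) {
        views.get(receiver).putAll(receivedFromMaster.get(sender));
      }
    }
    return views;
  }

  public static void main(String[] args) {
    Map<String, Long> master = new HashMap<>();
    master.put("sum", 42L);
    master.put("max", 7L);
    master.put("count", 3L);
    List<Map<String, Long>> views = distribute(master, 3);
    for (int w = 0; w < views.size(); w++) {
      System.out.println("worker " + w + " sees " + views.get(w).size() + " aggregators");
    }
  }
}
```

One plausible motivation for this design is that the Master's outgoing traffic then grows with the number of aggregators rather than with aggregators × Workers, while the fan-out work is spread across the Workers.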
The prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) method of WorkerAggregatorHandler:
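public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
  AllAggregatorServerData allAggregatorData =
      serviceWorker.getServerData().getAllAggregatorData();
  /**
   * Wait until all aggregators the Master sends to this Worker have been received;
   * the return value is all the data (aggregators) the Master sent to this Worker.
   */
  Iterable<byte[]> dataToDistribute =
      allAggregatorData.getDataFromMasterWhenReady(
          serviceWorker.getMasterInfo());
  try {
    // Forward the data (aggregators) received from the Master to all other Workers
    requestProcessor.distributeAggregators(dataToDistribute);
  } catch (IOException e) {
    throw new IllegalStateException("prepareSuperstep: " +
        "IOException occurred while trying to distribute aggregators", e);
  }
  // Wait until the aggregators sent to this Worker by all other Workers have arrived
  allAggregatorData.fillNextSuperstepMapsWhenReady(
      getOtherWorkerIdsSet(), previousAggregatedValueMap,
      currentAggregatorMap);
  // Only clears the masterData list of AllAggregatorServerData,
  // preparing to receive the Master's aggregators in the next superstep
  allAggregatorData.reset();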
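}
The following explains in detail how a Worker determines that it has received every Request sent by the Master; the main goal is to show how threads cooperate in a distributed setting. AllAggregatorServerData defines a field masterBarrier of type TaskIdsPermitBarrier, which decides whether all Requests from the Master have arrived. TaskIdsPermitBarrier relies mainly on wait(), notifyAll() and related methods for coordination: when the received aggregatorName equals AggregatorUtils.SPECIAL_COUNT_AGGREGATOR, requirePermits(long permits, int taskId) is called to add the sender to arrivedTaskIds and increase the number of req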
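The wait()/notifyAll() coordination described above can be sketched as a small permit barrier. This is a simplified, hypothetical class (PermitBarrierSketch) written in the spirit of TaskIdsPermitBarrier, not copied from Giraph: requirePermits(long, int) is the method named in the text, while releaseOnePermit() and waitForRequiredPermits() are assumed names, and the real class may differ in signatures and details. A sender's count message calls requirePermits(), each processed request calls releaseOnePermit(), and the waiting thread blocks until every expected sender has announced its count and no permits remain.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical permit barrier in the spirit of TaskIdsPermitBarrier (not Giraph's class). */
public class PermitBarrierSketch {
  private final Object lock = new Object();
  private final Set<Integer> arrivedTaskIds = new HashSet<>();
  private long waitingOnPermits = 0;

  /** Count message from a sender: record the sender and add the requests it announced. */
  public void requirePermits(long permits, int taskId) {
    synchronized (lock) {
      arrivedTaskIds.add(taskId);
      waitingOnPermits += permits;
      lock.notifyAll();
    }
  }

  /** Called once for every fully processed request. */
  public void releaseOnePermit() {
    synchronized (lock) {
      waitingOnPermits--;
      lock.notifyAll();
    }
  }

  /** Blocks until all expected senders have announced counts and no permits are pending. */
  public void waitForRequiredPermits(Set<Integer> expectedTaskIds) throws InterruptedException {
    synchronized (lock) {
      while (!arrivedTaskIds.containsAll(expectedTaskIds) || waitingOnPermits > 0) {
        lock.wait(); // re-checked in a loop, so spurious wake-ups are harmless
      }
      arrivedTaskIds.clear(); // ready for the next superstep
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PermitBarrierSketch barrier = new PermitBarrierSketch();
    int masterTaskId = 0;
    long requests = 3; // the master announces that it will send 3 requests
    Thread network = new Thread(() -> {
      barrier.requirePermits(requests, masterTaskId);
      for (int i = 0; i < requests; i++) {
        barrier.releaseOnePermit();
      }
    });
    network.start();
    barrier.waitForRequiredPermits(Collections.singleton(masterTaskId));
    System.out.println("all requests from the master have arrived");
    network.join();
  }
}
```

Because a request may be processed before its sender's count message, the permit balance can briefly go negative; the barrier only opens once every expected count has arrived and the balance is no longer positive.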