uest数目waitingOnPermits. 接受一个Request
/**
* Require more permits. This will increase the number of times permits
* were required. Doesn't wait for permits to become available.
*
* @param permits Number of permits to require
* @param taskId Task id which required permits
*/
public synchronized void requirePermits(long permits, int taskId) {
arrivedTaskIds.add(taskId);
waitingOnPermits += permits;
notifyAll();
}
接受一个Request后,会调用releaseOnePermit()方法把waitingOnPermits减1。
3. 在Vertex.compute()方法中,每个Worker聚集自身的值。计算完成后,调用WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值给句聚集器的aggregatorName发送给该aggregator所属的Worker. Aggregator的属主Worker接受其他所有Workers发送的本地聚集值进行汇总,汇总完毕后发送给Master,供下一次超级步的MasterCompute.compute()方法使用。finishSuperstep方法如下:
/**
* Send aggregators to their owners and in the end to the master
*
* @param requestProcessor Request processor for aggregators
*/
public void finishSuperstep(
WorkerAggregatorRequestProcessor requestProcessor) {
OwnerAggregatorServerData ownerAggregatorData =
serviceWorker.getServerData().getOwnerAggregatorData();
// First send partial aggregated values to their owners and determine
// which aggregators belong to this worker
for (Map.Entry> entry :
currentAggregatorMap.entrySet()) {
boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
entry.getValue().getAggregatedValue());
if (!sent) {
// If it's my aggregator, add it directly
ownerAggregatorData.aggregate(entry.getKey(),
entry.getValue().getAggregatedValue());
}
}
// Flush
requestProcessor.flush();
// Wait to receive partial aggregated values from all other workers
Iterable> myAggregators =
ownerAggregatorData.getMyAggregatorValuesWhenReady(
getOtherWorkerIdsSet());
// Send final aggregated values to master
AggregatedValueOutputStream aggregatorOutput =
new AggregatedValueOutputStream();
for (Map.Entry entry : myAggregators) {
int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
entry.getValue());
if (currentSize > maxBytesPerAggregatorRequest) {
requestProcessor.sendAggregatedValuesToMaster(
aggregatorOutput.flush());
}
}
requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
// Wait for master to receive aggregated values before proceeding
serviceWorker.getWorkerClient().waitAllRequests();
ownerAggregatorData.reset();
}
调用关系如下:
4. 大同步后,Master调用MasterAggregatorHandler类的prepareSusperStep(masterClient)方法,收集聚集器的值。方法内容如下:
public void prepareSuperstep(MasterClient masterClient) {
// 收集上次超级步的聚集值,为master compute 做准备
for (AggregatorWrapper aggregator : aggregatorMap.values()) {
// 如果是 Persistent Aggregator,则累加
if (aggregator.isPersistent()) {
aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
}
aggregator.setPreviousAggregatedValue(
aggregator.getCurrentAggregatedValue());
aggregator.resetCurrentAggregator();
progressable.progress();
}
}然后调用MasterCompute.compute()方法(可能会修改聚集器的值),在该方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代。
备注:Job迭代结束条件有三,满足其一就行:
1) 达到最大迭代次数
2) 没有活跃顶点且没有消息在传递
3) 终止MasterCompute计算
总结:为解决在多个Aggr