本人原创,转载请注明出处!欢迎大家加入Giraph 技术交流群: 228591158
Giraph中Aggregator的用法请参考官方文档:http://giraph.apache.org/aggregators.html ,本文重点在解析Giraph如何实现Aggregators。
基本原理:在每个超级步中,每个Worker计算本地的聚集值。超级步计算完成后,把本地的聚集值发送给Master汇总。在MasterCompute()执行后,把全局的聚集值回发给所有的Workers。
缺点:当某个应用(或算法)使用了多个聚集器(Aggregators),Master要完成所有聚集器的计算。因为Master要接受、处理、发送大量的数据,无论是在计算方面还是网络通信层次,都会导致Master成为系统瓶颈。
改进:采用分片聚集 (sharded aggregators) . 在每个超级步的最后,每个聚集器被派发给一个Worker,该Worker接受和聚集其他Workers发送给该聚集器的值。然后Workers把自己的所有的聚集器发送给Master,这样Master就无需执行任何聚集,只是接收每个聚集器的最终值。在MasterCompute.compute执行后,Master不是直接把所有的聚集器发送给所有的Workers,而是发送给聚集器所属的Worker,然后每个Worker再把其上的聚集器发送给所有的Workers.
首先给出Master <-- > Worker间, Worker <--> Worker间通信协议,在每个类中的doRequest(ServerData serverData)方法中会解析并存储收到的消息。
1). org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 类 . Worker --> Worker Owner
功能:每个worker把当前超步的局部 aggregated values 发送到该Aggregator的拥有者。
2). org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 类. Worker Owner--> Master
功能:每个Worker把自己所拥有的Aggregator的最终 aggregated values 发送给 master。
3). org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 类. Master --> Worker Owner.
功能:master把最终的 aggregated values 或aggregators 发送给该Aggregator的拥有者。
4). org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 类。 Worker Owner--> Worker
功能: 发送最终的 aggregated values 到 其他workers。发送者为该Aggregator的拥有者,接受者为除发送者之外的所有workers。

IDC49rOsvLayvU1hc3RlckNvbXB1dGUuY29tcHV0ZSgpt723qNbQu/G1w7XEvtu8r8b3JiMyMDU0MDu++c6qxuSz9cq8JiMyMDU0MDs8L3N0cm9uZz6ho7TTtdoxuPazrLy2sr2/qsq8o6xNYXN0ZXJDb21wdXRlLmNvbXB1dGUoKbe9t6iyxbvxtcPBy8v509BWZXJ0ZXguY29tcHV0ZSgp1Nq12jC49rOsvLayvb7bvK+1xCYjMjA1NDA7oaM8L3A+CjxwPjEuILTTtdowuPazrLy2sr2/qsq8o6xCc3BTZXJ2aWNlTWFzdGVytffTw01hc3RlckFnZ3JlZ2F0b3JIYW5kbGVywOC1xGZpbmlzaFN1cGVyU3RlcChNYXN0ZXJDbGllbnQgbWFzdGVyQ2xpZW50KSC3vbeosNG+27yvxvfFybeiuPhXb3JrZXKjrL7bvK/G97XEdmFsdWXOqsnP0ru49rOsvLayvbXEyKu+1r7bvK8mIzIwNTQwO6OoZmluYWwgYWdncmVnYXRlZCB2YWx1ZXOjqaOstdrSu7TOzqqz9cq8JiMyMDU0MDuho8/IuPiz9k1hc3RlckFnZ3JlZ2F0b3JIYW5kbGVytcTA4LzMs9C52M+1o6zI58/Co7o8L3A+CjxwPjxpbWcgc3JjPQ=="https://www.cppentry.com/upload_files/article/57/1_40oym__.jpg" alt="\">
finishSuperStep(MasterClient masterClient) 方法核心内容如下:
< http://www.2cto.com/kf/ware/vc/" target="_blank" class="keylink">vcD4KPHByZSBjbGFzcz0="brush:sql;"> /** * Finalize aggregators for current superstep and share them with workers */ public void finishSuperstep(MasterClient masterClient) { for (AggregatorWrapper aggregator : aggregatorMap.values()) { if (aggregator.isChanged()) { // if master compute changed the value, use the one he chose aggregator.setPreviousAggregatedValue( aggregator.getCurrentAggregatedValue()); // reset aggregator for the next superstep aggregator.resetCurrentAggregator(); } } /** * 把聚集器发送给所属的Worker。发送内容: * 1). Name of the aggregator * 2). Class of the aggregator * 3). Value of the aggretator */ try { for (Map.Entry> entry : aggregatorMap.entrySet()) { masterClient.sendAggregator(entry.getKey(), entry.getValue().getAggregatorClass(), entry.getValue().getPreviousAggregatedValue()); } masterClient.finishSendingAggregatedValues(); } catch (IOException e) { throw new IllegalStateException("finishSuperstep: " + "IOException occurred while sending aggregators", e); } }
问题1:如何确定aggregator的Worker Owner ?
答:根据aggregator的Name来确定它所属的Worker,计算方法如下:
/**
* 根据aggregatorName和所有的workers列表来计算aggregator所属的Worker
* 参数aggregatorName:Name of the aggregator
* 参数workers: Workers的list列表
* 返回值:Worker which owns the aggregator
*/
public static WorkerInfo getOwner(String aggregatorName,List workers) {
//用aggregatorName的HashCode()值模以 Workers的总数目
int index = Math.abs(aggregatorName.hashCode() % workers.size());
return workers.get(index); //返回aggregator所属的Worker
}
问题2:Worker 如何判断自身是否接收完自己所拥有的aggregators?
答:Master给某个Worker发送aggregators时,同时发送到该Worker的aggregators数目。使用的 SendAggregatorsToOwnerRequest类对消息进行封装和解析。
2. Worker接受Master发送的Aggregator,Worker把接收到的聚集体值发送给其他所有Workers,然后每个Workers就会得到上一个超级步的全局聚集值。
由前文知道,每个Worker都有一个ServerData对象,ServerData类中关于Aggregator的两个成员变量如下:
// 保存Worker在当前超步拥有的aggregators
private final OwnerAggregatorServerData owner