egator条件下,Master成为系统瓶颈的问题。采取了把所有Aggregator派发给某一部分Workers,由这些Workers完成全局的聚集值的计算与发送,Master只需要与这些Workers进行简单数据通信即可,大大降低了Master的工作量。
追加:下面用图示方法说明上述执行过程。
实验条件:
1). 一个Master,四个Worker
2). 两个Aggregators,记为A1和A2。
1. Master把Aggregators发送给Workers,收到Aggregator的Worker就作为该Aggregator的Owner。下图中Master把A1发送给Worker1,A2发送给Worker3.那么Worker1就作为A1的Owner,Worker3就是A2的Owner。该步骤在MasterAggregatorHandler类的finishSuperStep(MasterClient masterClient) 方法中完成,使用的是SendAggregatorsToOwnerRequest 通信协议。注:每个Owner Worker 可能有多个聚集器。

图1 Master分发Aggregator
2. Workers接受Master发送的Aggregator,然后把Aggregator发送给其他Workers。Worker1要把A1分别发送给Worker2、Worker3和Worker4;Worker3要把A2分别发送给Worker1、Worker2和Worker4。该步骤在WorkerAggregatorHandler类的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成,使用的是SendAggregatorsToMasterRequest 通信协议。此步骤完成后,每个Worker上都有了聚集器A1和A2(具体为上一个超步的全局最终聚集值)。

3. 每个Worker调用Vertex.compute()方法开始计算,收集本地的Aggregator聚集值。对聚集体A1来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:
A11 、A12、 A13、A14;对聚集器A2来说,Worker1、Worker2、Worker3、Worker4的本地聚集值依次记为:
A21 、A22、 A23、A24 。计算完成后,每个Worker就要把本地的聚集值发送给聚集器的Owner,聚集器的Owner在接受的时候会合并聚集。那么A11 、A12、 A13、A14要发送给Worker1进行全局聚集得到A1’,A21 、A22、 A23、A24 要发送给Worker3进行全局聚集得到A2’ 。
公式如下:

此部分采用的是SendWorkerAggregatorsRequest通信协议。Worker1和Worker3要把汇总的A1和A2的新值:A1’ 和A2’发送给Master,供下一次超级步的MasterCompute.compute()方法使用采用的是SendAggregatorsToMasterRequest通信协议。此部分在WorkerAggregatorHandler类的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完成。过程如下图所示:

4. Master收到Worker1发送的A1’ 和Woker3发送的A2’后,此步骤在MasterAggregatorHandler类的prepareSusperStep(masterClient)方法中完成。然后调用MasterCompute.compute()方法,此方法可能会修改聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法内若根据聚集器的值调用了MasterCompute类的haltCompute()方法来终止MaterCompute,则表明要结束整个Job。那么Master就会通知所有Workers要结束整个作业;在该方法内若没有调用MasterCompute类的haltCompute()方法,则回到步骤1继续进行迭代,继续把A1’’发送给Worker1,A2’’发送给Worker3。

完!
本人原创,转载请注明出处!欢迎大家加入Giraph 技术交流群: 228591158