This mapper executes the BSP graph tasks allotted to this worker. All tasks are performed by calling the GraphTaskManager object managed by this GraphMapper wrapper class. Since this mapper does not pass data as key-value pairs through the MR framework, the Mapper parameter types are irrelevant and are set to Object.
@Override
public void setup(Context context)
  throws IOException, InterruptedException {
  // Execute all Giraph-related role(s) assigned to this compute node.
  // Roles can include "master," "worker," "zookeeper," or . . .
  graphTaskManager = new GraphTaskManager(context);
  graphTaskManager.setup(
      DistributedCache.getLocalCacheArchives(context.getConfiguration()));
}

@Override
public void run(Context context) throws IOException, InterruptedException {
  // Notify the master quicker if there is worker failure rather than
  // waiting for ZooKeeper to timeout and delete the ephemeral znodes
  try {
    setup(context);
    while (context.nextKeyValue()) {
      graphTaskManager.execute();
    }
    cleanup(context);
    // Checkstyle exception due to needing to dump ZooKeeper failure
  } catch (RuntimeException e) {
    graphTaskManager.zooKeeperCleanup();
    graphTaskManager.workerFailureCleanup();
  }
}
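The wrapper-and-delegate structure of run() can be illustrated with a small standalone sketch. It does not use Hadoop or Giraph: SketchTaskManager and SketchRunner are hypothetical stand-ins for GraphTaskManager and GraphMapper, and the calls are only recorded in a string. The rethrow in the catch block is an assumption of this sketch, not something shown in the excerpt above.

```java
// Hypothetical stand-in for GraphTaskManager: records calls instead of doing Giraph work.
class SketchTaskManager {
  final StringBuilder calls = new StringBuilder();
  private final boolean failOnExecute;

  SketchTaskManager(boolean failOnExecute) {
    this.failOnExecute = failOnExecute;
  }

  void setup() {
    calls.append("setup;");
  }

  void execute() {
    calls.append("execute;");
    if (failOnExecute) {
      throw new IllegalStateException("simulated worker failure");
    }
  }

  void cleanup() {
    calls.append("cleanup;");
  }

  void failureCleanup() {
    calls.append("failureCleanup;");
  }
}

// Hypothetical stand-in for the GraphMapper wrapper: it holds no business logic
// and only forwards each lifecycle step to the task manager.
class SketchRunner {
  private final SketchTaskManager manager;

  SketchRunner(SketchTaskManager manager) {
    this.manager = manager;
  }

  void run() {
    try {
      manager.setup();
      manager.execute();
      manager.cleanup();
    } catch (RuntimeException e) {
      // Clean up immediately so the failure is noticed without waiting for a timeout,
      // then rethrow so the caller still sees it (assumption of this sketch).
      manager.failureCleanup();
      throw new IllegalStateException(
          "run: unrecoverable exception " + e.getMessage(), e);
    }
  }
}

class DelegationSketch {
  public static void main(String[] args) {
    SketchTaskManager ok = new SketchTaskManager(false);
    new SketchRunner(ok).run();
    System.out.println(ok.calls);

    SketchTaskManager failing = new SketchTaskManager(true);
    try {
      new SketchRunner(failing).run();
    } catch (IllegalStateException e) {
      System.out.println(failing.calls + " -> " + e.getMessage());
    }
  }
}
```

On the normal path the recorded order is setup, execute, cleanup. On the failing path the normal cleanup is skipped and only the failure cleanup runs, which mirrors the try/catch layout of run() above.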
2. The org.apache.giraph.graph.GraphTaskManager class
Purpose: The Giraph-specific business logic for a single BSP compute node, in whatever underlying type of cluster the Giraph job runs on. The owning object provides the glue into the underlying cluster framework and calls this object to perform Giraph work.
The setup() method is covered next. Its code is as follows.
/**
 * Called by owner of this GraphTaskManager on each compute node
 * @param zkPathList the path to the ZK jars we need to run the job
 */
public void setup(Path[] zkPathList) throws IOException, InterruptedException {
  context.setStatus("setup: Initializing Zookeeper services.");
  locateZookeeperClasspath(zkPathList);
  serverPortList = conf.getZookeeperList();
  if (serverPortList == null && startZooKeeperManager()) {
    return; // ZK connect/startup failed
  }
  if (zkManager != null && zkManager.runsZooKeeper()) {
    LOG.info("setup: Chosen to run ZooKeeper...");
  }
  context.setStatus(