"setup: Connected to Zookeeper service " +serverPortList);
this.graphFunctions = determineGraphFunctions(conf, zkManager);
instantiateBspService(serverPortList, sessionMsecTimeout);
}
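Each of these methods is described in turn below:
1) locateZookeeperClasspath(zkPathList): finds the local copy of the ZK jar, which is used to start the ZooKeeper service. In this run its path was /home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar.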
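The excerpt does not show how locateZookeeperClasspath() resolves that path. As a generic JVM illustration only, a class can report where it was loaded from through its ProtectionDomain's CodeSource. The sketch below uses a hypothetical helper, findContainingJar, which is not a Giraph API. Inside a task you would pass a ZooKeeper class such as org.apache.zookeeper.server.quorum.QuorumPeerMain. If that class came from a local jar, you would get back the jar's path.

```java
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Paths;
import java.security.CodeSource;
import java.security.ProtectionDomain;

public class ZkJarLocator {

  /**
   * Hypothetical helper (not Giraph's API): returns the local path of the jar
   * that clazz was loaded from, or null when the class did not come from a
   * local .jar file (JDK bootstrap classes have no CodeSource at all, and
   * classes loaded from a plain directory resolve to a non-.jar path).
   */
  public static String findContainingJar(Class<?> clazz) {
    ProtectionDomain domain = clazz.getProtectionDomain();
    CodeSource source = domain.getCodeSource();
    if (source == null || source.getLocation() == null) {
      return null;
    }
    URL location = source.getLocation();
    String path;
    try {
      path = Paths.get(location.toURI()).toString();
    } catch (URISyntaxException | IllegalArgumentException
        | FileSystemNotFoundException e) {
      return null; // not a plain local file URL
    }
    return path.endsWith(".jar") ? path : null;
  }

  public static void main(String[] args) {
    // String is loaded by the bootstrap loader, so there is no jar to report.
    System.out.println(findContainingJar(String.class)); // prints null
  }
}
```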
2) startZooKeeperManager(): instantiates and configures the ZooKeeperManager. It is defined as follows:
/**
 * Instantiate and configure ZooKeeperManager for this job. This will
 * result in a Giraph-owned Zookeeper instance, a connection to an
 * existing quorum as specified in the job configuration, or task failure
 * @return true if this task should terminate
 */
private boolean startZooKeeperManager()
  throws IOException, InterruptedException {
  zkManager = new ZooKeeperManager(context, conf);
  context.setStatus("setup: Setting up Zookeeper manager.");
  zkManager.setup();
  if (zkManager.computationDone()) {
    done = true;
    return true;
  }
  zkManager.onlineZooKeeperServers();
  serverPortList = zkManager.getZooKeeperServerPortString();
  return false;
}
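The serverPortList returned by getZooKeeperServerPortString() is the value that the setup code logs as the ZooKeeper service it connected to. ZooKeeper clients accept a connect string made of comma-separated host:port pairs. The excerpt does not show how serverPortList is built, so the builder below assumes it follows that form. ZkConnectString.build is a hypothetical helper, and the host names and port are made up.

```java
import java.util.List;
import java.util.StringJoiner;

public class ZkConnectString {

  /**
   * Hypothetical helper: builds a ZooKeeper connect string of the form
   * "host1:port,host2:port" from a list of server hostnames and one port.
   */
  public static String build(List<String> hosts, int port) {
    if (hosts.isEmpty()) {
      throw new IllegalArgumentException("at least one ZooKeeper host is required");
    }
    StringJoiner connectString = new StringJoiner(",");
    for (String host : hosts) {
      connectString.add(host + ":" + port);
    }
    return connectString.toString();
  }

  public static void main(String[] args) {
    // Illustrative host names and port only.
    System.out.println(build(List.of("node1", "node2"), 2181)); // node1:2181,node2:2181
  }
}
```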
The org.apache.giraph.zk.ZooKeeperManager class manages the election of ZooKeeper servers, starting/stopping the services, etc.
The setup() method of ZooKeeperManager is defined as follows:
/**
 * Create the candidate stamps and decide on the servers to start if
 * you are partition 0.
 */
public void setup() throws IOException, InterruptedException {
  createCandidateStamp();
  getZooKeeperServerList();
}
The createCandidateStamp() method creates one empty file per task under the _bsp/_defaultZkManagerDir/job_201403301409_0006/_task directory on HDFS. Each file is named Hostname+taskPartition, that is, the local hostname followed by the task's partition number, as shown in the screenshot below:

The job was run with 5 workers (-w 5). Together with one master, that gives the 6 tasks shown above.
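Here is a local-filesystem sketch of the candidate-stamp step. It uses a temporary directory in place of the HDFS _task directory. It also assumes that HOSTNAME_TASK_SEPARATOR is a single space, because the excerpt shows only the constant's name. The sketch creates six empty stamps for the 5-worker-plus-master run, spread over two made-up hosts.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CandidateStamps {

  /** Assumed value; the excerpt only shows the constant's name. */
  static final String HOSTNAME_TASK_SEPARATOR = " ";

  /**
   * Creates an empty "candidate stamp" named hostname + separator + taskPartition.
   * A local directory stands in for the HDFS _task directory in this sketch.
   */
  static Path createCandidateStamp(Path taskDirectory, String hostname,
      int taskPartition) throws IOException {
    Files.createDirectories(taskDirectory);
    Path stamp = taskDirectory.resolve(
        hostname + HOSTNAME_TASK_SEPARATOR + taskPartition);
    // The file stays empty: all information is carried by its name.
    // Files.createFile fails if the stamp already exists.
    return Files.createFile(stamp);
  }

  public static void main(String[] args) throws IOException {
    Path taskDirectory = Files.createTempDirectory("_task");
    // 5 workers (-w 5) + 1 master = 6 tasks; host names are made up.
    for (int taskPartition = 0; taskPartition < 6; taskPartition++) {
      String hostname = (taskPartition % 2 == 0) ? "hostA" : "hostB";
      createCandidateStamp(taskDirectory, hostname, taskPartition);
    }
    try (Stream<Path> stamps = Files.list(taskDirectory)) {
      System.out.println(stamps.count()); // 6
    }
  }
}
```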
In getZooKeeperServerList(), the task whose taskPartition is 0 calls createZooKeeperServerList() to create the ZooKeeper server list. This is also an empty file, and the ZooKeeper servers are described entirely by its file name.
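Describing the servers entirely through a file name can be sketched as an encode/decode pair. The prefix zkServerList_ and the space separator below are hypothetical choices made for illustration. They are not Giraph's actual naming format.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class ServerListFileName {

  // Hypothetical naming scheme; Giraph's exact format is not shown in the excerpt.
  static final String PREFIX = "zkServerList_";
  static final String SEPARATOR = " ";

  /** Encodes hostname/task pairs into one file name, e.g. "zkServerList_hostA 0". */
  static String encode(Map<String, Integer> hostnameTaskMap) {
    StringJoiner pairs = new StringJoiner(SEPARATOR);
    for (Map.Entry<String, Integer> entry : hostnameTaskMap.entrySet()) {
      pairs.add(entry.getKey() + SEPARATOR + entry.getValue());
    }
    return PREFIX + pairs;
  }

  /** Recovers the hostname/task pairs from a name produced by encode(). */
  static Map<String, Integer> decode(String fileName) {
    if (!fileName.startsWith(PREFIX)) {
      throw new IllegalArgumentException("not a server-list file: " + fileName);
    }
    Map<String, Integer> hostnameTaskMap = new LinkedHashMap<>();
    String body = fileName.substring(PREFIX.length());
    if (body.isEmpty()) {
      return hostnameTaskMap;
    }
    String[] tokens = body.split(SEPARATOR);
    if (tokens.length % 2 != 0) {
      throw new IllegalArgumentException("unpaired hostname/task token in " + fileName);
    }
    for (int i = 0; i < tokens.length; i += 2) {
      hostnameTaskMap.put(tokens[i], Integer.parseInt(tokens[i + 1]));
    }
    return hostnameTaskMap;
  }

  public static void main(String[] args) {
    Map<String, Integer> servers = new LinkedHashMap<>();
    servers.put("hostA", 0);
    String fileName = encode(servers);
    System.out.println(fileName);         // zkServerList_hostA 0
    System.out.println(decode(fileName)); // {hostA=0}
  }
}
```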

The core code of createZooKeeperServerList() is as follows:
/**
 * Task 0 will call this to create the ZooKeeper server list. The result is
 * a file that describes the ZooKeeper servers through the filename.
 */
private void createZooKeeperServerList() throws IOException,
    InterruptedException {
  Map<String, Integer> hostnameTaskMap = Maps.newTreeMap();
  while (true) {
    FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
    hostnameTaskMap.clear();
    if (fileStatusArray.length > 0) {
      for (FileStatus fileStatus : fileStatusArray) {
        String[] hostnameTaskArray =
            fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
        // Keep only the first task seen on each host.
        if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
          hostnameTaskMap.put(hostnameTaskArray[0],
              Integer.valueOf(hostnameTaskArray[1]));
        }
      }
      if (hostnameTaskMap.size() >= serverCount) {
        break;
      }
      Thread.sleep(pollMsecs);
    }
  }
}
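The method first lists the files under taskDirectory (_bsp/_defaultZkManagerDir/job_201403301409_0006/_task). If the directory contains files, each file name (Hostname+taskPartition) is split into its Hostname and taskPartition, and these are stored in hostnameTaskMap. Only the first task seen for each hostname is recorded. After a scan, if the size of hostnameTaskMap is greater than or equal to serverCount, the outer loop stops. serverCount equals the ZOOKEEPER_SERVER_COUNT variable in GiraphConstants.java, which is defined as 1. Otherwise the task sleeps for pollMsecs and scans again. The outer loop is needed because the files under taskDirectory are created by many tasks running in a distributed environment. When task 0 builds the server list, some of the other tasks may not have created their task files yet. By default Giraph starts one ZooKeeper service per job, which means only one task starts a ZooKeeper service.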
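The polling loop can be reproduced on a local directory. This sketch uses a hypothetical class, ServerListPoller, with local files instead of HDFS and a space as the assumed separator. It keeps the first task per hostname in a TreeMap and stops once serverCount hostnames are present. It differs from the excerpt in three deliberate ways:

- It sorts the listing so that "first task per host" is deterministic.
- It sleeps after any scan that falls short, not only when files exist.
- It gives up after a hypothetical maxAttempts limit instead of looping forever.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ServerListPoller {

  static final String HOSTNAME_TASK_SEPARATOR = " "; // assumed separator

  /**
   * Polls taskDirectory until at least serverCount distinct hostnames have a
   * stamp, keeping the first task seen per hostname. maxAttempts is a
   * hypothetical safeguard; the loop in the excerpt has no upper bound.
   */
  static Map<String, Integer> waitForCandidates(Path taskDirectory,
      int serverCount, long pollMsecs, int maxAttempts)
      throws IOException, InterruptedException {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Map<String, Integer> hostnameTaskMap = new TreeMap<>();
      List<Path> stamps;
      try (Stream<Path> listing = Files.list(taskDirectory)) {
        // Sorted so that "first task per host" is deterministic in this sketch.
        stamps = listing.sorted().collect(Collectors.toList());
      }
      for (Path stamp : stamps) {
        String[] hostnameTask =
            stamp.getFileName().toString().split(HOSTNAME_TASK_SEPARATOR);
        if (hostnameTask.length != 2) {
          continue; // skip names that are not hostname + separator + task
        }
        hostnameTaskMap.putIfAbsent(hostnameTask[0], Integer.valueOf(hostnameTask[1]));
      }
      if (hostnameTaskMap.size() >= serverCount) {
        return hostnameTaskMap;
      }
      // Other tasks may not have written their stamps yet; wait and rescan.
      Thread.sleep(pollMsecs);
    }
    throw new IllegalStateException(
        "fewer than " + serverCount + " hosts after " + maxAttempts + " attempts");
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    Path taskDirectory = Files.createTempDirectory("_task");
    Files.createFile(taskDirectory.resolve("hostA 0"));
    Files.createFile(taskDirectory.resolve("hostA 3"));
    Files.createFile(taskDirectory.resolve("hostB 1"));
    System.out.println(waitForCandidates(taskDirectory, 1, 100, 10)); // {hostA=0, hostB=1}
  }
}
```

With serverCount = 1, a scan that finds only task 0's own stamp already satisfies the exit condition. This is consistent with the observation that follows about task 0.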
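Across several test runs, task 0 was always chosen as the ZooKeeper server. This is because, within the same process, when taskDirectory is scanned, only its own task file (the other tasks' files have not yet been created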