设为首页 加入收藏

TOP

Giraph源码分析――启动ZooKeeper服务(二)
2014-11-24 03:21:20 来源: 作者: 【 】 浏览:10
Tags:Giraph 源码 分析 启动 ZooKeeper 服务
"setup: Connected to Zookeeper service " +serverPortList); this.graphFunctions = determineGraphFunctions(conf, zkManager); instantiateBspService(serverPortList, sessionMsecTimeout); } 依次介绍每个方法的功能:

1) locateZookeeperClasspath(zkPathList):找到ZK jar的本地副本,其路径为:/home/hadoop/hadooptmp/mapred/local/taskTracker/root/jobcache/job_201403270456_0001/jars/job.jar ,用于启动ZooKeeper服务。
2) startZooKeeperManager(),初始化和配置ZooKeeperManager。定义如下,

 /**
   * Instantiate and configure ZooKeeperManager for this job. This will
   * result in a Giraph-owned Zookeeper instance, a connection to an
   * existing quorum as specified in the job configuration, or task failure
   * @return true if this task should terminate
   */
  private boolean startZooKeeperManager()
    throws IOException, InterruptedException {
    zkManager = new ZooKeeperManager(context, conf);
    context.setStatus("setup: Setting up Zookeeper manager.");
    zkManager.setup();
    if (zkManager.computationDone()) {
      done = true;
      return true;
    }
    zkManager.onlineZooKeeperServers();
    serverPortList = zkManager.getZooKeeperServerPortString();
    return false;
  }

org.apache.giraph.zk.ZooKeeperManager 类,功能:Manages the election of ZooKeeper servers, starting/stopping the services, etc.

ZooKeeperManager类的setup()定义如下:

/**
   * Create the candidate stamps and decide on the servers to start if
   * you are partition 0.
   */
  public void setup() throws IOException, InterruptedException {
    createCandidateStamp();
    getZooKeeperServerList();
  }
createCandidateStamp()方法在 HDFS上 的_bsp/_defaultZkManagerDir/job_201403301409_0006/_task 目录下为每个task创建一个文件,文件内容为空。文件名为本机的Hostname+taskPartition,如下截图:

\

运行时指定了5个workers(-w 5),再加上一个master,所有上面有6个task。

getZooKeeperServerList()方法中,taskPartition为0的task会调用createZooKeeperServerList()方法创建ZooKeeper server List,也是创建一个空文件,通过文件名来描述Zookeeper servers。

\

createZooKeeperServerList核心代码如下:

/**
   * Task 0 will call this to create the ZooKeeper server list.  The result is
   * a file that describes the ZooKeeper servers through the filename.
   */
  private void createZooKeeperServerList() throws IOException,
      InterruptedException {
    Map hostnameTaskMap = Maps.newTreeMap();
    while (true) {
      FileStatus [] fileStatusArray = fs.listStatus(taskDirectory);
      hostnameTaskMap.clear();
      if (fileStatusArray.length > 0) {
        for (FileStatus fileStatus : fileStatusArray) {  
          String[] hostnameTaskArray =
              fileStatus.getPath().getName().split(HOSTNAME_TASK_SEPARATOR);
   
          if (!hostnameTaskMap.containsKey(hostnameTaskArray[0])) {
            hostnameTaskMap.put(hostnameTaskArray[0],
                new Integer(hostnameTaskArray[1]));
          }
        }
        if (hostnameTaskMap.size() >= serverCount) {
          break;
        }
        Thread.sleep(pollMsecs);
      }
    }
  }
首先获取taskDirectory(_bsp/_defaultZkManagerDir/job_201403301409_0006/_task)目录下文件,如果当前目录下有文件,则把文件名(Hostname+taskPartition)中的Hostname和taskPartition存入到hostNameTaskMap中。扫描taskDirectory目录后,若hostNameTaskMap的size大于serverCount(等于GiraphConstants.java中的ZOOKEEPER_SERVER_COUNT变量,定义为1),就停止外层的循环。外层循环的目的是:因为taskDirectory下的文件每个task文件时多个task在分布式条件下创建的,有可能task 0在此创建server List时,别的task还没有生成后task文件。Giraph默认为每个Job启动一个ZooKeeper服务,也就是说只有一个task会启动ZooKeeper服务。

经过多次测试,task 0总是被选为ZooKeeper Server ,因为在同一进程中,扫描taskDirectory时,只有它对应的task 文件(其他task的文件还没有生成好

首页 上一页 1 2 3 下一页 尾页 2/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇搭建MongoDB主从复制(Master-Slav.. 下一篇java使用mongoDB

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

·Java 编程和 c 语言 (2025-12-25 08:19:48)
·. net内存管理宝典这 (2025-12-25 08:19:46)
·C++为什么不加上内存 (2025-12-25 08:19:44)
·MySQL 安装及连接-腾 (2025-12-25 06:20:28)
·MySQL的下载、安装、 (2025-12-25 06:20:26)