Hadoop Map/Reduce执行全流程关键代码(二)

2014-11-24 09:22:13 · 作者: · 浏览: 4
|-->tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf, pidFile); |添加至内存管理
|-->jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup,vargs,stdout,stderr,logSize, |统一纳入jvm管理器当中并启动
workDir, env, pidFile, conf));
|-->mapJvmManager.reapJvm(t, env); |区分map/reduce操作

JvmManager.reapJvm() |
|--> while (jvmIter.hasNext())
|-->JvmRunner jvmRunner = jvmIter.next().getValue();
|-->JobID jId = jvmRunner.jvmId.getJobId();
|-->setRunningTaskForJvm(jvmRunner.jvmId, t);
|-->spawnNewJvm(jobId, env, t);
|-->JvmRunner jvmRunner = new JvmRunner(env,jobId);
|-->jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
|-->jvmRunner.start(); |执行JvmRunner的run()方法
|-->jvmRunner.run()
|-->runChild(env);
|-->List wrappedCommand = TaskLog.captureOutAndError(env.setup, env.vargs, env.stdout, env.stderr,
env.logSize, env.pidFile); |选取main函数
|-->shexec.execute(); |执行
|-->int exitCode = shexec.getExitCode(); |获取执行状态值
|--> updateOnJvmExit(jvmId, exitCode, killed); |更新Jvm状态

Child.main() 执行Task(map/reduce)
|-->JVMId jvmId = new JVMId(firstTaskid.getJobID(),firstTaskid.isMap(),jvmIdInt);
|-->TaskUmbilicalProtocol umbilical = (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
TaskUmbilicalProtocol.versionID, address, defaultConf);
|--> while (true)
|-->JvmTask myTask = umbilical.getTask(jvmId);
|-->task = myTask.getTask();
|-->taskid = task.getTaskID();
|-->TaskRunner.setupWorkDir(job);
|-->task.run(job, umbilical); |以maptask为例
|-->TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
|-->if (useNewApi)
|-->runNewMapper(job, split, umbilical, reporter);
|-->else
|-->runOldMapper(job, split, umbilical, reporter);
|-->inputSplit = (InputSplit) ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
|-->MapRunnable runner = ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
|-->runner.run(in, new OldOutputCollector(collector, conf), reporter);

MapRunner.run()
|--> K1 key = input.createKey();
|-->V1 value = input.createValue();
|-->while (input.next(key, value))
|-->mapper.map(key, value, output, reporter);
|--> if(incrProcCount)
|-->reporter.incrCounter(SkipBadRecords.COUNTER_GROUP,
|-->SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
|-->mapper.close();