设为首页 加入收藏

TOP

编译工具中的 HDFS远程连接配置
2019-04-07 12:21:05 】 浏览:85
Tags:编译 工具 HDFS 远程 连接 配置
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_39081169/article/details/83382607

在configuration中配置了远程连接,可直接在编译工具上运行MapReduce,而不需要将jar打包拷到虚拟机中运行 这种方法更方便

本人使用的是 IDEA

配置和使用如下

1.写一个工具类 HadoopUtil:

public class HadoopUtil {

//构建一个获取configuration静态方法

public static Configuration getRemoteHadoopConf(){

Configuration conf=getBaseRemoteHadoopConf();

conf.set("mapreduce.job.jar", "I:\\bigdata\\target\\bigdata-all.jar");//你项目打包所存放的地址

return conf;

}

//重载

public static Configuration getRemoteHadoopConf(String jarAbsPath){

Configuration conf=new Configuration();

conf.set("mapreduce.job.jar",jarAbsPath);

return conf;

}

private static Configuration getBaseRemoteHadoopConf(){

Configuration conf=new Configuration();

//!!!下面的配置是远程连接起作用的配置

//HDFS远程连接信息

conf.set("fs.defaultFS", "hdfs://mycluster"); // 集群逻辑名称 ,指定hdfs的nameservice为mycluster,与core-site.xml中的配置保持一致

conf.set("dfs.nameservices", "mycluster");//给HDFS集群取一个逻辑名称, 指定hdfs的nameservice为mycluster,需要和core-site.xml中的保持一致

conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2"); //mycluster 下面有两个NameNode, 分别是nn1,nn2

conf.set("dfs.namenode.rpc-address.mycluster.nn1", "mini1:8020");//nn1对应的真实访问地址 ,RPC通信地址

conf.set("dfs.namenode.rpc-address.mycluster.nn2", "mini2:8020");

conf.set("dfs.client.failover.proxy.provider.mycluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");//配置失败自动切换实现方式的那个主类

//MR&YARN相关配置

conf.set("mapreduce.framework.name", "yarn");//指定mr框架为yarn方式

conf.set("yarn.resourcemanager.address", "http://mini2:8032");//指定resourceManger的访问地址

conf.set("yarn.resourcemanager.scheduler.address", "http://mini2:8030");

conf.set("mapreduce.app-submission.cross-platform", "true");//允许跨平台提交jar包

return conf;

}

}

2.远程连接配置已经ok 配置,下面wordcount 示例如何使用

当代码完成后,直接在编译工具运行即可, 在虚拟机进行查询结果

Job:

public class WCAPP {

/**

* 程序入口

* @param args

*/

public static void main(String[] args) throws Exception {

//构建Job类的对象

Job wcjob = Job.getInstance(HadoopUtil.getRemoteHadoopConf()); // 切记一定要在构建job类对象时将上述工具类中返回的configuration传入,否则远程连接不起作用,其他代码与普通mp无异,整个远程连接起作用的也就是这段与 前面的配置信息连段代码

//给当前job类的对象设置job名称

wcjob.setJobName("zxlwcProgrammer");

//设置运行主类

wcjob.setJarByClass(WCAPP.class);

//设置job的Mapper及其输出K,V的类型

wcjob.setMapperClass(WCMapper.class);

wcjob.setMapOutputKeyClass(Text.class);

wcjob.setMapOutputValueClass(IntWritable.class);

//设置job的输出K,V的类型,也可以说是Reducer输出的K,V的类型

wcjob.setReducerClass(WCReducer.class);

wcjob.setOutputKeyClass(Text.class);

wcjob.setOutputValueClass(IntWritable.class);

//设置要处理的HDFS上的文件的路径

FileInputFormat.addInputPath(wcjob,new Path(inputPath));

//设置最终输出结果的路径

FileOutputFormat.setOutputPath(wcjob,new Path(outputPath));

//等待程序完成后自动结束程序

System.exit(wcjob.waitForCompletion(true)0:1);

}

}

-----------------------------------------------------------------------------------------------------------------------------------------------------------------

Mapper:

public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable>{

/**

* 自定义映射器中的映射方法

* 具体效果: line --> list<k,v>

* @param key 当前处理行的行号

* @param value 当前处理行的内容

* @param context MR编程框架中的组件, 帮助我们将映射出来的多个k,v 输送到 Reducer那里去

* @throws IOException

* @throws InterruptedException

*/

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//将Text类型的行内容转换成String类型

String lineContent = value.toString();

//将给定的一行文本切割成多个字符

String[] items= lineContent.split(" ");

for(int i=0; i<items.length; i++){

context.write(new Text(items[i]),new IntWritable(1));

}

}

}

Reducer:

public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable>{

/**

* 自定义规约器中的规约方法

* @param key 由多个Mapper中映射出的某个key

* @param values 上述key对应的全部value

* @param context MR编程框架中的组件, 帮助我们将规约结果输送到HDFS上

* @throws IOException

* @throws InterruptedException

*/

@Override

protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

int count =0;

Iterator<IntWritable> iterator= values.iterator();

while(iterator.hasNext()){

count = count + iterator.next().get();

}

context.write(key,new IntWritable(count));

}

}

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇centos-7 部署hadoop2.5.1 >&g.. 下一篇Hive(五):常见存储格式的性能测试..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目