package cn.luxh.app;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
/**
 * @author Luxh
 */
public class WordStat {
    /**
     * TableMapper<Text, IntWritable> -- Text: the output key type; IntWritable: the output value type.
     */
    public static class MyMapper extends TableMapper<Text, IntWritable> {

        private static IntWritable one = new IntWritable(1);
        private static Text word = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            // The table has only one column family, so just read the value
            // of the first cell of each row directly.
            String words = Bytes.toString(value.list().get(0).getValue());
            StringTokenizer st = new StringTokenizer(words);
            while (st.hasMoreTokens()) {
                String s = st.nextToken();
                word.set(s);
                context.write(word, one);
            }
        }
    }
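
    // Note: Result.list()/KeyValue is the HBase 0.94-era client API, and get(0) assumes
    // every row holds exactly one cell; on HBase 0.96+ the rough equivalent is
    // CellUtil.cloneValue(value.listCells().get(0)).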
    /**
     * TableReducer<Text, IntWritable, ImmutableBytesWritable> -- Text: the input key type;
     * IntWritable: the input value type; ImmutableBytesWritable: the output key type.
     */
    public static class MyReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            // Add one row per word, using the word itself as the row key.
            Put put = new Put(Bytes.toBytes(key.toString()));
            // In column family "result", store the count under the qualifier "num".
            // String.valueOf(sum) converts the number to a string first; otherwise the
            // raw int bytes would show up in HBase as \x00\x00\x00\x... escape sequences.
            // The string is then converted to bytes and stored in HBase.
            put.add(Bytes.toBytes("result"), Bytes.toBytes("num"), Bytes.toBytes(String.valueOf(sum)));
            context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
        }
    }
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conf, "wordstat");
        job.setJarByClass(WordStat.class);

        Scan scan = new Scan();
        // Restrict the scan to the column family holding the text.
        scan.addFamily(Bytes.toBytes("content"));
        // The Mapper reads from the table "word".
        TableMapReduceUtil.initTableMapperJob("word", scan, MyMapper.class, Text.class, IntWritable.class, job);
        // The Reducer writes to the table "stat".
        TableMapReduceUtil.initTableReducerJob("stat", MyReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
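
For reference, a minimal sketch of the HBase shell setup this job assumes: a source table word with a column family content holding the text (the mapper reads the first cell of each row, so the sample below stores it under an empty qualifier), and a target table stat with a column family result. The row key r1 and the sample sentence are placeholders.

create 'word', 'content'
put 'word', 'r1', 'content:', 'hello hbase hello hadoop'
create 'stat', 'result'

With the class packaged into a jar (wordstat.jar below is a placeholder name), one common way to run it with the HBase jars on the classpath is:

HADOOP_CLASSPATH=$(hbase classpath) hadoop jar wordstat.jar cn.luxh.app.WordStat

After the job completes, scan 'stat' should show one row per word with its count under result:num.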