设为首页 加入收藏

TOP

从hbase读取内容到hdfs文件上
【2019-01-04 01:46:39】 浏览:26
Tags:hbase 读取 内容 hdfs 文件
import java.io.IOException;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
//从hbase读取内容到hdfs文件上
public class WordCountHbaseMapreduce02 {
public static class MyHBaseMap02 extends TableMapper<text, text="">{
public static void main(String[] args) throws Exception {
System.exit(run());
}
@Override
protected void map(ImmutableBytesWritable key, Result value,
Mapper<immutablebyteswritable, text="">.Context context)
throws IOException, InterruptedException {
String word=null;
String num=null;
List<cell> cs=value.listCells();
for(Cell cell:cs){
word=Bytes.toString(CellUtil.cloneRow(cell));
num=Bytes.toString(CellUtil.cloneva lue(cell));
}
context.write(new Text(word), new Text(num));
}
public static int run() throws Exception {
Configuration conf = new Configuration();
conf = HBaseConfiguration.create(conf);
conf.set("hbase.zookeeper.quorum", "192.168.52.140");
Job job = Job.getInstance(conf, "wordcount2");
job.setJarByClass(WordCountHbaseMapreduce02.class);
Scan scan = new Scan();
//取对业务有用的数据 tags, nickname
scan.addColumn(Bytes.toBytes("wordcount"), Bytes.toBytes("num"));
//数据来源 hbase
//TableInputFormat.addColumns(scan, columns);
//ImmutableBytesWritable来自hbase数据的类型
TableMapReduceUtil.initTableMapperJob("word", scan, MyHBaseMap02.class,
Text.class, Text.class, job);
FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.52.140:9000/hadoop_hbase_out" + new Date().getTime()));
return job.waitForCompletion(true) 0 : 1;
}
}
}

编程开发网
【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部】
上一篇Hbase架构   Hbase Region的.. 下一篇Spark 操作Hbase 对表的操作:..

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(214) }