MapReduce统计结果直接输出hbase,我使用的是hadoop1.0.4版本和hbase 0.94版本,hadoop和hbase安装伪分布式。1.hadoop安装这里就不讲了。2.hbase安装我这里讲一下。首先解压hbase安装包到/home/hadoop目录。配置hosts文件如下:
-
192.168.0.101 hadoop.master
复制代码
配置hbase-site.xml,配置内容如下:
-
<configuration>
-
<property>
-
<name>hbase.rootdir</name>
-
<value>hdfs://hadoop.master:9000/hbase</value>
-
</property>
-
<property>
-
<name>hbase.cluster.distributed</name>
-
<value>true</value>
-
</property>
-
<property>
-
<name>hbase.zookeeper.quorum</name>
-
<value>hadoop.master</value>
-
</property>
-
<property>
-
<name>hbase.zookeeper.property.dataDir</name>
-
<value>/home/hadoop/zookeeper</value>
-
</property>
-
<property>
-
<name>hbase.regionserver.handler.count</name>
-
<value>100</value>
-
</property>
-
<property>
-
<name>hbase.hregion.max.filesize</name>
-
<value>8589934592</value>
-
</property>
-
<property>
-
<name>hfile.block.cache.size</name>
-
<value>0.3</value>
-
</property>
-
<property>
-
<name>dfs.replication</name>
-
<value>1</value>
-
</property>
-
</configuration>
复制代码
hbase-site.xml配置完后,再配置hbase-env.sh,我只把其中配置的如下显示:
-
export JAVA_HOME=/usr/jdk1.6.0_22
-
export HBASE_OPTS="-XX:+UseConcMarkSweepGC"
-
export HBASE_MANAGES_ZK=true
复制代码
上面最后一项一定要打开。设置zookeeper管理hbase。最后配置regionservers 如下:
注意:如果需要在本地连接hbase,需要关闭防火墙,执行命令/sbin/service iptables stop接下来启动hbase,创建表:TestCars,列族Car:准备数据:
-
Acura,Integra,Small
-
Acura,Legend,Midsize
-
Audi,90,Compact
-
Audi,100,Midsize
-
BMW,535i,Midsize
-
Buick,Century,Midsize
-
Buick,LeSabre,Large
-
Buick,Roadmaster,Large
-
Buick,Riviera,Midsize
-
Cadillac,DeVille,Large
-
Cadillac,Seville,Midsize
复制代码
将数据上传hadoop文件系统:
-
hadoop fs -copyFromLocal /home/hadoop/Car.txt /home/hadoop/input
复制代码
在运行mapreduce时需要将hbase-0.94.6.jar 、zookeeper-3.4.5.jar、protobuf-java-2.4.0a.jar添加到hadoop lib目录下,或者另一种方式在执行mapreduce时,导入前面三个包。下面是实现的具体代码:
-
package com.duplicate;
-
-
import java.io.IOException;
-
import java.util.ArrayList;
-
import java.util.List;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.client.HTableInterface;
-
import org.apache.hadoop.hbase.client.HTablePool;
-
import org.apache.hadoop.hbase.client.Put;
-
import org.apache.hadoop.hbase.util.Bytes;
-
import org.apache.hadoop.io.Text;
-
import org.apache.hadoop.mapreduce.Job;
-
import org.apache.hadoop.mapreduce.Mapper;
-
import org.apache.hadoop.mapreduce.Reducer;
-
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
import org.slf4j.Logger;
-
import org.slf4j.LoggerFactory;
-
-
public class OutputHbase {
-
-
private static Logger logger = LoggerFactory.getLogger(OutputHbase.class);
-
-
public static class Map extends Mapper<Object,Text,Text,Text>{
-
private Text outKey = new Text();
-
private Text outVal = new Text();
-
-
public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
-
String[] valueSplitted = value.toString().split(",");
-
if(valueSplitted.length == 3){
-
String brand = valueSplitted[0];
-
String model = valueSplitted[1];
-
String size = valueSplitted[2];
-
outKey.set(brand);
-
outVal.set(model + "," + size);
-
context.write(outKey, outVal);
-
}
-
}
-
}
-
-
public static class Reduce extends Reducer<Text,Text,Text,Text>{
-
private HTablePool pool = null;
-
private HTableInterface testHTable = null;
-
private List<Put> testListPut = new ArrayList<Put>();
-
-
@Override
-
public void setup(Context context){
-
Configuration conf = HBaseConfiguration.create();
-
conf.set("hbase.zookeeper.quorum", "192.168.0.101");
-
pool = new HTablePool(conf, 10);
-
testHTable = pool.getTable("TestCars");
-
}
-
-
@Override
-
public void reduce(Text key,Iterable<Text> values,Context context)throws IOException,InterruptedException{
-
String brand = key.toString();
-
-
for(Text tx : values){
-
String[] valueSplitted = tx.toString().split(",");
-
if(valueSplitted.length == 2){
-
String model = valueSplitted[0];
-
String size = valueSplitted[1];
-
-
byte[] putKey = Bytes.toBytes(brand+","+model);
-
byte[] putFmaily = Bytes.toBytes("Car");
-
Put put = new Put(putKey);
-
-
byte[] putQ = Bytes.toBytes("brand");
-
byte[] putVal = Bytes.toBytes(brand);
-
put.add(putFmaily,putQ,putVal);
-
-
putQ = Bytes.toBytes("model");
-
putVal = Bytes.toBytes(model);
-
put.add(putFmaily,putQ,putVal);
-
-
putQ = Bytes.toBytes("size");
-
putVal = Bytes.toBytes(size);
-
put.add(putFmaily,putQ,putVal);
-
testListPut.add(put);
-
}
-
}// End for
-
-
testHTable.put(testListPut);
-
testHTable.flushCommits();
-
}
-
-
@Override
-
public void cleanup(Context context)throws IOException{
-
if(null != testHTable){
-
testHTable.close();
-
}
-
-
if(null != pool){
-
pool.close();
-
}
-
}
-
}
-
-
/**
-
* @param args
-
* @throws IOException
-
* @throws ClassNotFoundException
-
* @throws InterruptedException
-
*/
-
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
-
// TODO Auto-generated method stub
-
Configuration conf = new Configuration();
-
-
-
Job job = new Job(conf,"OutputHbase");
-
//TableMapReduceUtil.addDependencyJars(job);
-
job.setJarByClass(OutputHbase.class);
-
job.setMapperClass(Map.class);
-
job.setReducerClass(Reduce.class);
-
-
job.setOutputKeyClass(Text.class);
-
job.setOutputValueClass(Text.class);
-
FileInputFormat.addInputPath(job, new Path(args[0]));
-
FileOutputFormat.setOutputPath(job, new Path(args[1]));
-
System.exit(job.waitForCompletion(true)0:1);
-
}
-
-
}
-
复制代码
执行方式:
-
hadoop jar /home/hadoop/dedup.jar com.duplicate.OutputHbase /home/hadoop/input/* /home/hadoop/output
复制代码
查看结果:两种方式一种直接在hbase客户查看,另一种是用程序直接读出来:hbase客户端查询:
java代码查询,我下面只查询了主键key值:
-
package com.duplicate.local;
-
-
import java.io.IOException;
-
-
import org.apache.hadoop.conf.Configuration;
-
import org.apache.hadoop.hbase.HBaseConfiguration;
-
import org.apache.hadoop.hbase.client.HTableInterface;
-
import org.apache.hadoop.hbase.client.HTablePool;
-
import org.apache.hadoop.hbase.client.Result;
-
import org.apache.hadoop.hbase.client.ResultScanner;
-
import org.apache.hadoop.hbase.client.Scan;
-
import org.apache.hadoop.hbase.util.Bytes;
-
-
public class ConnectionHbase {
-
private static HTablePool pool = null;
-
-
/**
-
* @param args
-
*/
-
public static void main(String[] args) {
-
ConnectionHbase hbase = new ConnectionHbase();
-
hbase.run();
-
}
-
-
public void run() {
-
// TODO Auto-generated method stub
-
Configuration conf = HBaseConfiguration.create();
-
HTableInterface testHTable = null;
-
conf.set("hbase.zookeeper.quorum", "192.168.0.101");
-
pool = new HTablePool(conf, 10);
-
-
testHTable = pool.getTable("TestCars");
-
Scan scan = new Scan();
-
try {
-
ResultScanner res = testHTable.getScanner(scan);
-
for(Result rs : res){
-
System.out.println(Bytes.toString(rs.getRow()));
-
}
-
} catch (IOException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
-
}
-
复制代码
|