import
java.io.IOException;
import
java.util.List;
import
org.apache.hadoop.conf.Configuration;
import
org.apache.hadoop.fs.Path;
import
org.apache.hadoop.hbase.Cell;
import
org.apache.hadoop.hbase.CellUtil;
import
org.apache.hadoop.hbase.HBaseConfiguration;
import
org.apache.hadoop.hbase.client.Result;
import
org.apache.hadoop.hbase.client.Scan;
import
org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import
org.apache.hadoop.hbase.mapreduce.TableMapper;
import
org.apache.hadoop.hbase.util.Bytes;
import
org.apache.hadoop.io.Text;
import
org.apache.hadoop.mapreduce.Job;
import
org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public
class
WordCountHbaseMapreduce02 {
public
static
class
MyHBaseMap02
extends
TableMapper<text, text=
""
>{
public
static
void
main(String[] args)
throws
Exception {
System.exit(run());
}
@Override
protected
void
map(ImmutableBytesWritable key, Result value,
Mapper<immutablebyteswritable, text=
""
>.Context context)
throws
IOException, InterruptedException {
String word=
null
;
String num=
null
;
List<cell> cs=value.listCells();
for
(Cell cell:cs){
word=Bytes.toString(CellUtil.cloneRow(cell));
num=Bytes.toString(CellUtil.cloneva lue(cell));
}
context.write(
new
Text(word),
new
Text(num));
}
public
static
int
run()
throws
Exception {
Configuration conf =
new
Configuration();
conf = HBaseConfiguration.create(conf);
conf.set(
"hbase.zookeeper.quorum"
,
"192.168.52.140"
);
Job job = Job.getInstance(conf,
"wordcount2"
);
job.setJarByClass(WordCountHbaseMapreduce02.
class
);
Scan scan =
new
Scan();
scan.addColumn(Bytes.toBytes(
"wordcount"
), Bytes.toBytes(
"num"
));
TableMapReduceUtil.initTableMapperJob(
"word"
, scan, MyHBaseMap02.
class
,
Text.
class
, Text.
class
,
job);
return
job.waitForCompletion(
true
)
0
:
1
;
}
}
}