设为首页 加入收藏

TOP

04 hbase提取kafka中的数据存储
2019-01-21 02:34:31 】 浏览:50
Tags:hbase 提取 kafka 数据 存储

上一篇中的测试时是采用kafka消费者,如果把消费者换成hbase就可以实现hbase提取kafka中的数据进行存储。

启动hbase要先启动hdfs,hbase需要zk

启动hdfs:start-dfs.sh

启动hbase:start-hbase.sh

要hbase高可用,需要在其他节点中启动:hbase-daemon.sh start master

各节点进程:

创建hbase消费者:

在idea中需要引入hbase-site.xml以及hdfs-site.xml 文件 一样配置文件外部化:

kafka.properties:

zookeeper.connect=s128:2181,s129:2181,s130:2181
group.id=g4  //用户组
zookeeper.session.timeout.ms=500
zookeeper.sync.time.ms=250
auto.commit.interval.ms=1000
auto.offset.reset=smallest
#主题
topic=calllog//kafka中的topic
#表名
table.name=ns1:calllogs //hbase中数据表名
#分区数
partition.number=100
#主叫标记
caller.flag=0
#hash区域的模式
hashcode.pattern=00

创建HbaseDao类,访问hbase,进行数据相关操作:

/**
 * Hbase数据访问对象
 */
public class HbaseDao {
    //
    private DecimalFormat df = new DecimalFormat() ;

    private Table table = null ;

    private int partitions ;

    private String flag  ;
    public HbaseDao(){
        try {
            Configuration conf = HBaseConfiguration.create();
            Connection conn = ConnectionFactory.createConnection(conf);
            TableName name = TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table = conn.getTable(name);

            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));

            partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag = PropertiesUtil.getProp("caller.flag") ;
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * put数据到hbase
     */
    public void put(String log){
        if (log == null || log.equals("")) {
            return;
        }
        try {
            //解析日志
            String[] arr = log.split(",");
            if (arr != null && arr.length == 4) {
                String caller = arr[0];
                String callee = arr[1];
                String callTime = arr[2];
                callTime = callTime.replace("/","") ;       //删除/
                callTime = callTime.replace(" ","") ;       //删除空格
                callTime = callTime.replace(":","") ;       //删除空格

                String callDuration = arr[3];
                //结算区域号

                //构造put对象
                String rowkey = genRowkey(getHashcode(caller, callTime), caller, callTime, flag, callee, callDuration);
                //
                Put put = new Put(Bytes.toBytes(rowkey));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(callTime));
                put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(callDuration));
                table.put(put);

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public String getHashcode(String caller ,String callTime){
        int len = caller.length();
        //取出后四位电话号码
        String last4Code = caller.substring(len - 4);
        //取出时间单位,年份和月份.
        String mon = callTime.substring(0,6);
        //
        int hashcode = (Integer.parseInt(mon) ^ Integer.parseInt(last4Code)) % partitions ;
        return df.format(hashcode);
    }

    /**
     * 生成rowkey
     * @param hash
     * @param caller
     * @param time
     * @param flag
     * @param callee
     * @param duration
     * @return
     */
    public String genRowkey(String hash,String caller,String time,String flag,String callee,String duration){
        return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + duration ;
    }
}

创建HbaseConsumer(hbase消费者):

**
 * Hbase消费者,从kafka提取数据,存储到hbase中。
 */
public class HbaseConsumer {

    public static void main(String[] args) throws Exception {
        HbaseDao dao = new HbaseDao();
        //创建配置对象
        ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);

        //获得主题
        String topic = PropertiesUtil.getProp("topic");
        //
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> msgs = Consumer.createJavaConsumerConnector(new ConsumerConfig(PropertiesUtil.props)).createMessageStreams(map);

        List<KafkaStream<byte[], byte[]>> msgList = msgs.get(topic);

        String msg = null ;
        for (KafkaStream<byte[], byte[]> stream : msgList) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                byte[] message = it.next().message();
                //取得kafka的消息
                msg = new String(message) ;
                //写入hbase中。
                dao.put(msg);
            }
        }
    }
}

打成jar包放到s128。

因为事先要到入很多相关包,所以在window下使用mvn命令,下载工件的所有依赖软件包
----------------------------------------

mvn -DoutputDirectory=./lib -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies -DgroupId=com.chenway -DartifactId=CallLogConsumerModule -Dversion=1.0-SNAPSHOT dependency:copy-dependencies

将生成的所有jar包放入s128下lib文件夹

编写run-kafkaconsumer.sh脚本:

运行生成数据以及hbase消费者脚本:

./run-kafkaconsumer.sh

./calllog.sh

可以进入hbase shell

查看命令:scan ‘ns1:calllogs’

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Kafka Topic partition leader 为.. 下一篇Kafka 和 EMS 消息批量 ack 的实现

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目