Spring Boot: Consuming Kafka and Writing to HBase
Copyright notice: this is the author's original article and may not be reproduced without permission. https://blog.csdn.net/baifanwudi/article/details/78663306

Create a new Spring Boot project.

Add the Maven dependencies (pom.xml):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.adups</groupId>
    <artifactId>kafka-hbase</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>kafka-hbase</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.8.RELEASE</version>
        <relativePath/> 
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.11</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
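
The post does not show the Spring Boot entry point. A minimal bootstrap class would look like the sketch below; the class name and the com.adups package are assumptions based on the groupId above.

package com.adups;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Minimal Spring Boot entry point (not part of the original post; the class
 * and package names are assumptions).
 */
@SpringBootApplication
public class KafkaHbaseApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaHbaseApplication.class, args);
    }
}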

Logging configuration (logback.xml). Note that @log.console.level@ is a build-time placeholder (Maven resource filtering); if filtering is not configured for this file, replace it with a literal level such as INFO.

<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds">

    <property name="logPath" value="/data/logs/kafka-hbase"/>

    <!-- File output -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <append>true</append> 
        <encoder>  
            <pattern>  
                %-5level %date{yyyy-MM-dd HH:mm:ss} - %msg%n  
            </pattern>  
            <charset>UTF-8</charset> <!-- character set -->
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">  
            <fileNamePattern>${logPath}/kafka_hbase_%d{yyyyMMdd}.log</fileNamePattern>
        </rollingPolicy>  
        <prudent>true</prudent>   
    </appender>  


    <!-- Console output -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">  
        <encoder>  
            <pattern>  
                %-5level %date{yyyy-MM-dd HH:mm:ss} - %msg%n 
            </pattern>  
            <charset>UTF-8</charset> <!-- character set -->
        </encoder>  
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">  
            <level>@log.console.level@</level>  
        </filter>  
    </appender>

    <logger name="org.springframework" level="WARN" additivity="true"></logger>
    <logger name="org.hibernate" level="WARN" additivity="true"></logger>
    <logger name="ch.qos.logback" level="WARN" additivity="true"></logger>
    <logger name="org.apache.kafka" level="WARN" additivity="true"></logger>
    <logger name="org.apache.zookeeper" level="WARN" additivity="true"></logger>
    <logger name="org.apache.hadoop" level="ERROR" additivity="true"></logger>

    <root level="info">
        <appender-ref ref="file" />  
        <appender-ref ref="console" />  
    </root>  
</configuration>

application.properties settings:


server.port=18080
#hbase configuration
hbase.zookeeper.quorum=xxxx:2181,xxxx:2181,xxxx:2181

#kafka configuration
spring.kafka.bootstrap-servers=vm208:9092,vm211:9092,vm50:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=3
spring.kafka.consumer.max-poll-records=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=Kafka-To-HBase

The Kafka properties above are picked up automatically by spring-kafka; the HBase properties have to be injected through a dedicated configuration class:

package com.adups.hbase.config;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;


@org.springframework.context.annotation.Configuration
@ConfigurationProperties(prefix = "hbase.zookeeper")
public class HBaseConfigurationBase {

    private final Logger logger = LoggerFactory.getLogger(HBaseConfigurationBase.class);

    private String quorum;

    /**
     * Creates the HBaseConfiguration bean with the ZooKeeper quorum injected from application.properties.
     * @return the HBase Configuration
     */
    @Bean
    public Configuration configuration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", quorum);
        logger.info("quorum is: {}", quorum);
        return conf;
    }

    public String getQuorum() {
        return quorum;
    }

    public void setQuorum(String quorum) {
        this.quorum = quorum;
    }
}

Write the HBase operations (HBaseServiceImpl):

package com.adups.hbase.service.impl;

import com.adups.hbase.service.IHBaseService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author allen
 * @date 23/11/2017.
 */

@Service
public class HBaseServiceImpl implements IHBaseService {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private Configuration configuration;

    @Override
    public void createTable(String tableName, String... families) {
        HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            for (String family : families) {
                tableDescriptor.addFamily(new HColumnDescriptor(family));
            }
            if (admin.tableExists(TableName.valueOf(tableName))) {
                System.out.println("Table Exists");
                logger.info("Table:[" + tableName + "] Exists");
            } else {
                admin.createTable(tableDescriptor);
                System.out.println("Create table Successfully!!!Table Name:[" + tableName + "]");
                logger.info("Create table Successfully!!!Table Name:[" + tableName + "]");
            }
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    @Override
    public void deleteTable(String tableName) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName table = TableName.valueOf(tableName);
            if (!admin.tableExists(TableName.valueOf(tableName))) {
                logger.info("[" + tableName + "] is not existed. Delete failed!");
                return;
            }
            admin.disableTable(table);
            admin.deleteTable(table);
            System.out.println("delete table " + tableName + " successfully!");
            logger.info("delete table " + tableName + " successfully!");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    @Override
    public void putRowValue(String tableName, String rowKey, String familyColumn, String columnName, String value) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(familyColumn), Bytes.toBytes(columnName), Bytes.toBytes(value));
            table.put(put);
            logger.info("update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",column:" + columnName + ",value:" + value + " successfully!");
            System.out.println("Update table success");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    @Override
    public void putRowValueBatch(String tableName, String rowKey, String familyColumn, List<String> columnNames, List<String> values) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (int j = 0; j < columnNames.size(); j++) {
                put.addColumn(Bytes.toBytes(familyColumn), Bytes.toBytes(columnNames.get(j)), Bytes.toBytes(values.get(j)));
            }
            table.put(put);
            logger.info("update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",columns:" + columnNames + ",values:" + values + " successfully!");
            System.out.println("Update table success");

        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    @Override
    public void putRowValueBatch(String tableName, String rowKey, String familyColumn, Map<String, String> columnValues) {
        logger.info("begin to update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",columnValues:" + columnValues.toString());
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Put put = new Put(Bytes.toBytes(rowKey));
            for (Map.Entry<String, String> entry : columnValues.entrySet()) {
                put.addColumn(Bytes.toBytes(familyColumn), Bytes.toBytes(entry.getKey()), Bytes.toBytes(entry.getValue()));
            }
            table.put(put);
            logger.info("update table:" + tableName + ",rowKey:" + rowKey + " successfully!");
            System.out.println("Update table success");

        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    /** Returns the cells of the first row whose key matches the given regex, or null if no row matches. */
    @Override
    public List<Cell> scanRegexRowKey(String tableName, String regexKey) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Scan scan = new Scan();
            Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regexKey));
            scan.setFilter(filter);
            ResultScanner rs = table.getScanner(scan);
            for (Result r : rs) {
                return r.listCells();
            }
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
        return null;
    }

    @Override
    public void deleteAllColumn(String tableName, String rowKey) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delAllColumn = new Delete(Bytes.toBytes(rowKey));
            table.delete(delAllColumn);
            System.out.println("Delete AllColumn Success");
            logger.info("Delete rowKey:" + rowKey + "'s all Columns Successfully");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }

    @Override
    public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) {
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Delete delColumn = new Delete(Bytes.toBytes(rowKey));
            delColumn.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
            table.delete(delColumn);
            System.out.println("Delete Column Success");
            logger.info("Delete rowKey:" + rowKey + "'s Column:" + columnName + " Successfully");
        } catch (IOException e) {
            e.printStackTrace();
            logger.error(e.getMessage());
        }
    }
}
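
The IHBaseService interface itself is not shown in the post. Reconstructed from the methods implemented above, it presumably looks roughly like this (a sketch, not the author's original file):

package com.adups.hbase.service;

import org.apache.hadoop.hbase.Cell;

import java.util.List;
import java.util.Map;

/**
 * Sketch of IHBaseService, inferred from HBaseServiceImpl above;
 * the original interface is in the author's repository and may differ.
 */
public interface IHBaseService {

    void createTable(String tableName, String... families);

    void deleteTable(String tableName);

    void putRowValue(String tableName, String rowKey, String familyColumn, String columnName, String value);

    void putRowValueBatch(String tableName, String rowKey, String familyColumn, List<String> columnNames, List<String> values);

    void putRowValueBatch(String tableName, String rowKey, String familyColumn, Map<String, String> columnValues);

    List<Cell> scanRegexRowKey(String tableName, String regexKey);

    void deleteAllColumn(String tableName, String rowKey);

    void deleteColumn(String tableName, String rowKey, String familyName, String columnName);
}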

Example of a Kafka consumer that writes to HBase through the service:

package com.adups.kafka.consumer.impl;

import com.adups.hbase.config.BaseConfig;
import com.adups.hbase.service.IHBaseService;
import com.adups.kafka.bean.CheckInfo;
import com.adups.kafka.consumer.AbstractConsumer;
import com.alibaba.fastjson.JSON;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author allen
 * @date 23/11/2017.
 */

@Component
public class CheckConsumer extends AbstractConsumer<CheckInfo> {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    IHBaseService ihBaseService;

    @Override
    @KafkaListener(topics = {"ota_check"})
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.printf("offset = %d,topic= %s,partition=%s,key =%s,value=%s\n", record.offset(), record.topic(), record.partition(), record.key(), record.value());
        logger.info("value is: "+record.value());
        CheckInfo checkInfo = JSON.parseObject(record.value().toString(), CheckInfo.class);
        updateOrInsert(checkInfo);
    }

    @Override
    public void updateOrInsert(CheckInfo checkInfo) {
        Map<String, String> columnValues = new HashMap<>(8);
        Long productId = checkInfo.getProductId();
        Integer deltaId = checkInfo.getDeltaId();
        String mid = checkInfo.getMid();
        String rowKeyRegex = rowKeyRegex(productId, deltaId, mid);
        List<Cell> result = ihBaseService.scanRegexRowKey(tableName, rowKeyRegex);
        columnValues.put("check_time", checkInfo.getCreateTime());
        String rowKey ;
        if (result != null) {
            // the row exists: reuse its key and update its columns
            Cell cell = result.get(0);
            rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
            logger.info("scan the key from hbase is :"+rowKey);
        } else {
            // insert a new record
            rowKey = checkInfo.getProductId() + "+" + remainingTime() + "+" + checkInfo.getDeltaId() + "+" + checkInfo.getMid();
            logger.info(" the new rowKey is :" + rowKey);
            columnValues.put("status", BaseConfig.STATUS_CHECK_SUCCESS);
        }
        ihBaseService.putRowValueBatch(tableName, rowKey, familyColumn, columnValues);
    }
}
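
AbstractConsumer, CheckInfo, and BaseConfig are not included in the post (they are in the linked repository). CheckInfo is evidently a plain bean mapped from the Kafka JSON payload with at least productId, deltaId, mid, and createTime fields. As a rough, hypothetical sketch of what the base class might provide, inferred only from how CheckConsumer uses it:

package com.adups.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * Hypothetical sketch of AbstractConsumer (not shown in the post).
 * Member names are taken from their usages in CheckConsumer; the bodies of
 * rowKeyRegex() and remainingTime() are assumptions and the real
 * implementation in the repository may differ.
 */
public abstract class AbstractConsumer<T> {

    /** HBase table and column family the concrete consumer writes to (assumed to be set by subclasses). */
    protected String tableName;
    protected String familyColumn;

    /** Handles one Kafka record; concrete consumers annotate this with @KafkaListener. */
    public abstract void listen(ConsumerRecord<?, ?> record);

    /** Inserts a new HBase row or updates an existing one for the parsed bean. */
    public abstract void updateOrInsert(T bean);

    /** Builds the row-key regex used to find an existing row; the key format is assumed
     *  to be productId+time+deltaId+mid, matching the new-key construction in CheckConsumer. */
    protected String rowKeyRegex(Long productId, Integer deltaId, String mid) {
        return "^" + productId + "\\+\\d+\\+" + deltaId + "\\+" + mid + "$";
    }

    /** Time-based row-key component, e.g. Long.MAX_VALUE minus the current timestamp
     *  so newer rows sort first (an assumption). */
    protected long remainingTime() {
        return Long.MAX_VALUE - System.currentTimeMillis();
    }
}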

The full demo code is on GitHub:
https://github.com/baifanwudi/kafka-hbase
