新建spring boot
添加maven依赖
<xml version="1.0" encoding="UTF-8">
<project xmlns ="http://maven.apache.org/POM/4.0.0" xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation ="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" >
<modelVersion > 4.0.0</modelVersion >
<groupId > com.adups</groupId >
<artifactId > kafka-hbase</artifactId >
<version > 1.0.0</version >
<packaging > jar</packaging >
<name > kafka-hbase</name >
<description > Demo project for Spring Boot</description >
<parent >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-parent</artifactId >
<version > 1.5.8.RELEASE</version >
<relativePath />
</parent >
<properties >
<project.build.sourceEncoding > UTF-8</project.build.sourceEncoding >
<project.reporting.outputEncoding > UTF-8</project.reporting.outputEncoding >
<java .version > 1.8</java .version >
</properties >
<dependencies >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-web</artifactId >
</dependency >
<dependency >
<groupId > org.springframework.kafka</groupId >
<artifactId > spring-kafka</artifactId >
</dependency >
<dependency >
<groupId > org.apache.hbase</groupId >
<artifactId > hbase-client</artifactId >
<version > 1.2.6</version >
</dependency >
<dependency >
<groupId > com.alibaba</groupId >
<artifactId > fastjson</artifactId >
<version > 1.2.11</version >
</dependency >
<dependency >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-starter-test</artifactId >
<scope > test</scope >
</dependency >
</dependencies >
<build >
<plugins >
<plugin >
<groupId > org.springframework.boot</groupId >
<artifactId > spring-boot-maven-plugin</artifactId >
</plugin >
</plugins >
</build >
</project >
日志logback.xml配置
<xml version="1.0" encoding="UTF-8">
<configuration scan ="true" scanPeriod ="60 seconds" >
<property name ="logPath" value ="/data/logs/kafka-hbase" />
<appender name ="file" class ="ch.qos.logback.core.rolling.RollingFileAppender" >
<append > true</append >
<encoder >
<pattern >
%-5level %date{yyyy-MM-dd HH:mm:ss} - %msg%n
</pattern >
<charset > UTF-8</charset >
</encoder >
<rollingPolicy class ="ch.qos.logback.core.rolling.TimeBasedRollingPolicy" >
<fileNamePattern > ${logPath}/kafka_hbase_%d{yyyyMMdd}.log</fileNamePattern >
</rollingPolicy >
<prudent > true</prudent >
</appender >
<appender name ="console" class ="ch.qos.logback.core.ConsoleAppender" >
<encoder >
<pattern >
%-5level %date{yyyy-MM-dd HH:mm:ss} - %msg%n
</pattern >
<charset > UTF-8</charset >
</encoder >
<filter class ="ch.qos.logback.classic.filter.ThresholdFilter" >
<level > @log.console.level@</level >
</filter >
</appender >
<logger name ="org.springframework" level ="WARN" additivity ="true" > </logger >
<logger name ="org.hibernate" level ="WARN" additivity ="true" > </logger >
<logger name ="ch.qos.logback" level ="WARN" additivity ="true" > </logger >
<logger name ="org.apache.kafka" level ="WARN" additivity ="true" > </logger >
<logger name ="org.apache.zookeeper" level ="WARN" additivity ="true" > </logger >
<logger name ="org.apache.hadoop" level ="ERROR" additivity ="true" > </logger >
<root level ="info" >
<appender-ref ref ="file" />
<appender-ref ref ="console" />
</root >
</configuration >
application.properties属性修改
server.port =18080
#hbase 配置
hbase.zookeeper .quorum =xxxx:2181 ,xxxx:2181 ,xxxx:2181
#kafka configuration
spring.kafka .bootstrap -servers=vm208:9092 ,vm211:9092 ,vm50:9092
spring.kafka .consumer .key -deserializer=org.apache .kafka .common .serialization .StringDeserializer
spring.kafka .consumer .value -deserializer=org.apache .kafka .common .serialization .StringDeserializer
spring.kafka .listener .concurrency =3
spring.kafka .consumer .max -poll-records=1000
spring.kafka .consumer .auto -offset-reset=latest
spring.kafka .consumer .group -id=Kafka-To-HBase
kafka属性spring-kafka自动读取, 需要配置hbase属性注入
package com.adups.hbase.config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
@org .springframework.context.annotation.Configuration
@ConfigurationProperties (prefix = "hbase.zookeeper" )
public class HBaseConfigurationBase {
private Logger logger= LoggerFactory.getLogger(HBaseConfigurationBase.class);
private String quorum;
/**
* 产生HBaseConfiguration实例化Bean
* @return
*/
@Bean
public Configuration configuration () {
Configuration conf=HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum" ,quorum);
logger.info("quorum is :" +quorum);
return conf;
}
public String getQuorum () {
return quorum;
}
public void setQuorum (String quorum) {
this .quorum = quorum;
}
}
编写Hbase操作
package com .adups .hbase .service .impl
import com .adups .hbase .service .IHBaseService
import org.apache .hadoop .conf .Configuration
import org.apache .hadoop .hbase .Cell
import org.apache .hadoop .hbase .HColumnDescriptor
import org.apache .hadoop .hbase .HTableDescriptor
import org.apache .hadoop .hbase .TableName
import org.apache .hadoop .hbase .client .*
import org.apache .hadoop .hbase .filter .*
import org.apache .hadoop .hbase .util .Bytes
import org.slf 4j.Logger
import org.slf 4j.LoggerFactory
import org.springframework .beans .factory .annotation .Autowired
import org.springframework .stereotype .Service
import java.io .IOException
import java.util .HashMap
import java.util .List
import java.util .Map
@Service
public class HBaseServiceImpl implements IHBaseService {
private Logger logger = LoggerFactory.getLogger (this.getClass ())
@Autowired
private Configuration configuration
@Override
public void createTable(String tableName, String... families) {
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf (tableName))
try (Connection connection = ConnectionFactory.createConnection (configuration)
Admin admin = connection.getAdmin ()) {
for (String family : families) {
tableDescriptor.addFamily (new HColumnDescriptor(family))
}
if (admin.tableExists (TableName.valueOf (tableName))) {
System.out .println ("Table Exists" )
logger.info ("Table:[" + tableName + "] Exists" )
} else {
admin.createTable (tableDescriptor)
System.out .println ("Create table Successfully!!!Table Name:[" + tableName + "]" )
logger.info ("Create table Successfully!!!Table Name:[" + tableName + "]" )
}
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public void deleteTable(String tableName) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Admin admin = connection.getAdmin ()) {
TableName table = TableName.valueOf (tableName)
if (!admin.tableExists (TableName.valueOf (tableName))) {
logger.info ("[" + tableName + "] is not existed. Delete failed!" )
return
}
admin.disableTable (table)
admin.deleteTable (table)
System.out .println ("delete table " + tableName + " successfully!" )
logger.info ("delete table " + tableName + " successfully!" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public void putRowValue(String tableName, String rowKey, String familyColumn, String columnName, String value) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Put put = new Put(Bytes.toBytes (rowKey))
put.addColumn (Bytes.toBytes (familyColumn), Bytes.toBytes (columnName), Bytes.toBytes (value))
table.put (put)
logger.info ("update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",column:" + columnName + ",value:" + value + " successfully!" )
System.out .println ("Update table success" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public void putRowValueBatch(String tableName, String rowKey, String familyColumn, List<String> columnNames, List<String> values) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Put put = new Put(Bytes.toBytes (rowKey))
for (int j = 0
put.addColumn (Bytes.toBytes (familyColumn), Bytes.toBytes (columnNames.get (j)), Bytes.toBytes (values.get (j)))
}
table.put (put)
logger.info ("update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",columns:" + columnNames + ",values:" + values + " successfully!" )
System.out .println ("Update table success" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public void putRowValueBatch(String tableName, String rowKey, String familyColumn, Map<String, String> columnValues) {
logger.info ("begin to update table:" + tableName + ",rowKey:" + rowKey + ",family:" + familyColumn + ",columnValues:" + columnValues.toString ())
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Put put = new Put(Bytes.toBytes (rowKey))
for (Map.Entry <String, String> entry : columnValues.entrySet ()) {
put.addColumn (Bytes.toBytes (familyColumn), Bytes.toBytes (entry.getKey ()), Bytes.toBytes (entry.getValue ()))
}
table.put (put)
logger.info ("update table:" + tableName + ",rowKey:" + rowKey + " successfully!" )
System.out .println ("Update table success" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public List<Cell> scanRegexRowKey(String tableName, String regexKey) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Scan scan = new Scan()
Filter filter = new RowFilter(CompareFilter.CompareOp .EQUAL , new RegexStringComparator(regexKey))
scan.setFilter (filter)
ResultScanner rs = table.getScanner (scan)
for (Result r : rs) {
return r.listCells ()
}
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
return null
}
@Override
public void deleteAllColumn(String tableName, String rowKey) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Delete delAllColumn = new Delete(Bytes.toBytes (rowKey))
table.delete (delAllColumn)
System.out .println ("Delete AllColumn Success" )
logger.info ("Delete rowKey:" + rowKey + "'s all Columns Successfully" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
@Override
public void deleteColumn(String tableName, String rowKey, String familyName, String columnName) {
try (Connection connection = ConnectionFactory.createConnection (configuration)
Table table = connection.getTable (TableName.valueOf (tableName))) {
Delete delColumn = new Delete(Bytes.toBytes (rowKey))
delColumn.addColumn (Bytes.toBytes (familyName), Bytes.toBytes (columnName))
table.delete (delColumn)
System.out .println ("Delete Column Success" )
logger.info ("Delete rowKey:" + rowKey + "'s Column:" + columnName + " Successfully" )
} catch (IOException e) {
e.printStackTrace ()
logger.error (e.getMessage ())
}
}
}
kafka操作hbase service示例
package com .adups .kafka .consumer .impl
import com .adups .hbase .config .BaseConfig
import com .adups .hbase .service .IHBaseService
import com .adups .kafka .bean .CheckInfo
import com .adups .kafka .consumer .AbstractConsumer
import com .alibaba .fastjson .JSON
import org.apache .hadoop .hbase .Cell
import org.apache .hadoop .hbase .util .Bytes
import org.apache .kafka .clients .consumer .ConsumerRecord
import org.slf 4j.Logger
import org.slf 4j.LoggerFactory
import org.springframework .beans .factory .annotation .Autowired
import org.springframework .kafka .annotation .KafkaListener
import org.springframework .stereotype .Component
import java.util .HashMap
import java.util .List
import java.util .Map
@Component
public class CheckConsumer extends AbstractConsumer<CheckInfo> {
private Logger logger = LoggerFactory.getLogger (this.getClass ())
@Autowired
IHBaseService ihBaseService
@Override
@KafkaListener(topics = {"ota_check" })
public void listen(ConsumerRecord<, > record) {
System.out .printf ("offset = %d,topic= %s,partition=%s,key =%s,value=%s\n" , record.offset (), record.topic (), record.partition (), record.key (), record.value ())
logger.info ("value is: " +record.value ())
CheckInfo checkInfo = JSON.parseObject (record.value ().toString (), CheckInfo.class )
updateOrInsert(checkInfo)
}
@Override
public void updateOrInsert(CheckInfo checkInfo) {
Map<String, String> columnValues = new HashMap<>(8 )
Long productId = checkInfo.getProductId ()
Integer deltaId = checkInfo.getDeltaId ()
String mid = checkInfo.getMid ()
String rowKeyRegex = rowKeyRegex(productId, deltaId, mid)
List<Cell> result = ihBaseService.scanRegexRowKey (tableName, rowKeyRegex)
columnValues.put ("check_time" , checkInfo.getCreateTime ())
String rowKey
if (result != null) {
//读出key,插入数据
Cell cell = result.get (0 )
rowKey = Bytes.toString (cell.getRowArray (), cell.getRowOffset (), cell.getRowLength ())
logger.info ("scan the key from hbase is :" +rowKey)
} else {
//插入新记录
rowKey = checkInfo.getProductId () + "+" + remainingTime() + "+" + checkInfo.getDeltaId () + "+" + checkInfo.getMid ()
logger.info (" the new rowKey is :" + rowKey)
columnValues.put ("status" , BaseConfig.STATUS _CHECK_SUCCESS)
}
ihBaseService.putRowValueBatch (tableName, rowKey, familyColumn, columnValues)
}
}
代码示例,我放到github上了demo地址
https://github.com/baifanwudi/kafka-hbase