Receiving Data with Kafka and Consuming It into an HBase Database

1. Producer: generating data


package kafakaTohbase;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", KafkaProperties.zkConnect);
        // Message values are sent as Strings (see the note below).
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "hdjt01:9092,hdjt02:9092,hdjt03:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        // Send ten messages ("liu0" .. "liu9") to topic "test5".
        for (int i = 0; i < 10; i++) {
            producer.send(new KeyedMessage<String, String>("test5", "liu" + i));
        }
        producer.close(); // release the producer's network resources
    }
}
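The classes in this post reference a KafkaProperties helper that the original never shows. A minimal reconstruction of what it presumably contains, inferred from the fields used (zkConnect, groupId1, topic); the concrete values are assumptions based on the hosts and topic appearing elsewhere in the post:

package kafakaTohbase;

// Hypothetical reconstruction: only the constants the other classes reference.
public class KafkaProperties {
    public static final String zkConnect = "hdjt01:2181,hdjt02:2181,hdjt03:2181"; // assumed ZooKeeper quorum
    public static final String groupId1  = "group1"; // assumed consumer group id
    public static final String topic     = "test5";  // topic used by the producer above
}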

Note: props.put("serializer.class", "kafka.serializer.StringEncoder") sends message values as Strings. Values can also be sent as raw byte arrays:

props.put("serializer.class", "kafka.serializer.DefaultEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");

If key.serializer.class is not set, it defaults to serializer.class, so the key is treated as binary as well. Everything the producer sends is a key-value pair. A sketch of the binary form follows below.
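To make the binary form concrete, here is a minimal sketch of a producer sending byte[] values with String keys. The class name BinaryKafkaProducer, the key, and the sample payload are illustrative additions; the brokers and topic reuse the ones above:

package kafakaTohbase;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BinaryKafkaProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "hdjt01:9092,hdjt02:9092,hdjt03:9092");
        // Values stay raw bytes; keys are still Strings.
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, byte[]> producer =
                new Producer<String, byte[]>(new ProducerConfig(props));
        byte[] payload = "binary payload".getBytes(); // any byte[] works here
        producer.send(new KeyedMessage<String, byte[]>("test5", "key0", payload));
        producer.close();
    }
}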

2. Consumer


package kafakaTohbase;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {

    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId1);
        props.put("zookeeper.session.timeout.ms", "40000"); // ZooKeeper session timeout for this consumer
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // props.put("auto.offset.reset", "smallest"); // start from the earliest offsets; by default only new data is read
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1)); // one stream for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        HBaseUtils hbase = new HBaseUtils();
        while (it.hasNext()) { // blocks until the next message arrives, so the loop runs forever
            try {
                hbase.put(new String(it.next().message()));
            } catch (IOException e) {
                e.printStackTrace();
            }
            // try {
            //     sleep(300); // optional: delay 300 ms per message
            // } catch (InterruptedException e) {
            //     e.printStackTrace();
            // }
        }
    }
}
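The run() above asks for a single stream. With the old high-level consumer, throughput is usually scaled by requesting several streams and draining each on its own thread (at most one thread per partition does useful work). A sketch of that pattern; this method would live inside the KafkaConsumer class above, and the method name and pool setup are illustrative additions, not from the original post:

// Hypothetical variant of run(): N streams, each drained by a worker thread.
// Requires: import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;
public void runWithThreads(final int numStreams) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, numStreams);
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
            consumer.createMessageStreams(topicCountMap);
    ExecutorService pool = Executors.newFixedThreadPool(numStreams);
    for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
        pool.submit(new Runnable() {
            public void run() {
                HBaseUtils hbase = new HBaseUtils(); // one helper per worker thread
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    try {
                        hbase.put(new String(it.next().message()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
}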

Connecting to HBase, with its configuration:


package kafakaTohbase;

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtils {

    public void put(String string) throws IOException {
        // HBase connection settings
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hdjt01:2181,hdjt02:2181,hdjt03:2181"); // ZooKeeper quorum
        // conf.set("hbase.zookeeper.property.clientPort", "42182");

        // Random suffix so each message gets a distinct row key.
        Random random = new Random();
        long a = random.nextInt(1000000000);
        String tableName = "emp";
        String rowkey = "rowkey" + a;
        String columnFamily = "basicinfo";
        String column = "empname";

        // Note: creating an HTable per call is expensive; real code should reuse it.
        HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(string));
        table.put(put); // write the row to the table
        System.out.println("put succeeded");
        table.close(); // release resources
    }
}
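This helper assumes the emp table with column family basicinfo already exists. A minimal sketch that creates it, using the HBaseAdmin API from the same client era as the HTable above; the class name CreateEmpTable is an illustrative addition, not part of the original post:

package kafakaTohbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateEmpTable {

    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hdjt01:2181,hdjt02:2181,hdjt03:2181");

        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            if (!admin.tableExists("emp")) {
                HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("emp"));
                desc.addFamily(new HColumnDescriptor("basicinfo")); // family used by HBaseUtils
                admin.createTable(desc);
            }
        } finally {
            admin.close();
        }
    }
}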

Testing the consumer:


package kafakaTohbase;

public class Kafkaceshi {

    public static void main(String[] args) {
        // KafkaProducer a = new KafkaProducer();
        // a.producer();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        // start() runs the consumer on its own thread; calling run() directly
        // would execute the loop on the main thread instead.
        consumerThread.start();
    }
}
