设为首页 加入收藏

TOP

kafka java 生产消费程序示例
2019-05-02 02:31:40 】 浏览:51
Tags:kafka java 生产 消费 程序 示例
自已测试通过的
package zs.souyue2.api.res;


import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer {

private final ConsumerConnector consumer;

private KafkaConsumer() {
Properties props = new Properties();
//zookeeper 配置
//props.put("zookeeper.connect", "103.29.134.193:2181");
props.put("zookeeper.connect", "103.7.221.141:2181");
//group 代表一个消费组
props.put("group.id", "jd-group");

//zk连接超时
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
//序列化类
props.put("serializer.class", "kafka.serializer.StringEncoder");

ConsumerConfig config = new ConsumerConfig(props);

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
}

void consume() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("srp_word", new Integer(1));

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
KafkaStream<String, String> stream = consumerMap.get("srp_word").get(0);
ConsumerIterator<String, String> it = stream.iterator();
//进行入库操作
while (it.hasNext()) {

System.out.println("=====标示:" + it.next().message());
}
}

public static void main(String[] args) {
new KafkaConsumer().consume();
}
}


上面上单线程的处理端,但是在实际的应用中,只有多线程的处理才能够提高性能。
 void consume() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
//传入参数,为分区数量,多线程取多分区
topicCountMap.put("srp_word", paritonsNum);

StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map<String, List<KafkaStream<String, String>>> consumerMap =
consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
List<KafkaStream<String, String>> streamList = consumerMap.get("srp_word");

//启动多个线程来处理list
ExecutorService threadpool = Executors.newFixedThreadPool(paritonsNum);
for(KafkaStream<String, String> partition : streamList){
threadPool.execute(new MessageRunner(partition));
}
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇关于 springboot 集成 kafka 单机.. 下一篇LinkedIn是如何优化Kafka的

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目