设为首页 加入收藏

TOP

Kafka的Producer和Consumer的示例(使用Java语言)
2014-11-23 20:01:25 来源: 作者: 【 】 浏览:30
Tags:Kafka Producer Consumer 示例 使用 Java 语言

我使用的kafka版本是:0.7.2


jdk版本是:1.6.0_20


http://kafka.apache.org/07/quickstart.html官方给的示例并不是很完整,以下代码是经过我补充的并且编译后能运行的。


Producer Code


import java.util.*;
import kafka.message.Message;
import kafka.producer.ProducerConfig;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;


public class ProducerSample {



public static void main(String[] args) {
ProducerSample ps = new ProducerSample();


Properties props = new Properties();
props.put("zk.connect", "127.0.0.1:2181");
props.put("serializer.class", "kafka.serializer.StringEncoder");


ProducerConfig config = new ProducerConfig(props);
Producer producer = new Producer(config);
ProducerData data = new ProducerData("test-topic", "test-message2");
producer.send(data);
producer.close();
}
}


Consumer Code


import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;


public class ConsumerSample {


public static void main(String[] args) {
// specify some consumer properties
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("zk.connectiontimeout.ms", "1000000");
props.put("groupid", "test_group");


// Create the connection to the cluster
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);


// create 4 partitions of the stream for topic “test-topic”, to allow 4 threads to consume
HashMap map = new HashMap();
map.put("test-topic", 4);
Map>> topicMessageStreams =
consumerConnector.createMessageStreams(map);
List> streams = topicMessageStreams.get("test-topic");


// create list of 4 threads to consume from each of the partitions
ExecutorService executor = Executors.newFixedThreadPool(4);


// consume the messages in the threads
for (final KafkaStream stream : streams) {
executor.submit(new Runnable() {
public void run() {
for (MessageAndMetadata msgAndMetadata : stream) {
// process message (msgAndMetadata.message())
System.out.println("topic: " + msgAndMetadata.topic());
Message message = (Message) msgAndMetadata.message();
ByteBuffer buffer = message.payload();
byte[] bytes = new byte[message.payloadSize()];
buffer.get(bytes);
String tmp = new String(bytes);
System.out.println("message content: " + tmp);
}
}
});
}


}
}


分别启动zookeeper,kafka server之后,依次运行Producer,Consumer的代码


运行ProducerSample:



运行ConsumerSample:



】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
分享到: 
上一篇Java 创建用户异常类、将异常一直.. 下一篇Linux Shell 中的判断结构

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容: