//创建一个Topic+Partition的存储空间(list/vector)
topics = rd_kafka_topic_partition_list_new(1);
//把Topic+Partition加入list
rd_kafka_topic_partition_list_add(topics, topic, -1);
//开启consumer订阅,匹配的topic将被添加到订阅列表中
if((err = rd_kafka_subscribe(rk, topics))){
fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
return -1;
}
return 1;
}
int main(int argc, char **argv){
char *brokers = "localhost:9092";
char *group = NULL;
char *topic = NULL;
int opt;
rd_kafka_resp_err_t err;
while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){
switch (opt) {
case 'b':
brokers = optarg;
break;
case 'g':
group = optarg;
break;
case 't':
topic = optarg;
break;
default:
break;
}
}
signal(SIGINT, stop);
signal(SIGUSR1, sig_usr1);
if(!initKafka(brokers, group, topic)){
fprintf(stderr, "kafka server initialize error\n");
}else{
while(run){
rd_kafka_message_t *rkmessage;
/*-轮询消费者的消息或事件,最多阻塞timeout_ms
-应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务
所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,
因为它需要被正确地调用和处理以同步内部消费者状态 */
rkmessage = rd_kafka_consumer_poll(rk, 1000);
if(rkmessage){
msg_consume(rkmessage, NULL);
/*释放rkmessage的资源,并把所有权还给rdkafka*/
rd_kafka_message_destroy(rkmessage);
}
}
}
done:
/*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),
commit offset到broker,并离开consumer group
最大阻塞时间被设置为session.timeout.ms
*/
err = rd_kafka_consumer_close(rk);
if(err){
fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
}else{
fprintf(stderr, "%% Consumer closed\n");
}
//释放topics list使用的所有资源和它自己
rd_kafka_topic_partition_list_destroy(topics);
//destroy kafka handle
rd_kafka_destroy(rk);
run = 5;
//等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1
while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){
printf("Waiting for librdkafka to decommission\n");
}
if(run <= 0){
//dump rdkafka内部状态到stdout流
rd_kafka_dump(stdout, rk);
}
return 0;
}
在linux下编译producer和consumer的代码:
gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt
gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt
在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录
在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic
启动方式可参考kafka0.8.2集群的环境搭建并实现基本的生产消费
启动consumer:
启动producer,并发送一条数据“hello world”:
consumer处成功收到producer发送的“hello world”: