设为首页 加入收藏

TOP

c语言使用librdkafka库实现kafka生产和消费的实例(四)
2018-07-24 06:05:15 】 浏览:1493
Tags:语言 使用 librdkafka 实现 kafka 生产 消费 实例

//创建一个Topic+Partition的存储空间(list/vector)

topics = rd_kafka_topic_partition_list_new(1);

//把Topic+Partition加入list

rd_kafka_topic_partition_list_add(topics, topic, -1);

//开启consumer订阅,匹配的topic将被添加到订阅列表中

if((err = rd_kafka_subscribe(rk, topics))){

fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));

return -1;

}

return 1;

}

int main(int argc, char **argv){

char *brokers = "localhost:9092";

char *group = NULL;

char *topic = NULL;

int opt;

rd_kafka_resp_err_t err;

while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){

switch (opt) {

case 'b':

brokers = optarg;

break;

case 'g':

group = optarg;

break;

case 't':

topic = optarg;

break;

default:

break;

}

}

signal(SIGINT, stop);

signal(SIGUSR1, sig_usr1);

if(!initKafka(brokers, group, topic)){

fprintf(stderr, "kafka server initialize error\n");

}else{

while(run){

rd_kafka_message_t *rkmessage;

/*-轮询消费者的消息或事件,最多阻塞timeout_ms

-应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务

所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,

因为它需要被正确地调用和处理以同步内部消费者状态 */

rkmessage = rd_kafka_consumer_poll(rk, 1000);

if(rkmessage){

msg_consume(rkmessage, NULL);

/*释放rkmessage的资源,并把所有权还给rdkafka*/

rd_kafka_message_destroy(rkmessage);

}

}

}

done:

/*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),

commit offset到broker,并离开consumer group

最大阻塞时间被设置为session.timeout.ms

*/

err = rd_kafka_consumer_close(rk);

if(err){

fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));

}else{

fprintf(stderr, "%% Consumer closed\n");

}

//释放topics list使用的所有资源和它自己

rd_kafka_topic_partition_list_destroy(topics);

//destroy kafka handle

rd_kafka_destroy(rk);

run = 5;

//等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1

while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){

printf("Waiting for librdkafka to decommission\n");

}

if(run <= 0){

//dump rdkafka内部状态到stdout流

rd_kafka_dump(stdout, rk);

}

return 0;

}

在linux下编译producer和consumer的代码:

gcc my_producer.c -o my_producer -lrdkafka -lz -lpthread -lrt

gcc my_consumer.c -o my_consumer -lrdkafka -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录

在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic

启动方式可参考kafka0.8.2集群的环境搭建并实现基本的生产消费

启动consumer:

\

启动producer,并发送一条数据“hello world”:

\

consumer处成功收到producer发送的“hello world”:

\

首页 上一页 1 2 3 4 下一页 尾页 4/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇C语言:写一个宏可以将一个数字的.. 下一篇C语言基础学习之结构体与指针实例

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目