设为首页 加入收藏

TOP

c语言使用librdkafka库实现kafka生产和消费的实例(三)
2018-07-24 06:05:15 】 浏览:1491
Tags:语言 使用 librdkafka 实现 kafka 生产 消费 实例
t *rk;

static rd_kafka_topic_partition_list_t *topics;

static void stop (int sig) {

if (!run)

exit(1);

run = 0;

fclose(stdin); /* abort fgets() */

}

static void sig_usr1 (int sig) {

rd_kafka_dump(stdout, rk);

}

/**

* 处理并打印已消费的消息

*/

static void msg_consume (rd_kafka_message_t *rkmessage,

void *opaque) {

if (rkmessage->err) {

if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {

fprintf(stderr,

"%% Consumer reached end of %s [%"PRId32"] "

"message queue at offset %"PRId64"\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition, rkmessage->offset);

return;

}

if (rkmessage->rkt)

fprintf(stderr, "%% Consume error for "

"topic \"%s\" [%"PRId32"] "

"offset %"PRId64": %s\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition,

rkmessage->offset,

rd_kafka_message_errstr(rkmessage));

else

fprintf(stderr, "%% Consumer error: %s: %s\n",

rd_kafka_err2str(rkmessage->err),

rd_kafka_message_errstr(rkmessage));

if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||

rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)

run = 0;

return;

}

fprintf(stdout, "%% Message (topic %s [%"PRId32"], "

"offset %"PRId64", %zd bytes):\n",

rd_kafka_topic_name(rkmessage->rkt),

rkmessage->partition,

rkmessage->offset, rkmessage->len);

if (rkmessage->key_len) {

printf("Key: %.*s\n",

(int)rkmessage->key_len, (char *)rkmessage->key);

}

printf("%.*s\n",

(int)rkmessage->len, (char *)rkmessage->payload);

}

/*

init all configuration of kafka

*/

int initKafka(char *brokers, char *group,char *topic){

rd_kafka_conf_t *conf;

rd_kafka_topic_conf_t *topic_conf;

rd_kafka_resp_err_t err;

char tmp[16];

char errstr[512];

/* Kafka configuration */

conf = rd_kafka_conf_new();

//quick termination

snprintf(tmp, sizeof(tmp), "%i", SIGIO);

rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

//topic configuration

topic_conf = rd_kafka_topic_conf_new();

/* Consumer groups require a group id */

if (!group)

group = "rdkafka_consumer_example";

if (rd_kafka_conf_set(conf, "group.id", group,

errstr, sizeof(errstr)) !=

RD_KAFKA_CONF_OK) {

fprintf(stderr, "%% %s\n", errstr);

return -1;

}

/* Consumer groups always use broker based offset storage */

if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",

"broker",

errstr, sizeof(errstr)) !=

RD_KAFKA_CONF_OK) {

fprintf(stderr, "%% %s\n", errstr);

return -1;

}

/* Set default topic config for pattern-matched topics. */

rd_kafka_conf_set_default_topic_conf(conf, topic_conf);

//实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态

rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));

if(!rk){

fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);

return -1;

}

//Librdkafka需要至少一个brokers的初始化list

if (rd_kafka_brokers_add(rk, brokers) == 0){

fprintf(stderr, "%% No valid brokers specified\n");

return -1;

}

//重定向 rd_kafka_poll()队列到consumer_poll()队列

rd_kafka_poll_set_consumer(rk);

首页 上一页 1 2 3 4 下一页 尾页 3/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇C语言:写一个宏可以将一个数字的.. 下一篇C语言基础学习之结构体与指针实例

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目