t *rk;
/* File-scope topic/partition list. Not referenced in the code visible in
 * this section; presumably filled in when the consumer subscribes to its
 * topics -- TODO confirm against the rest of the file. */
static rd_kafka_topic_partition_list_t *topics;
/**
 * Signal handler that requests shutdown.
 *
 * While the global `run` flag is still set, the handler clears it and closes
 * stdin so that a blocking fgets() returns. If `run` is already 0 when the
 * handler fires, the process exits immediately with status 1.
 *
 * NOTE(review): exit() and fclose() are not on the POSIX async-signal-safe
 * function list; confirm this is acceptable for how the handler is installed.
 *
 * @param sig  Signal number (unused).
 */
static void stop (int sig) {
        if (run) {
                run = 0;
                fclose(stdin); /* abort fgets() */
                return;
        }

        /* `run` was already cleared: terminate right away. */
        exit(1);
}
/**
 * Signal handler (presumably installed for SIGUSR1 -- confirm at the
 * registration site) that passes the global handle `rk` to rd_kafka_dump(),
 * with stdout as the output stream.
 *
 * @param sig  Signal number (unused).
 */
static void sig_usr1 (int sig) {
        FILE *out = stdout;

        rd_kafka_dump(out, rk);
}
/**
 * Handle one consumed message: report errors on stderr, print successfully
 * consumed messages on stdout.
 *
 * Behaviour by case:
 *  - RD_KAFKA_RESP_ERR__PARTITION_EOF: log that the end of the partition was
 *    reached and return; this is informational, not a failure.
 *  - Any other error: log it (with topic/partition/offset when the message
 *    carries a topic handle). Unknown-partition and unknown-topic errors
 *    additionally clear the global `run` flag.
 *  - No error: print a header line, the key (if its length is non-zero) and
 *    the payload.
 *
 * @param rkmessage  Message (or error event) to process.
 * @param opaque     Unused.
 */
static void msg_consume (rd_kafka_message_t *rkmessage,
                         void *opaque) {
        if (rkmessage->err) {
                if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        fprintf(stderr,
                                "%% Consumer reached end of %s [%" PRId32 "] "
                                "message queue at offset %" PRId64 "\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition, rkmessage->offset);
                        return;
                }

                if (rkmessage->rkt) {
                        fprintf(stderr, "%% Consume error for "
                                "topic \"%s\" [%" PRId32 "] "
                                "offset %" PRId64 ": %s\n",
                                rd_kafka_topic_name(rkmessage->rkt),
                                rkmessage->partition,
                                rkmessage->offset,
                                rd_kafka_message_errstr(rkmessage));
                } else {
                        fprintf(stderr, "%% Consumer error: %s: %s\n",
                                rd_kafka_err2str(rkmessage->err),
                                rd_kafka_message_errstr(rkmessage));
                }

                /* These errors clear `run` in addition to being logged. */
                if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
                    rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC) {
                        run = 0;
                }
                return;
        }

        /* `len` is a size_t, so it must be printed with %zu (not %zd). */
        fprintf(stdout, "%% Message (topic %s [%" PRId32 "], "
                "offset %" PRId64 ", %zu bytes):\n",
                rd_kafka_topic_name(rkmessage->rkt),
                rkmessage->partition,
                rkmessage->offset, rkmessage->len);

        if (rkmessage->key_len) {
                /* Key is printed with an explicit length (%.*s) rather than
                 * relying on a terminating NUL. */
                printf("Key: %.*s\n",
                       (int)rkmessage->key_len, (char *)rkmessage->key);
        }

        /* Payload is likewise bounded by its explicit length. */
        printf("%.*s\n",
               (int)rkmessage->len, (char *)rkmessage->payload);
}
/*
 * Initialize the Kafka consumer: build the configuration, create the
 * consumer handle and register the initial brokers.
 */
int initKafka(char *brokers, char *group,char *topic){
rd_kafka_conf_t *conf;
rd_kafka_topic_conf_t *topic_conf;
rd_kafka_resp_err_t err;
char tmp[16];
char errstr[512];
/* Kafka configuration */
conf = rd_kafka_conf_new();
//quick termination
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
//topic configuration
topic_conf = rd_kafka_topic_conf_new();
/* Consumer groups require a group id */
if (!group)
group = "rdkafka_consumer_example";
if (rd_kafka_conf_set(conf, "group.id", group,
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
return -1;
}
/* Consumer groups always use broker based offset storage */
if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
"broker",
errstr, sizeof(errstr)) !=
RD_KAFKA_CONF_OK) {
fprintf(stderr, "%% %s\n", errstr);
return -1;
}
/* Set default topic config for pattern-matched topics. */
rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
//Instantiate the top-level rd_kafka_t object, the base container that provides global configuration and shared state
rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if(!rk){
fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);
return -1;
}
//librdkafka needs an initial list of at least one broker
if (rd_kafka_brokers_add(rk, brokers) == 0){
fprintf(stderr, "%% No valid brokers specified\n");
return -1;
}
//Redirect the rd_kafka_poll() queue to the consumer_poll() queue
rd_kafka_poll_set_consumer(rk);