设为首页 加入收藏

TOP

c语言使用librdkafka库实现kafka生产和消费的实例(二)
2018-07-24 06:05:15 】 浏览:1165
Tags:语言 使用 librdkafka 实现 kafka 生产 消费 实例
urn 1;

}

/*用于中断的信号*/

signal(SIGINT, stop);

fprintf(stderr,

"%% Type some text and hit enter to produce message\n"

"%% Or just hit enter to only serve delivery reports\n"

"%% Press Ctrl-C or Ctrl-D to exit\n");

while(run && fgets(buf, sizeof(buf), stdin)){

size_t len = strlen(buf);

if(buf[len-1] == '\n')

buf[--len] = '\0';

if(len == 0){

/*轮询用于事件的kafka handle,

事件将导致应用程序提供的回调函数被调用

第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/

rd_kafka_poll(rk, 0);

continue;

}

retry:

/*Send/Produce message.

这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,

对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)

用于在消息传递成功或失败时向应用程序发回信号*/

if (rd_kafka_produce(

/* Topic object */

rkt,

/*使用内置的分区来选择分区*/

RD_KAFKA_PARTITION_UA,

/*生成payload的副本*/

RD_KAFKA_MSG_F_COPY,

/*消息体和长度*/

buf, len,

/*可选键及其长度*/

NULL, 0,

NULL) == -1){

fprintf(stderr,

"%% Failed to produce to topic %s: %s\n",

rd_kafka_topic_name(rkt),

rd_kafka_err2str(rd_kafka_last_error()));

if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){

/*如果内部队列满,等待消息传输完成并retry,

内部队列表示要发送的消息和已发送或失败的消息,

内部队列受限于queue.buffering.max.messages配置项*/

rd_kafka_poll(rk, 1000);

goto retry;

}

}else{

fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",

len, rd_kafka_topic_name(rkt));

}

/*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为



传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其

发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()

仍然被调用*/

rd_kafka_poll(rk, 0);

}

fprintf(stderr, "%% Flushing final message.. \n");

/*rd_kafka_flush是rd_kafka_poll()的抽象化,

等待所有未完成的produce请求完成,通常在销毁producer实例前完成

以确保所有排列中和正在传输的produce请求在销毁前完成*/

rd_kafka_flush(rk, 10*1000);

/* Destroy topic object */

rd_kafka_topic_destroy(rkt);

/* Destroy the producer instance */

rd_kafka_destroy(rk);

return 0;

}

二、consumer

librdkafka进行kafka消费操作的大致步骤如下:

1、创建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void)

2、创建kafka topic的配置

rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void) 

3、配置kafka各项参数

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

4、配置kafka topic各项参数

rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

5、创建consumer实例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)

6、为consumer实例添加brokerlist

int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

7、开启consumer订阅

rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)

8、轮询消息或事件,并调用回调函数

rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)

9、关闭consumer实例

rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

10、释放topic list资源

rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

11、销毁consumer实例

void rd_kafka_destroy (rd_kafka_t *rk) 

12、等待consumer对象的销毁

int rd_kafka_wait_destroyed (int timeout_ms)

完整代码如下my_consumer.c

#include

#include

#include

#include

#include

#include

#include "../src/rdkafka.h"

static int run = 1;

//`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。

static rd_kafka_
编程开发网

首页 上一页 1 2 3 4 下一页 尾页 2/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇C语言:写一个宏可以将一个数字的.. 下一篇C语言基础学习之结构体与指针实例

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }