设为首页 加入收藏

TOP

c语言使用librdkafka库实现kafka生产和消费的实例(一)
2018-07-24 06:05:15 】 浏览:1492
Tags:语言 使用 librdkafka 实现 kafka 生产 消费 实例

c语言使用librdkafka库实现kafka生产和消费的实例

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产、消费

一、producer

librdkafka进行kafka生产操作的大致步骤如下:

1、创建kafka配置

rd_kafka_conf_t *rd_kafka_conf_new (void)

2、配置kafka各项参数

rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,

const char *name,

const char *value,

char *errstr, size_t errstr_size)

3、设置发送回调函数

void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,

void (*dr_msg_cb) (rd_kafka_t *rk,

const rd_kafka_message_t *

rkmessage,

void *opaque))

4、创建producer实例

rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)

5、实例化topic

rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)

6、异步调用将消息发送到指定的topic

int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,

int msgflags,

void *payload, size_t len,

const void *key, size_t keylen,

void *msg_opaque)

7、阻塞等待消息发送完成

int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)

8、等待完成producer请求完成

rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

9、销毁topic

void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)

10、销毁producer实例

void rd_kafka_destroy (rd_kafka_t *rk)

完整代码如下my_producer.c:

#include

#include

#include

#include "../src/rdkafka.h"

static int run = 1;

static void stop(int sig){

run = 0;

fclose(stdin);

}

/*

每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)

还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)

该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行

*/

static void dr_msg_cb(rd_kafka_t *rk,

const rd_kafka_message_t *rkmessage, void *opaque){

if(rkmessage->err)

fprintf(stderr, "%% Message delivery failed: %s\n",

rd_kafka_err2str(rkmessage->err));

else

fprintf(stderr,

"%% Message delivered (%zd bytes, "

"partition %"PRId32")\n",

rkmessage->len, rkmessage->partition);

/* rkmessage被librdkafka自动销毁*/

}

int main(int argc, char **argv){

rd_kafka_t *rk; /*Producer instance handle*/

rd_kafka_topic_t *rkt; /*topic对象*/

rd_kafka_conf_t *conf; /*临时配置对象*/

char errstr[512];

char buf[512];

const char *brokers;

const char *topic;

if(argc != 3){

fprintf(stderr, "%% Usage: %s \n", argv[0]);

return 1;

}

brokers = argv[1];

topic = argv[2];

/* 创建一个kafka配置占位 */

conf = rd_kafka_conf_new();

/*创建broker集群*/

if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,

sizeof(errstr)) != RD_KAFKA_CONF_OK){

fprintf(stderr, "%s\n", errstr);

return 1;

}

/*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数

*应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/

rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

/*创建producer实例

rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/

rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));

if(!rk){

fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);

return 1;

}

/*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic

对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/

rkt = rd_kafka_topic_new(rk, topic, NULL);

if (!rkt){

fprintf(stderr, "%% Failed to create topic object: %s\n",

rd_kafka_err2str(rd_kafka_last_error()));

rd_kafka_destroy(rk);

ret

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇C语言:写一个宏可以将一个数字的.. 下一篇C语言基础学习之结构体与指针实例

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目