opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){ switch (opt) { case 'b': brokers = optarg; break; case 'g': group = optarg; break; case 't': topics = optarg; break; default: break; } } /*for (; optind < argc ; optind++) topics.push_back(std::string(argv[optind]));*/ signal(SIGINT, sigterm); signal(SIGTERM, sigterm); std::shared_ptr
kafka_consumer_client_ = std::make_shared
(brokers, topics, group, 0); //std::shared_ptr
kafka_consumer_client_ = std::make_shared
(); if (!kafka_consumer_client_->initClient()){ fprintf(stderr, "kafka server initialize error\n"); }else{ printf("start kafka consumer\n"); kafka_consumer_client_->consume(1000); } fprintf(stderr, "kafka consume exit! \n"); return 0; }
编译:
g++ my_consumer.cpp -o my_consumer_cpp -std=c++11 -lrdkafka++ -lz -lpthread -lrt
在运行my_producer或my_consumer时可能会报错"error while loading shared librariesxxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录
在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test”的topic
启动consumer:
开启kafka 自带的producer,并发送消息“hello world”
consumer处收到的消息: