Notes
Kafka version: kafka_2.10-0.8.2.1 (Kafka 0.9.x provides a new API)
IDE: IntelliJ IDEA 14 + Maven 3.3
Language: Java
Developing with the high-level consumer API
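High-level consumer features
- We do not have to manage message offsets ourselves.
- The high-level consumer stores the last offset read from each partition in ZooKeeper, keyed by the consumer group name that the consumer supplies.
- Within one consumer group, each message is delivered to only one consumer; different consumer groups each consume the same messages independently.
- A consumer group name is global across the Kafka cluster, so consumers running old logic should be shut down before new code is started under the same group name. When a new process starts with an existing group name, Kafka adds its threads to the set of threads that can consume the topic and triggers a re-balance, during which the available partitions are reassigned to the available threads. If old and new logic run at the same time, some messages may therefore be delivered to the old logic.
- The high-level consumer consumes by topic partition. If there are more consumer threads than partitions, some threads will never receive a message. If there are fewer threads than partitions, some threads receive messages from several partitions, and the order in which messages from different partitions arrive is not guaranteed; only the offsets within a single partition are sequential.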
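The thread-versus-partition behaviour in the last bullet can be illustrated with a small stand-alone sketch. It assumes a simplified range-style split in which partitions are handed out in contiguous blocks; the class and method names are hypothetical, and the real assignment is performed inside the consumer connector during a re-balance, so this only models the counting argument.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    /**
     * Returns, for each thread index, the partition ids that thread would own
     * under a simplified range-style split (illustration only, not Kafka code).
     */
    public static List<List<Integer>> assign(int partitions, int threads) {
        List<List<Integer>> result = new ArrayList<>();
        int perThread = partitions / threads; // every thread gets at least this many
        int extra = partitions % threads;     // the first 'extra' threads get one more
        int next = 0;
        for (int t = 0; t < threads; t++) {
            int count = perThread + (t < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // More threads than partitions: the last two threads own nothing
        System.out.println(assign(3, 5)); // [[0], [1], [2], [], []]
        // Fewer threads than partitions: each thread owns several partitions
        System.out.println(assign(5, 2)); // [[0, 1, 2], [3, 4]]
    }
}
```

With 3 partitions and 5 threads, two threads stay idle; with 5 partitions and 2 threads, each thread reads from more than one partition, which is where the cross-partition ordering caveat applies.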
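Configuration required by the high-level API
| Setting | Description |
| --- | --- |
| zookeeper.connect | ZooKeeper connection string (comma-separated host:port pairs) |
| group.id | Consumer group name |
| zookeeper.session.timeout.ms | ZooKeeper session timeout, in milliseconds |
| zookeeper.sync.time.ms | How far a ZooKeeper follower may lag behind the leader, in milliseconds |
| auto.commit.interval.ms | Interval, in milliseconds, at which the consumer automatically commits offsets |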
Example program
- Maven dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
</dependency>
- Package layout (all classes below are in the kafka.high package)
- Configuration file consumer.properties
#kafka zookeeper config
zookeeper.connect=xxxx:2181,xxxx:2181,xxxx:2181
group.id=consumer-group-name
zookeeper.session.timeout.ms=4000
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
auto.offset.reset=largest
#kafka consumer config
topic=myTopic
threadNum=10
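The file above mixes Kafka consumer settings with two application settings, topic and threadNum. As a minimal sketch of reading those two values with basic validation (the class name AppSettingsSketch and its checks are assumptions for illustration, not part of the original program), one might write:

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class AppSettingsSketch {
    public final String topic;
    public final int threadNum;

    private AppSettingsSketch(String topic, int threadNum) {
        this.topic = topic;
        this.threadNum = threadNum;
    }

    /** Extracts and validates the two application-level settings. */
    public static AppSettingsSketch from(Properties props) {
        String topic = props.getProperty("topic");
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("missing required property: topic");
        }
        String raw = props.getProperty("threadNum");
        if (raw == null) {
            throw new IllegalArgumentException("missing required property: threadNum");
        }
        int threadNum;
        try {
            threadNum = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("threadNum must be an integer: " + raw, e);
        }
        if (threadNum < 1) {
            throw new IllegalArgumentException("threadNum must be at least 1: " + threadNum);
        }
        return new AppSettingsSketch(topic.trim(), threadNum);
    }

    public static void main(String[] args) throws IOException {
        // Inline text stands in for consumer.properties so the sketch runs on its own
        Properties props = new Properties();
        props.load(new StringReader("group.id=consumer-group-name\ntopic=myTopic\nthreadNum=10\n"));
        AppSettingsSketch settings = from(props);
        System.out.println(settings.topic + " " + settings.threadNum);
    }
}
```

Failing fast here gives a clearer error than a NullPointerException or NumberFormatException surfacing later during startup.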
- KafkaConfig.java
package kafka.high;

// Application-level settings read from consumer.properties
public class KafkaConfig {
    public String topic = null;
    public int threadNum = 0;
}
- KafkaMsgTask.java
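package kafka.high;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.nio.charset.StandardCharsets;

/**
 * Worker thread that processes the messages of one Kafka stream.
 */
public class KafkaMsgTask implements Runnable {
    private static final Logger logger = LogManager.getLogger(KafkaMsgTask.class);

    private final KafkaStream<byte[], byte[]> stream;
    private final int threadNum;

    public KafkaMsgTask(KafkaStream<byte[], byte[]> stream, int num) {
        this.stream = stream;
        this.threadNum = num;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> iter = stream.iterator();
        // hasNext() blocks until a message arrives and returns false after the connector is shut down
        while (iter.hasNext()) {
            // Decode the payload as text; Arrays.toString would only print the raw byte values.
            // UTF-8 is an assumption about how the producer encoded the message.
            String message = new String(iter.next().message(), StandardCharsets.UTF_8);
            logger.info("thread " + threadNum + " received: " + message);
        }
        logger.info("shutting down thread :" + threadNum);
    }
}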
- KafkaHighConsumer.java
package kafka.high;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Kafka consumer
 */
public class KafkaHighConsumer {
    private static final Logger logger = LogManager.getLogger(KafkaHighConsumer.class);

    protected ConsumerConnector consumer = null;
    protected ConsumerConfig consumerConfig = null;
    protected ExecutorService executor = null;
    protected KafkaConfig kafkaConfig = null;

    public KafkaHighConsumer() { }
    /**
     * Initializes the consumer: loads the consumer configuration, topic, and thread count.
     * @param confFile path to the properties file
     * @throws Exception if the file cannot be read or a setting is invalid
     */
    public void init(String confFile) throws Exception {
        Properties props = new Properties();
        // try-with-resources closes the file even if load() throws
        try (FileInputStream in = new FileInputStream(confFile)) {
            props.load(in);
        }
        kafkaConfig = new KafkaConfig();
        kafkaConfig.topic = props.getProperty("topic");
        kafkaConfig.threadNum = Integer.parseInt(props.getProperty("threadNum"));
        consumerConfig = new ConsumerConfig(props);
        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
        logger.info("KafkaHighConsumer init success");
    }
    public void run() {
        // Request threadNum streams for the topic; the connector returns a list of streams per topic
        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put(this.kafkaConfig.topic, kafkaConfig.threadNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = this.consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(kafkaConfig.topic);
        executor = Executors.newFixedThreadPool(kafkaConfig.threadNum);
        // Hand each stream to its own worker task
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new KafkaMsgTask(stream, threadNumber));
            logger.info("start kafka message task thread. threadnum :" + threadNumber);
            threadNumber++;
        }
    }
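    public void shutdown() {
        // Shut the connector down first so the stream iterators finish and the worker threads can exit
        if (consumer != null) {
            consumer.shutdown();
        }
        if (executor != null) {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                    logger.warn("timed out waiting for consumer threads to shut down");
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can see the interruption
                Thread.currentThread().interrupt();
                logger.warn("interrupted while waiting for consumer threads to shut down");
            }
        }
    }
}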
- Main.java
package kafka.high;

public class Main {
    public static void main(String[] args) {
        try {
            KafkaHighConsumer consumer = new KafkaHighConsumer();
            consumer.init("./consumer.properties");
            // Start the worker threads; without this call nothing is consumed
            consumer.run();
            try {
                // Let the consumer run for 10 seconds before shutting down
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            consumer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
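Main above stops after a fixed 10 seconds. A long-running consumer would more commonly stop when the JVM exits. The sketch below shows that shutdown sequence using only the JDK and assumes Java 8 or later: a BlockingQueue stands in for the Kafka stream, and a hypothetical STOP marker plays the role of hasNext() returning false after consumer.shutdown(). The names ShutdownSketch, stopOnJvmExit, and STOP are illustrative and not part of the original program.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownSketch {
    // Hypothetical end-of-stream marker; stands in for hasNext() returning false
    private static final String STOP = "__STOP__";

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger processed = new AtomicInteger();
    private final ExecutorService executor;
    private final int threadNum;

    public ShutdownSketch(int threadNum) {
        this.threadNum = threadNum;
        this.executor = Executors.newFixedThreadPool(threadNum);
    }

    /** Starts one worker per thread; each worker drains the queue until it sees STOP. */
    public void start() {
        for (int i = 0; i < threadNum; i++) {
            executor.execute(() -> {
                try {
                    while (true) {
                        String msg = queue.take();
                        if (STOP.equals(msg)) {
                            return;
                        }
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
    }

    public void offer(String msg) {
        queue.add(msg);
    }

    /** Signals every worker to stop, then waits up to timeoutMs for them to finish. */
    public boolean stop(long timeoutMs) throws InterruptedException {
        for (int i = 0; i < threadNum; i++) {
            queue.put(STOP);
        }
        executor.shutdown();
        return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public int processedCount() {
        return processed.get();
    }

    /** Registers a JVM shutdown hook so that Ctrl-C or SIGTERM triggers stop(). */
    public void stopOnJvmExit(final long timeoutMs) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                stop(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
    }

    public static void main(String[] args) throws InterruptedException {
        // Bounded demo: stop explicitly instead of waiting for the JVM to exit
        ShutdownSketch sketch = new ShutdownSketch(2);
        sketch.start();
        sketch.offer("a");
        sketch.offer("b");
        sketch.offer("c");
        boolean clean = sketch.stop(5000);
        System.out.println("clean=" + clean + " processed=" + sketch.processedCount());
    }
}
```

In the real program, the equivalent would be to register a hook that calls KafkaHighConsumer.shutdown() and let the main thread block instead of sleeping for a fixed time.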
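When to use the high-level API
Use the high-level API when you do not need to track or control offsets yourself, which is typically the case when the requirements on message processing are not strict. In that situation it is simple and efficient.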
Further reading:
1. http://kafka.apache.org/documentation.html#api
2. https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example