1.参考:
http://kafka.apache.org/081/documentation.html#consumerconfigs
http://kafka.apache.org/081/documentation.html#highlevelconsumerapi
http://kafka.apache.org/081/documentation.html#simpleconsumerapi
http://kafka.apache.org/081/documentation.html#apidesign
2.Consumer参数说明
参数名称
|
默认参数值
|
备注
|
group.id
|
|
Consumer的groupid值,如果多个Consumer的groupid的值一样,那么表示这多个Consumer属于同一个group组
|
zookeeper.connect
|
|
Kafka元数据Zookeeper存储的url,和配置文件中的参数一样
|
consumer.id
|
|
消费者id字符串,如果不给定的话,默认自动产生一个随机id
|
socket.timeout.ms
|
30000
|
Consumer连接超时时间,实际超时时间是socket.timeout.ms+max.fetch.wait
|
socket.receive.buffer.bytes
|
65536
|
接收数据的缓冲区大小,默认64kb
|
fetch.message.max.bytes
|
1048576
|
指定每个分区每次获取数据的最大字节数,一般该参数要求比message允许的最大字节数要大,否则可能出现producer产生的数据consumer没法消费
|
num.consumer.fetchers
|
1
|
Consumer获取数据的线程数量
|
auto.commit.enable
|
true
|
是否自动提交offset偏移量,默认为true(自动提交)
|
auto.commit.interval.ms
|
60000
|
自动提交offset偏移量的间隔时间
|
rebalance.max.retries
|
4
|
当一个新的Consumer添加到ConsumerGroup的时候,会触发数据消费的rebalance操作;rebalance操作可能会失败,该参数的主要作用是设置rebalance的最大重试次数
|
fetch.min.bytes
|
1
|
一个请求最少返回记录大小,当一个请求中的返回数据大小达到该参数的设置值后,记录数据返回到consumer中
|
fetch.wait.max.ms
|
100
|
一个请求等待数据返回的最大停留时间
|
rebalance.backoff.ms
|
2000
|
rebalance重试过程中的间隔时间
|
auto.offset.reset
|
largest
|
指定consumer消费kafka数据的时候offset初始值是啥,可选参数:largest和smallest;smallest指该consumer的消费offset是当前kafka数据中的最小偏移量;largest指该consumer的消费offset是当前kafka数据中的最大偏移量
|
consumer.timeout.ms
|
-1
|
给定当consumer多久时间没有消费数据后,抛出异常;-1表示不抛出异常
|
zookeeper.session.timeout.ms
|
6000
|
zk会话时间
|
zookeeper.connection.timeout.ms
|
6000
|
连接zk过期时间
|
3.Kafka提供了两种Consumer API
(1)High Level Consumer API:将底层具体获取数据、更新offset、设置偏移量等操作屏蔽掉,直接操作数据流的处理工作。优点是:操作简单;缺点:可操作性太差,无法按照自己的业务场景选择处理方式。(类:ConsumerConnector)
(2)Lower Level Consumer API:通过直接操作底层API获取数据的方式获取Kafka中的数据,需要自行给定分区、偏移量等属性。优点:可操作性强;缺点:代码相对而言比较复杂。(类:SimpleConsumer)
4.代码
package _0807ProducerSelf;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class ConsumerDemo {
private ConsumerConnector connector = null;
private String topicName = null;
private int numThreads = 0;
public ConsumerDemo(String groupId, String zookeeperUrl, boolean largest, String topicName, int numThreads) {
this.topicName = topicName;
this.numThreads = numThreads;
// 1. 给定Consumer连接的相关参数
Properties props = new Properties();
// a. 给定group id
props.put("group.id", groupId);
// b. 给定zk连接url
props.put("zookeeper.connect", zookeeperUrl);
// c. 给定自动提交offset偏移量间隔时间修改为2s(默认60s)
props.put("auto.commit.interval.ms", "2000");
// d. 给定初始化consumer时候的offset值(该值只有在第一次consumer消费数据的时候有效 --> 只要zk中保存了该consumer的offset偏移量信息,那么该参数就无效了)
if (largest) {
props.put("auto.offset.reset", "largest");
} else {
props.put("auto.offset.reset", "smallest");
}
// 2. 创建Consumer上下文
ConsumerConfig config = new ConsumerConfig(props);
// 3. 创建Consumer连接器
this.connector = Consumer.createJavaConsumerConnector(config);
}
public void shutdown() {
if (this.connector != null) {
// 当调用shutdown后,KafkaStream所产生的ConsumerIterator迭代器就没有数据了
this.connector.shutdown();
}
}
public void run() {
// TODO: topicCountMap给定消费者消费的Topic名称以及消费该Topic使用多少个线程进行数据消费操作;一个消费者可以消费多个Topic的数据 ==> key为topic名称,value为该topic数据消费的线程数
final Map<String, Integer> topicCountMap = new HashMap<>();
topicCountMap.put(topicName, numThreads);
Decoder<String> keyDecoder = new StringDecoder(new VerifiableProperties());
Decoder<String> valueDecoder = new StringDecoder(new VerifiableProperties());
// 2. 根据参数创建数据读取流
// TODO: 该API返回的集合中的数据是一个以Topic名称为Key,以该Topic的读取数据流集合为Value的一个Map集合;
// TODO: List<KafkaStream<String, String>> ==> 指的其实就是对应Topic消费数据的流,该List集合中的流对象数目和给定的topicCountMap中该topic对应的count值一样
// TODO: List<KafkaStream<String, String>> ==> 如果一个Topic有多个分区,而且在topicCountMap中该topic给定的count值大于分区数,那么其实表示一个KafkaStream流消费一个Topic分区的数据;这里类似Kafka的Consumer Group Rebalance ===> 一个分区的数据只允许一个KafkaStream消费,但是一个KafkaStream可以消费多个分区的数据(>=0)
Map<String, List<KafkaStream<String, String>>> consumerStreamsMap = this.connector.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
// 3. 获取对应Topic的数据消费流
List<KafkaStream<String, String>> streams = consumerStreamsMap.get(topicName);
// 4. 数据消费
int k = 0;
for (final KafkaStream<String, String> stream : streams) {
new Thread(new Runnable() {
@Override
public void run() {
int count = 0;
String threadNames = Thread.currentThread().getName();
ConsumerIterator<String, String> iter = stream.iterator();
while (iter.hasNext()) {
// 获取数据
MessageAndMetadata<String, String> messageAndMetadata = iter.next();
// 处理数据
StringBuilder sb = new StringBuilder();
sb.append("线程").append(threadNames);
// 1. 获取元数据
long offset = messageAndMetadata.offset();
int partitionID = messageAndMetadata.partition();
String topicName = messageAndMetadata.topic();
// TODO: 元数据存储,方便做容错
// TODO: 这里可以将元数据保存zk/redis/mysql...(可以考虑一下怎么保存)
sb.append(";元数据=>[").append("offset=").append(offset).append("; partitionID=").append(partitionID).append("; topicName=").append(topicName).append("]");
// 2. 获取消息(key/value键值对)
String value = messageAndMetadata.message();
String key = messageAndMetadata.key();
sb.append("; 消息=>[key=").append(key).append("; value=").append(value).append("]");
System.out.println(sb.toString());
count++;
}
System.out.println("线程" + threadNames + "总共消费数据" + count + "条!!!");
}
}, "Thread-[" + k + "]-[" + topicName + "]").start();
k++;
}
}
public static void main(String[] args) throws InterruptedException {
String groupId = "170505_2";
String zookeeperUrl = "bigdata.ibeifeng.com:2181/kafka08";
boolean largest = true;
String topicName = "beifeng1";
int numThreads = 1;
ConsumerDemo demo = new ConsumerDemo(groupId, zookeeperUrl, largest, topicName, numThreads);
demo.run();
// 休息一段数据后关闭
Thread.sleep(2000);
demo.shutdown();
}
}