设为首页 加入收藏

TOP

Hadoop生态圈(九):kafka组件
2019-04-22 02:13:08 】 浏览:36
Tags:Hadoop 生态 kafka 组件

目录

1Kafka概述

1.1 消息队列

1.2 为什么需要消息队列

1.3 什么是Kafka

1.4 Kafka架构

2 Kafka集群部署

2.1 环境准备

2.1.1 集群规划

2.1.2 下载

2.2 Kafka集群部署

2.3 Kafka命令行操作

3 Kafka工作流程分析

3.1 Kafka生产过程分析

3.1.1 写入方式

3.1.2 分区(Partition)

3.1.3 副本(Replication)

3.1.4 写入流程

3.2 Broker 保存消息

3.2.1 存储方式

3.2.2 存储策略(消息删除策略)

3.3 Kafka消费过程分析

3.3.1高级API

3.3.2 低级API[了解]

3.3.3 消费者组

3.3.4 消费方式

3.3.5 消费者组案例

4 Kafka的API实战

4.1 环境准备

4.2 Kafka生产者javaAPI

4.2.1 创建生产者

4.2.2 创建带回调函数的生产者

4.2.3 自定以分区生产者

4.3 Kafka消费者javaAPI

4.3.1 高级API

4.3.2 低级API

5kafka product拦截器(interceptor)

5.1 拦截器原理

5.2 拦截器案例

6 Flume与kafka集成


1Kafka概述

1.1 消息队列

(1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息,而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者接收处理,即使有多个消息监听者也是如此

(2)发布/订阅模式(一对多,数据生产后,推送给所有订阅者)

发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。

1.2 为什么需要消息队列

1)解耦:

允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2)冗余:

消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

3)扩展性:

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。

4)灵活性&峰值处理能力:

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

5)可恢复性:

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

6)顺序保证:

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka保证一个Partition内的消息的有序性)

7)缓冲:

有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况

8)异步通信:

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

1.3 什么是Kafka

一个开源消息系统,一个分布式消息队列;目的提供一个统一、高通量、低等待的平台;依赖于Zookeeper来保证系统的可用性

在流式计算中。Kafka一般用于缓存数据,Storm或者spark通过消费Kafka的数据进行计算。

1)Apache Kafka是一个开源消息系统,由Scala写成。是由Apache软件基金会开发的一个开源消息系统项目。

2)Kafka最初是由LinkedIn公司开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。

3)Kafka是一个分布式消息队列Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。

4)无论是kafka集群,还是consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。

1.4 Kafka架构

Kafka整体架构:

Kafka详细架构:

组件介绍:

1)Producer :消息生产者,就是向kafka broker发消息的客户端;

2)Consumer :消息消费者,向kafka broker取消息的客户端;

3)Topic :可以理解为一个队列(就是同一个业务的数据放在一个topic下)

4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;

5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic;

6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;

7)Offset:偏移量。

2 Kafka集群部署

2.1 环境准备

2.1.1 集群规划

hadoop101

hadoop102 hadoop103
zk zk zk
Kafka Kafka Kafka

2.1.2 下载

官网下载:http://kafka.apache.org/downloads.html

网盘链接:请点这里 提取码:lm8k

2.2 Kafka集群部署

1)解压安装包

[root@hadoop101software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/

2)修改解压后的文件名称

[root@hadoop101module]$ mv kafka_2.11-0.11.0.0/ kafka

3)在/opt/module/kafka目录下创建logs文件夹

[root@hadoop101kafka]$ mkdir logs

4)修改配置文件

[root@hadoop101kafka]$ cd config/

[root@hadoop101config]$ vim server.properties

#broker全局唯一编号,不能重复

broker.id=1

#删除topic功能使能

delete.topic.enable=true

#kafka运行日志存放的路径

log.dirs=/opt/module/kafka/logs

#配置连接Zookeeper集群地址

zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181

5)分发安装包

6)分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3

注:broker.id不得重复

7)启动集群

依次在hadoop101、hadoop102、hadoop103节点上启动kafka

[root@hadoop101kafka]$ bin/kafka-server-start.sh config/server.properties &

[root@hadoop102kafka]$ bin/kafka-server-start.sh config/server.properties &

[root@hadoop103kafka]$ bin/kafka-server-start.sh config/server.properties &

8)关闭集群

[root@hadoop101kafka]$ bin/kafka-server-stop.sh stop

[root@hadoop102kafka]$ bin/kafka-server-stop.sh stop

[root@hadoop103kafka]$ bin/kafka-server-stop.sh stop

2.3 Kafka命令行操作

1)查看当前服务器中的所有topic

[root@hadoop101kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --list

2)创建topic

[root@hadoop101kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --create--replication-factor 3 --partitions 1 --topic first

选项说明:

--topic 定义topic名

--replication-factor 定义副本数

--partitions 定义分区数

3)删除topic

[root@hadoop101kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --delete--topic first

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

4)发送消息

创建second主题

bin/kafka-topics.sh --zookeeper hadoop101:2181 --create --replication-factor 3 --partitions 3 --topic second

[root@hadoop101kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic second

>hello world

>bigdatabigdata

5)消费消息

[root@hadoop102kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic second--from-beginning

--from-beginning会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

6)查看某个Topic的详情

[root@hadoop101kafka]$ bin/kafka-topics.sh --zookeeper hadoop101:2181 --describe --topic second

3 Kafka工作流程分析

Create topic(创建topic的时候指定该topic分区数、副本数);此处对比create table Producer向kafka的topic发送消息

3.1 Kafka生产过程分析

3.1.1 写入方式

producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

3.1.2 分区(Partition)

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:

我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值

1)分区的原因

(1)方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;

(2)可以提高并发,因为可以以Partition为单位读写了。

2)分区的原则

(1)指定了patition,则直接使用; <1, “我是中国人”>

(2)未指定patition但指定key,通过对key的值进行hash出一个patition;

(3)patition和key都未指定,使用轮询选出一个patition。

DefaultPartitioner类
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

3.1.3 副本(Replication)

副本是针对分区的副本

同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。

3.1.4 写入流程

producer写入消息流程如下

1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader

2)producer将消息发送给该leader

3)leader将消息写入本地log

4)followers从leader pull消息,写入本地log后向leader发送ACK

5)leader收到所有ISR中的replication的ACK后向producer发送ACK

3.2 Broker 保存消息

3.2.1 存储方式

物理上把topic分成一个或多个patition(对应 server.properties 中的num.partitions=3配置),每个patition物理上对应一个文件夹(该文件夹存储该patition的所有消息和索引文件),如下:

[root@hadoop101logs]$ ll

drwxrwxr-x. 2 rootroot4096 8月 6 14:37 first-0

drwxrwxr-x. 2 rootroot4096 8月 6 14:35 first-1

drwxrwxr-x. 2 rootroot8月 6 14:37 first-2

[root@hadoop101logs]$ cd first-0

[root@hadoop101first-0]$ ll

-rw-rw-r--. 1 rootroot10485760 8月 6 14:33 00000000000000000000.index

-rw-rw-r--. 1 rootroot219 8月 6 15:07 00000000000000000000.log

-rw-rw-r--. 1 rootroot10485756 8月 6 14:33 00000000000000000000.timeindex

-rw-rw-r--. 1 rootroot8 8月 6 14:37 leader-epoch-checkpoint

3.2.2 存储策略(消息删除策略)

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:

1)基于时间:log.retention.hours=168

2)基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

3.3 Kafka消费过程分析

kafka提供了两套consumer API:高级Consumer API和低级Consumer API。

3.3.1高级API

1)高级API优点

高级API 写起来简单

不需要自行去管理offset,系统通过zookeeper自行管理

不需要管理分区,副本等情况,.系统自动管理

消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)

可以使用group来区分对同一个topic 的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)

2)高级API缺点

不能自行控制offset(对于某些特殊需求来说)

不能细化控制如分区、副本、zk等

3.3.2 低级API[了解]

1)低级 API 优点

能够开发者自己控制offset,想从哪里读取就从哪里读取。

自行控制连接分区,对分区自定义进行负载均衡

对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)

2)低级API缺点

太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。

3.3.3 消费者组

消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。

在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。

3.3.4 消费方式

Consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer可以自己控制消费方式。

pull模式不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。

3.3.5 消费者组案例

消费者组效果

1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

2)案例实操

(1)在hadoop102、hadoop103上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。

[root@hadoop102config]$ vim consumer.properties

group.id=hadoop_kafka

(2)在hadoop102、hadoop103上分别启动消费者

[root@hadoop102kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic second--consumer.config config/consumer.properties

[root@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic second--consumer.config config/consumer.properties

(3)在hadoop101上启动生产者

[root@hadoop101kafka]$ bin/kafka-console-producer.sh \

--broker-list hadoop101:9092 --topic second

>hello world

(4)查看hadoop102和hadoop103的接收者。

同一时刻只有一个消费者接收到消息。

4 Kafka的API实战

4.1 环境准备

1)启动zk和kafka集群,在kafka集群中打开一个消费者

[bigdata@hadoop101kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --topic second

2)导入下面的依赖

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId	>
            <version>0.11.0.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.0</version>
        </dependency>

4.2 Kafka生产者javaAPI

4.2.1 创建生产者

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class NewProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Kafka服务端的主机名和端口号
        props.put("bootstrap.servers", "hadoop103:9092");
        // 等待所有副本节点的应答
        props.put("acks", "all");
        // 消息发送最大尝试次数
        props.put("retries", 0);
        // 发送缓存区内存大小
        props.put("buffer.memory", 33554432);
        // key序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 50; i++) {
            producer.send(new ProducerRecord<String, String>("second", Integer.toString(i), "hello world-" + i));
        }
        producer.close();
    }
}

4.2.2 创建带回调函数的生产者

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class CallBackProducer {
	public static void main(String[] args) {
		Properties props = new Properties();
		// Kafka服务端的主机名和端口号
		props.put("bootstrap.servers", "hadoop103:9092");
		// 等待所有副本节点的应答
		props.put("acks", "all");
		// 消息发送最大尝试次数
		props.put("retries", 0);
		// 发送缓存区内存大小
		props.put("buffer.memory", 33554432);
		// key序列化
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// value序列化
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

		// 自定义分区
		props.put("partitioner.class", "com.bigdata.CustomPartitioner");

		Producer<String, String> kafkaProducer = new KafkaProducer<>(props);

		for (int i = 0; i < 50; i++) {
			kafkaProducer.send(new ProducerRecord<String, String>("second", "hello" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata metadata, Exception exception) {
					if (metadata != null) {
						System.err.println(metadata.partition() + "---" + metadata.offset());
					}
				}
			});
		}
		kafkaProducer.close();
	}
}

4.2.3 自定以分区生产者

1)需求:将所有数据存储到topic的第0号分区上

2)自定义分区

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

	@Override
	public void configure(Map<String, > configs) {
	}
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 控制分区
		return 0;
	}
	@Override
	public void close() {
	}
}

3)调用自定义分区

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionerProducer {

	public static void main(String[] args) {
		
		Properties props = new Properties();
		// Kafka服务端的主机名和端口号
		props.put("bootstrap.servers", "hadoop103:9092");
		// 等待所有副本节点的应答
		props.put("acks", "all");
		// 消息发送最大尝试次数
		props.put("retries", 0);
		// 发送缓存区内存大小
		props.put("buffer.memory", 33554432);
		// key序列化
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// value序列化
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		// 自定义分区
		props.put("partitioner.class", "com.bigdata.CustomPartitioner");

		Producer<String, String> producer = new KafkaProducer<>(props);
		producer.send(new ProducerRecord<String, String>("second", "1", "bigdata"));

		producer.close();
	}
}

4)测试

在hadoop101上监控/opt/module/kafka/logs/目录下first主题3个分区的log日志动态变化情况

[root@hadoop101first-0]$ tail -f 00000000000000000000.log

[root@hadoop101first-1]$ tail -f 00000000000000000000.log

[root@hadoop101first-2]$ tail -f 00000000000000000000.log

4.3 Kafka消费者javaAPI

4.3.1 高级API

1)在控制台创建生产者

[root@hadoop101kafka]$ bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic second

>hello world

2)官方提供案例(自动维护消费情况)

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class CustomNewConsumer {

	public static void main(String[] args) {

		Properties props = new Properties();
		// 定义kakfa 服务的地址,不需要将所有broker指定上 
		props.put("bootstrap.servers", "hadoop101:9092");
		// 制定consumer group 
		props.put("group.id", "test");
		// 是否自动确认offset 
		props.put("enable.auto.commit", "true");
		// 自动确认offset的时间间隔 
		props.put("auto.commit.interval.ms", "1000");
		// key的序列化类
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// value的序列化类 
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		// 定义consumer 
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		
		// 消费者订阅的topic, 可同时订阅多个 
		consumer.subscribe(Arrays.asList("first", "second","third"));

		while (true) {
			// 读取数据,读取超时时间为100ms 
			ConsumerRecords<String, String> records = consumer.poll(100);
			
			for (ConsumerRecord<String, String> record : records)
				System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
		}
	}
}

4.3.2 低级API

实现使用低级API读取指定的topic,指定partition,指定offset的数据

1)方法描述:

findLeader()

客户端向种子节点发送主题元数据,将副本集加入备用节点

getLastOffset()

消费者客户端发送偏移量请求,获取分区最近的偏移量

run()

消费者低级AP I拉取消息的主要方法

findNewLeader()

当分区的主副本节点发生故障,客户将要找出新的主副本

2)代码:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import java.nio.ByteBuffer;
import java.util.*;

public class SimpleExample {
    private List<String> m_replicaBrokers = new ArrayList<>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<>();
    }

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        // 最大读取消息数量
        long maxReads = Long.parseLong("3");
        // 要订阅的topic
        String topic = "second";
        // 要查找的分区
        int partition = Integer.parseInt("0");
        // broker节点的ip
        List<String> seeds = new ArrayList<>();
        seeds.add("hadoop101");
        seeds.add("hadoop102");
        seeds.add("hadoop103");
        // 端口
        int port = Integer.parseInt("9092");
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    public void run(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
        // 获取指定Topic partition的元数据
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:" + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }


    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                //
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                    Thread.sleep(1000);
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for [" + a_topic + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}

5kafka product拦截器(interceptor)

5.1 拦截器原理

Producer拦截器(interceptor)是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

(1)configure(configs)

获取配置信息和初始化数据时调用。

(2)onSend(ProducerRecord):

该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算

(3)onAcknowledgement(RecordMetadata, Exception):

该方法会在消息被应答或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率

(4)close:

关闭interceptor,主要用于执行一些资源清理工作

如前面所描述的,interceptor可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。

5.2 拦截器案例

1)需求:

实现一个简单的双interceptor组成的拦截链。第一个interceptor会在消息发送前将时间戳信息加到消息value的最前部;第二个interceptor会在消息发送后更新成功发送消息数或失败发送消息数。

2)案例

(1)增加时间戳拦截器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TimeInterceptor implements ProducerInterceptor<String, String> {

	@Override
	public void configure(Map<String, > configs) {

	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		// 创建一个新的record,把时间戳写入消息体的最前部
		return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
				System.currentTimeMillis() + "," + record.value().toString());
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

	}

	@Override
	public void close() {

	}
}

(2)统计发送消息成功和发送失败消息数,并在producer关闭时打印这两个计数器

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
    private int errorCounter = 0;
    private int successCounter = 0;

	@Override
	public void configure(Map<String, > configs) {
		
	}

	@Override
	public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
		 return record;
	}

	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		// 统计成功和失败的次数
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
	}

	@Override
	public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
	}
}

(3)producer主程序

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class InterceptorProducer {

	public static void main(String[] args) throws Exception {
		// 1 设置配置信息
		Properties props = new Properties();
		props.put("bootstrap.servers", "hadoop101:9092");
		props.put("acks", "all");
		props.put("retries", 0);
		props.put("buffer.memory", 33554432);
		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
		
		// 2 构建拦截链
		List<String> interceptors = new ArrayList<>();
		interceptors.add("com.bigdata.kafka.interceptor.TimeInterceptor");
                interceptors.add("com.bigdata.kafka.interceptor.CounterInterceptor"); 
		props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
		 
		String topic = "second";
		Producer<String, String> producer = new KafkaProducer<>(props);
		
		// 3 发送消息
		for (int i = 0; i < 10; i++) {
			
		    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message" + i);
		    producer.send(record);
		}
		 
		// 4 一定要关闭producer,这样才会调用interceptor的close方法
		producer.close();
	}
}

3)测试

(1)在kafka上启动消费者,然后运行客户端java程序。

[bigdata@hadoop101kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop101:2181 --from-beginning --topic second

1501904047034,message0

1501904047225,message1

1501904047230,message2

1501904047234,message3

1501904047236,message4

1501904047240,message5

1501904047243,message6

1501904047246,message7

1501904047249,message8

1501904047252,message9

(2)观察java平台控制台输出数据如下:

Successful sent: 10

Failed sent: 0

6 Flume与kafka集成

1)配置flume(flume-kafka.conf)

# define

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /opt/module/datas/flume.log

a1.sources.r1.shell = /bin/bash -c

# sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092

a1.sinks.k1.kafka.topic = second

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = -1

# channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

# bind

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

2)启动kafkaIDEA消费者

3)进入flume根目录下,启动flume

$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4)向/opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况

$ echohello > /opt/module/datas/flume.log

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Error while fetching metadata w.. 下一篇kafka   Partition分发策略

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目