1、简介
1.1、Kafka Consumer提供了2种API:high level与low level(SimpleConsumer)。
(1)high level consumer的API较为简单,不需要关心offset、partition、broker等信息,kafka会自动读取zookeeper中该consumer group的last offset。
(2)low level consumer也叫SimpleConsumer,这个接口非常复杂,需要自己写代码去实现对offset、partition、broker以及broker的切换,能不用就不用,那何时必须用?
1、Read a message multiple times
2、Consume only a subset of the partitions in a topic in a process
3、Manage transactions to make sure a message is processed once and only once
2、Flink的开发准备
Flink提供了high level的API来消费kafka的数据:flink-connector-kafka-0.8_2.10。注意,这里的0.8代表的是kafka的版本,你可以通过maven来导入kafka的依赖,具体如下:
例如你的kafka安装版本是“kafka_2.10-0.8.2.1”,即此版本是由scala2.10编写,kafka的自身版本是0.8.2.1.那此时你需要添加如下的内容到maven的pom.xml文件中:
<dependency >
<groupId > org.apache.flink</groupId >
<artifactId > flink-connector-kafka-0.8_2.10</artifactId >
<version > ${flink.version}</version >
</dependency >
注意:
$${flink.version}是个变量,自己调整下代码,例如可以直接写1.0.0。我的项目里采用的是添加了properties来控制${flink.version}:
<properties >
<project.build.sourceEncoding > UTF-8</project.build.sourceEncoding >
<flink.version > 1.0.0</flink.version >
</properties >
3、集群环境准备
这里主要是介绍下Flink集群与kafka集群的搭建。
基础的软件安装包括JDK、scala、hadoop、zookeeper、kafka以及flink就不介绍了,直接看下flink的集群配置以及kafka的集群配置。
zookeeper–3.4.6
hadoop–2.6.0
kafka–2.10-0.8.2.1
flink–1.0.3
3.1、Flink集群配置(standalone且没有用zookeeper的HA)
3.1.1、环境变量
添加FLINK_HOME以及path的内容:
export FLINK_HOME=/usr/local/flink/flink-1.0 .3
export PATH=.:${JAVA_HOME} /bin:${SCALA_HOME} /bin:${HADOOP_HOME} /bin:${HADOOP_HOME} /sbin:${ZOOKEEPER_HOME} /bin:${KAFKA_HOME} /bin:${FLINK_HOME} /bin:$PATH
export CLASS_PATH=.:${JAVA_HOME} /lib:${JRE_HOME} /lib
3.1.2、修改conf/flink-conf.yaml
这几乎是最简单的配置方式了,主要注意要修改jobmanager.rpc.address为集群中jobManager的IP或hostname。检查点以及HA的参数都没有配置。
3.1.3、slaves文件
这个文件中存放的信息是taskmanager的hostname。
3.1.4、复制flink目录以及.bashrc文件到集群中其他的机器,并使bashrc生效
root@master :/usr/local/flink
root@master :/usr/local/flink
root@master :/usr/local/flink
root@master :/usr/local/flink
root@worker1 :~
root@worker2 :~
3.2、kafka集群配置
3.2.1、环境变量
省略
3.2.2、配置config/zookeeper.properties
由于kafka集群依赖于zookeeper集群,所以kafka提供了通过kafka去启动zookeeper集群的功能,当然也可以手动去启动zookeeper的集群而不通过kafka去启动zookeeper的集群。
注意这里的dataDir最好不要指定/tmp目录下,因为机器重启会删除此目录下的文件。且指定的新路径必须存在。
3.2.3、配置config/server.properties
这个文件是启动kafka集群需要指定的配置文件,注意2点:
broker.id=0
listeners=PLAINTEXT :// :9092
broker.id在kafka集群的每台机器上都不一样,我这里3台集群分别是0、1、2.
zookeeper.connect=master :2181 ,worker1 :2181 ,worker2 :2181
zookeeper.connection.timeout.ms=6000
zookeeper.connect要配置kafka集群所依赖的zookeeper集群的信息,hostname:port。
3.2.4、复制kafka路径及环境变量到其他kafka集群的机器,并修改server.properties中的broker_id.
复制过程省略
3.3、启动kafka集群+Flink集群
3.3.1、首先启动zookeeper集群(3台zookeeper机器都要启动):
root@master :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
root@worker1 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
root@worker2 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
验证zookeeper集群:
进程是否启动;zookeeper集群中是否可以正常显示leader以及follower。
root@master :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
3295 QuorumPeerMain
root@master :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4 .6 /bin/../conf/zoo.cfg
Mode : follower
root@master :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
root@worker1 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4 .6 /bin/../conf/zoo.cfg
Mode : follower
root@worker1 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
root@worker2 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4 .6 /bin/../conf/zoo.cfg
Mode : leader
root@worker2 :/usr/local/zookeeper/zookeeper- 3.4 .6 /bin
3.3.2、启动kafka集群(3台都要启动)
root@master :/usr/local/kafka/kafka_2 .10 -0 .8.2 .1 /bin
root@worker1 :/usr/local/kafka/kafka_2 .10 -0 .8.2 .1 /bin
root@worker2 :/usr/local/kafka/kafka_2 .10 -0 .8.2 .1 /bin
验证:
进程;日志
3512 Kafka
3.3.3、启动hdfs(master上启动即可)
root@master :/usr/local/hadoop/hadoop- 2.6 .0 /sbin
验证:进程及webUI
root@master :/usr/local/hadoop/hadoop- 2.6 .0 /sbin
3798 NameNode
4007 SecondaryNameNode
root@worker1 :/usr/local/hadoop/hadoop- 2.6 .0 /sbin
3843 DataNode
root@worker2 :/usr/local/hadoop/hadoop- 2.6 .0 /sbin
3802 DataNode
webUI:50070,默认可配置
3.3.4、启动Flink集群(master即可)
root@master :/usr/local/flink/flink- 1.0 .3 /bin
验证:进程及WebUI
root@master :/usr/local/flink/flink- 1.0 .3 /bin
4411 JobManager
root@worker1 :/usr/local/flink/flink- 1.0 .3 /bin
4151 TaskManager
root@worker2 :/usr/local/flink/flink- 1.0 .3 /bin
4110 TaskManager
WebUI:8081(默认,可配置)
4、编写Flink程序,实现consume kafka的数据(demo)
4.1、代码
这里就是简单的实现接收kafka的数据,要指定zookeeper以及kafka的集群配置,并指定topic的名字。
最后将consume的数据直接打印出来。
import java .util .Properties
import org.apache .flink .streaming .api .{CheckpointingMode, TimeCharacteristic}
import org.apache .flink .streaming .api .scala .StreamExecutionEnvironment
import org.apache .flink .streaming .connectors .kafka .FlinkKafkaConsumer 08
import org.apache .flink .streaming .util .serialization .SimpleStringSchema
import org.apache .flink .streaming .api .scala ._
object ReadingFromKafka {
private val ZOOKEEPER_HOST = "master:2181,worker1:2181,worker2:2181"
private val KAFKA_BROKER = "master:9092,worker1:9092,worker2:9092"
private val TRANSACTION_GROUP = "transaction"
def main(args : Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic (TimeCharacteristic.EventTime )
env.enableCheckpointing (1000 )
env.getCheckpointConfig .setCheckpointingMode (CheckpointingMode.EXACTLY _ONCE)
// configure Kafka consumer
val kafkaProps = new Properties()
kafkaProps.setProperty ("zookeeper.connect" , ZOOKEEPER_HOST)
kafkaProps.setProperty ("bootstrap.servers" , KAFKA_BROKER)
kafkaProps.setProperty ("group.id" , TRANSACTION_GROUP)
//topicd的名字是new,schema默认使用SimpleStringSchema()即可
val transaction = env
.addSource (
new FlinkKafkaConsumer08[String]("new" , new SimpleStringSchema(), kafkaProps)
)
transaction.print ()
env.execute ()
}
}
4.2、打包:
mvn clean package
看到成功标志,否则会提示error的地方。
4.3、发布到集群
root@master :/usr/local/flink/flink- 1.0 .3 /bin
验证:进程及WebUI
root@master :/usr/local/flink/flink- 1.0 .3 /bin
6080 CliFrontend
5、kafka produce数据,验证flink是否正常消费
5.1、通过kafka console produce数据
之前已经在kafka中创建了名字为new的topic,因此直接produce new的数据:
root@master :/usr/local/kafka/kafka_2 .10 -0 .8.2 .1 /bin
生产数据:
5.2、查看flink的标准输出中,是否已经消费了这部分数据:
root@worker2 :/usr/local/flink/flink- 1.0 .3 /log
-rw-r--r-- 1 root root 254 6 月 29 09: 37 flink-root-taskmanager-0 -worker2.out
root@worker2 :/usr/local/flink/flink- 1.0 .3 /log
我们在worker2的log中发现已经有了数据,下面看看内容:
OK,没问题,flink正常消费了数据。
6、总结
kafka作为一个消息系统,本身具有高吞吐、低延时、持久化、分布式等特点,其topic可以指定replication以及partitions,使得可靠性和性能都可以很好的保证。
Kafka+Flink的架构,可以使flink只需关注计算本身。
参考
http://www.tuicool.com/articles/fI7J3m
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html
http://kafka.apache.org/082/documentation.html
http://dataartisans.github.io/flink-training/exercises/toFromKafka.html
http://data-artisans.com/kafka-flink-a-practical-how-to/