Note: this is my first time working with Flink, so its strengths and tuning details are left for later study; here I dive straight into the business requirements.
The task, in short: use Flink to read data from multiple Kafka topics and write it into different Elasticsearch indexes.
1. Records in the topics use a custom Avro-based format: the first 4 bytes hold the schema ID, and the remaining bytes are the Avro-encoded binary payload.
2. Schema definitions are stored as files in ZooKeeper, with the schema ID as the file name.
3. Topic names: avro-bj-http, avro-bj-dns;
4. Index names: YYYYMMDD-http, YYYYMMDD-dns, where the target index is determined by the recv_time and log_type fields of each record.
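The record layout in point 1 (4-byte schema ID prefix, then the Avro payload) can be sketched as below. This is a minimal illustration, not the project's actual parser; in particular, the assumption that the 4 bytes form a big-endian int is mine:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class AvroRecordSplitter {

    /** Reads the first 4 bytes of a raw Kafka record as a big-endian int schema ID (byte order assumed). */
    public static int schemaId(byte[] record) {
        return ByteBuffer.wrap(record, 0, 4).getInt();
    }

    /** Returns everything after the 4-byte prefix, i.e. the Avro-encoded payload. */
    public static byte[] avroPayload(byte[] record) {
        return Arrays.copyOfRange(record, 4, record.length);
    }

    public static void main(String[] args) {
        // A fake record: schema ID 7 followed by a three-byte payload.
        byte[] record = {0, 0, 0, 7, 1, 2, 3};
        System.out.println(schemaId(record));            // 7
        System.out.println(avroPayload(record).length);  // 3
    }
}
```

The schema ID extracted here would then be used to look up the schema file of the same name under the ZooKeeper path before decoding the payload with Avro.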
Begin:
1. Set up a single-node Flink.
Download flink-1.5.1-bin-hadoop27-scala_2.11.tgz from the official site, extract it, and run start-cluster.sh from the bin directory;
then open localhost:8081 — if the web UI loads, the setup is done.
(If something goes wrong at this basic setup stage, read the startup logs carefully: they will tell you when a port or some other basic setting needs to be changed.)
2. Straight to the code.
Task 1: read data from Kafka.
Flink ships with connectors for Kafka, Elasticsearch, and many other big-data components, so reading the documentation is enough to get started.
Don't trust random search results; go by the official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html
The pom dependencies are as follows:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.5.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.5.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.11</artifactId>
    <version>1.5.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.11</artifactId>
    <version>1.5.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
    <version>1.5.1</version>
</dependency>
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
    <version>1.1.1.3</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.7.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.7</version>
</dependency>
The yml config file (config.yml):
bootstrapServer: 11.11.*.*:6667,11.11.*.*:6667
zookeeperConnect: 11.11.*.*:2181,11.11.*.*:2181
avroPath: /matthew/schema/avro
topics: avro-bj-pro-http,avro-bj-pro-dns
esTransports: 11.11.*.*:9300
esClusterName: ES
group.id: kafka2es
Main class:
package com.matthew.flink;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.*;

/**
 * Created by developer on 7/16/18.
 */
public class Kafka2Es {

    private static final Logger logger = LoggerFactory.getLogger(Kafka2Es.class);

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Load the YAML config from the classpath.
        InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.yml");
        Map<String, String> config = new Yaml().loadAs(inputStream, Map.class);
        IOUtils.closeStream(inputStream);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", config.get("bootstrapServer"));
        properties.setProperty("zookeeper.connect", config.get("zookeeperConnect"));
        properties.setProperty("group.id", config.get("group.id"));

        List<String> topics = Arrays.asList(config.get("topics").split(","));
        // List<String> topics = Arrays.asList(args);

        FlinkKafkaConsumer010<String> kafkaConsumer =
                new FlinkKafkaConsumer010<>(topics, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(kafkaConsumer).name("KafkaConsumer");
        stream.print();

        try {
            // Join the topic names into a readable job name, e.g. "Kafka2Es-a;b".
            String t = "";
            for (String topic : topics) {
                t = t + topic + ";";
            }
            env.execute("Kafka2Es-" + t.substring(0, t.length() - 1));
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage());
        }
    }
}
Run or debug this directly in IntelliJ and the consumed records will be printed to the console as strings.
(Note: when you hit problems here, code copied from the official documentation is rarely wrong in itself. In my case the program reported no error but simply hung; after going through the logs carefully I found I had forgotten to configure the hosts file. The hosts file on the machine running the job should map the hostnames of every Kafka, ZooKeeper, and ES node to their IPs. I ran into the same thing in Spark development before, and it is not specific to these components: Kafka and ZooKeeper use hostnames for intra-cluster communication, so set up the hosts file first and don't waste energy on this.)
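Before wiring up the ElasticsearchSink in the next step, the index-name rule from the requirements (YYYYMMDD-http / YYYYMMDD-dns, derived from recv_time and log_type) can be sketched as a small helper. This is only an illustration: the assumption that recv_time is epoch milliseconds, and the choice of UTC for the date, are mine, not from the original setup:

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class IndexNameBuilder {

    /**
     * Derives the target ES index name, e.g. "20180716-http", from a record's
     * recv_time and log_type fields. Assumes recv_time is epoch milliseconds
     * (formatted here in UTC) and log_type is a short tag like "http" or "dns".
     */
    public static String indexFor(long recvTimeMillis, String logType) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyyMMdd");
        fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
        return fmt.format(new Date(recvTimeMillis)) + "-" + logType;
    }

    public static void main(String[] args) {
        // 1531728000000L is 2018-07-16 08:00:00 UTC.
        System.out.println(indexFor(1531728000000L, "http")); // 20180716-http
    }
}
```

In the actual sink this helper would be called per record (after parsing the JSON with fastjson, which is already a dependency) to set the index on each IndexRequest.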