设为首页 加入收藏

TOP

0.1 Flink--Kafka2Es之读取kakfa数据
2019-05-02 02:32:14 】 浏览:65
Tags:0.1 Flink--Kafka2Es 读取 kakfa 数据
版权声明:版权归个人所有,若有转载请注明出处,否则将追究法律责任。 https://blog.csdn.net/weixin_40251395/article/details/81179811

注:本人初次接触Flink,其优点以及优化细节有待后续研究,直接上手业务。

简单业务逻辑: 将kafka多个topic数据由Flink读取写入到Es不同index中。

1.其中topic中数据使用自定义avro格式压缩,前4个字节为数据schemaID,后续数据字节为具体avro格式的二进制数据

2.schema定义以文件形式存储在zookeeper中,文件名为schemaId。

3.topic名字:avro-bj-http,avro-bj-dns;

4.index名字: YYYYMMDD-http,YYYYMMDD-dns 其中索引根据数据中的recv_time、log_type字段确定

Begin:

1.单机搭建Flink。

在官网上找到下载包flink-1.5.1-bin-hadoop27-scala_2.11.tgz 下载解压,进入bin目录运行start-cluster.sh;

打开localhost:8081,页面打开即可。

(若有问题,初级搭建阶段,请多看log启动日志,会提示端口或者其他一些基础配置需要更改)

2.直接上手代码。

任务1 读取kakfa数据

因为flink对kafka、es等很多大数据组件都做了相应的集成,直接读懂文档上手即可。

百度什么的都不要信,参考官方文档

https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/connectors/kafka.html

pom文件如下:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.5.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.5.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch5_2.11</artifactId>
            <version>1.5.0</version>
        </dependency>

        <dependency>
            <groupId>org.xerial.snappy</groupId>
            <artifactId>snappy-java</artifactId>
            <version>1.1.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.7.4</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>

yml配置文件

bootstrapServer: 11.11.*.*:6667,11.11.*.*:6667
zookeeperConnect: 11.11.*.*:2181,11.11.*.*:2181
avroPath: /matthew/schema/avro
topics: avro-bj-pro-http,avro-bj-pro-dns
esTransports: 11.11.*.*:9300
esClusterName: ES
group.id: kafka2es

主类:

package com.matthew.flink;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.shaded.jackson2.org.yaml.snakeyaml.Yaml;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.elasticsearch5.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.InputStream;
import java.util.*;

/**
 * Created by developer on 7/16/18.
 */

public class Kafka2Es {

    private static final Logger logger = LoggerFactory.getLogger(Kafka2Es.class);

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.yml");
        Map<String, String> config = new Yaml().loadAs(inputStream, Map.class);
        IOUtils.closeStream(inputStream);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", config.get("bootstrapServer"));
        properties.setProperty("zookeeper.connect", config.get("zookeeperConnect"));
        properties.setProperty("group.id", config.get("group.id"));
        List<String> topics = Arrays.asList(config.get("topics").split(","));
      //  List<String> topics = Arrays.asList(args);

        FlinkKafkaConsumer010 kafkaConsumer = new FlinkKafkaConsumer010(topics, new SimpleStringSchema(), properties);
        
        DataStream<String> stream = env.addSource(kafkaConsumer).name("KafkaConsumer");

        stream.print();

        try {
            String t = "";
            for (String topic : topics) {
                t = t + topic +";";
            }
            env.execute("Kafka2Es-" + t.substring(0,t.length()-1));
        } catch (Exception e) {
            logger.error(e.getLocalizedMessage());
        }


    }
}

直接intellij debug或者run即可在控制台看到日志以字符串格式输出。

(注: 遇到问题,一般官方文档摘抄下的代码大多数不会有错误,我遇到的问题是程序没有报错,一直卡着,仔细排查日志后发现是因为个人忘记配置host文件,host文件应将kafka、zk、es所有节点的主机ip都可以在运行的机器上配置好。以前spark开发也会遇到类似问题,其实不仅仅这些组件,主要是因为kafka、zk内部集群网络通信的原因,配置hosts文件、不要在这上面浪费精力。)

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇KAFKA源码阅读——FetchRequestPu.. 下一篇kafka 0.8 0.9 offset 问题

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目