TOP

Flink入门(七) 写入HDFS
2019-04-17 00:17:15 】 浏览:962
Tags:Flink 入门 写入 HDFS

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/baifanwudi/article/details/88247469

maven依赖
增加

      <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

读kafka消息初始化

import com.tc.flink.conf.KafkaConfig;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.time.ZoneId;
import java.util.Properties;
       ....

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        Properties propsConsumer = new Properties();
        propsConsumer.setProperty("bootstrap.servers", KafkaConfig.KAFKA_BROKER_LIST);
        propsConsumer.setProperty("group.id", "trafficwisdom-streaming");
        propsConsumer.put("enable.auto.commit", false);
        propsConsumer.put("max.poll.records", 1000);
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>("topic_test", new SimpleStringSchema(), propsConsumer);
        consumer.setStartFromLatest();
        DataStream<String> stream = env.addSource(consumer);

写入hdfs

  		BucketingSink<String> sink = new BucketingSink<String>("/data/twms/traffichuixing/test_topic");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd", ZoneId.of("Asia/Shanghai")));
//        sink.setWriter(new StringWriter());
        sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
        sink.setBatchRolloverInterval(60* 60 * 1000); // this is 60 mins
        sink.setPendingPrefix("");
        sink.setPendingSuffix("");
        sink.setInProgressPrefix(".");
        stream.addSink(sink);
        
        env.execute("SaveToHdfs");

BucketingSink默认是StringWriter所以不需要设置,
flink会根据上海时区,每天自动建立/data/twms/traffichuixing/test_topic/2919-03-06/文件夹,写入文件。
flink会先写入临时文件,再把临时文件变成正式文件。
触发是setBatchSize,setBatchRolloverInterval,满足其中一个条件就自动转变为正式文件。
我这里面设置临时文件以"."开头。具体参数可以看api设置试试
在这里插入图片描述


Flink入门(七) 写入HDFS https://www.cppentry.com/bencandy.php?fid=115&id=218937

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Hadoop-->HDFS原理总结 下一篇sqoop   把 hdfs 和关系型数..