设为首页 加入收藏

TOP

Flink 生成ParquetFile(一)
2023-07-25 21:32:58 】 浏览:58
Tags:Flink 生成 ParquetFile

前言

这周主要是学习使用Flink, 其中有一部分学习的内容就是生成parquet。 Flink自身提供的文档写了个大概,但是真要自己动手去生成pqrquet文件,发现还是有些小坑,本文就是记录这些坑。

开始

官方文档总是最好的开始的地方, 下面是官方文档上面的内容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
image
从官方文档上面看,似乎很简单, 使用FileSink, 然后设置下格式使用AvroParquetWriters就可以了。
但是按照这个设置后,连FileSink这个类都找不到。
FilkSink需要这个dependency,

org.apache.flink
flink-connector-files
${flink.version}

AvroParquetWriters需要的是这个dependency

org.apache.flink
flink-parquet
${flink.version}
provided

使用AVRO

官方文档中使用了AvroParquetWriters, 那我们就先定义一个AVRO的schema文件MarketPrice.avsc,然后生成对应的类,

{
  "namespace": "com.ken.parquet",
  "type": "record",
  "name": "MarketPrice",
  "fields": [
    {"name":"performance_id", "type":"string"},
    {"name":"price_as_of_date", "type":"int", "logicalType": "date"},
    {"name":"open_price", "type": ["null", "double"], "default": null},
    {"name":"high_price", "type": ["null", "double"], "default": null},
    {"name":"low_price", "type": ["null", "double"], "default": null},
    {"name":"close_price", "type": ["null", "double"], "default": null}
  ]
}

然后加上Maven插件, 通过这个文件来生成Java类

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

添加好后,我们使用maven, compile的时候会生成对应的Java类。

编写代码

我们这里不从外部读取了,直接用env.fromCollection, 然后输出到本地文件系统中


@Component
public class ParquetRunner implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<MarketPrice> marketPriceList = new ArrayList<>();
        MarketPrice marketPrice = new MarketPrice();
        marketPrice.setPerformanceId("123456789");
        marketPrice.setPriceAsOfDate(100);
        marketPrice.setOpenPrice(100d);
        marketPrice.setHighPrice(120d);
        marketPrice.setLowPrice(99d);
        marketPrice.setClosePrice(101.1d);
        marketPriceList.add(marketPrice);

        DataStream<MarketPrice>  marketPriceDataStream = env.fromCollection(marketPriceList);

        String localPath = "C:\\temp\\flink\\";

        File outputParquetFile = new File(localPath);

        String localURI = outputParquetFile.toURI().toString();
        Path outputPath = new Path(localURI);

        final FileS
首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇day01-Tomcat框架分析 下一篇【算法数据结构专题】「延时队列..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目