前言
这周主要是学习使用Flink, 其中有一部分学习的内容就是生成parquet。 Flink自身提供的文档写了个大概,但是真要自己动手去生成pqrquet文件,发现还是有些小坑,本文就是记录这些坑。
开始
官方文档总是最好的开始的地方, 下面是官方文档上面的内容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
从官方文档上面看,似乎很简单, 使用FileSink, 然后设置下格式使用AvroParquetWriters就可以了。
但是按照这个设置后,连FileSink这个类都找不到。
FilkSink需要这个dependency,
AvroParquetWriters需要的是这个dependency
使用AVRO
官方文档中使用了AvroParquetWriters, 那我们就先定义一个AVRO的schema文件MarketPrice.avsc,然后生成对应的类,
{
"namespace": "com.ken.parquet",
"type": "record",
"name": "MarketPrice",
"fields": [
{"name":"performance_id", "type":"string"},
{"name":"price_as_of_date", "type":"int", "logicalType": "date"},
{"name":"open_price", "type": ["null", "double"], "default": null},
{"name":"high_price", "type": ["null", "double"], "default": null},
{"name":"low_price", "type": ["null", "double"], "default": null},
{"name":"close_price", "type": ["null", "double"], "default": null}
]
}
然后加上Maven插件, 通过这个文件来生成Java类
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
添加好后,我们使用maven, compile的时候会生成对应的Java类。
编写代码
我们这里不从外部读取了,直接用env.fromCollection, 然后输出到本地文件系统中
@Component
public class ParquetRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<MarketPrice> marketPriceList = new ArrayList<>();
MarketPrice marketPrice = new MarketPrice();
marketPrice.setPerformanceId("123456789");
marketPrice.setPriceAsOfDate(100);
marketPrice.setOpenPrice(100d);
marketPrice.setHighPrice(120d);
marketPrice.setLowPrice(99d);
marketPrice.setClosePrice(101.1d);
marketPriceList.add(marketPrice);
DataStream<MarketPrice> marketPriceDataStream = env.fromCollection(marketPriceList);
String localPath = "C:\\temp\\flink\\";
File outputParquetFile = new File(localPath);
String localURI = outputParquetFile.toURI().toString();
Path outputPath = new Path(localURI);
final FileS