版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lxhandlbb/article/details/83449399
新的API比较好用。设置一个Json的Format,设置一个schema。和spark的方式有点像。读取,就完事了。自动注册成相应schema的表。
更多内容参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#kafka-connector
package org.apache.flink.streaming.scala.examples.kafka
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._
object KafkaJsonConnector {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
tableEnv
.connect(
new Kafka()
.version("0.10")
.topic("SM_USER_PROFILE")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(
new Json()
.deriveSchema()
)
.withSchema(
new Schema()
.field("COD_USERNO","string")
.field("COD_USER_ID","string")
)
.inAppendMode()
.registerTableSource("sm_user")
val stream = tableEnv.scan("sm_user")
tableEnv.toAppendStream[Row](stream).print().setParallelism(1)
env.execute("example")
}
}