TOP

使用Flink新的Kafka Connector API读取Kafka Json格式数据
2018-12-29 14:25:32 】 浏览:933
Tags:使用 Flink Kafka Connector API 读取 Json 格式 数据

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/lxhandlbb/article/details/83449399

新的API比较好用。设置一个Json的Format,设置一个schema。和spark的方式有点像。读取,就完事了。自动注册成相应schema的表。
更多内容参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/connect.html#kafka-connector

package org.apache.flink.streaming.scala.examples.kafka

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.table.descriptors.{Json, Kafka, Rowtime, Schema}
import org.apache.flink.types.Row
import org.apache.flink.streaming.api.scala._

object KafkaJsonConnector {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // create a TableEnvironment for streaming queries
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    tableEnv
      .connect(
        new Kafka()
          .version("0.10")
          .topic("SM_USER_PROFILE")
          .startFromEarliest()
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092"))
      .withFormat(
        new Json()
          .deriveSchema()
      )
      .withSchema(
        new Schema()
          .field("COD_USERNO","string")
          .field("COD_USER_ID","string")
      )
      .inAppendMode()
      .registerTableSource("sm_user")

    val stream = tableEnv.scan("sm_user")
    tableEnv.toAppendStream[Row](stream).print().setParallelism(1)
    env.execute("example")
  }

}


使用Flink新的Kafka Connector API读取Kafka Json格式数据 https://www.cppentry.com/bencandy.php?fid=120&id=201476

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇kafka踩坑之消费者收不到消息 下一篇ActiveMQ、RabbitMQ和Kafka的简单..