淘先锋技术网

首页 1 2 3 4 5 6 7

一、简介

      Flink 1.9以后新增一个SQL DDL 的新特性,本文介绍从kafka消费基本操作如下。

二、版本及创建表

插件版本: flink 1.10.1   kafka 0.9

DDL建表:

CREATE TABLE source_table (
id BIGINT,
name STRING,
score BIGINT
) WITH (
'connector.type' = 'kafka', 使用kafka connector
'connector.version' = 'universal', kafka版本,universal支持0.11以上的版本
'connector.topic' = 'flink-ddl-test', topic
'connector.properties.zookeeper.connect' = 'localhost:2181', zookeeper地址
'connector.properties.bootstrap.servers' = 'localhost:6667', broker service的地址
'format.type' = 'json' 数据格式
);

建表实例:

CREATE TABLE KAFKA_DATA (
    content VARCHAR COMMENT '具体信息',
    eventType VARCHAR COMMENT '数据类型',
    id VARCHAR COMMENT 'ID号',
    schemaName VARCHAR COMMENT '库名',
    tableName VARCHAR COMMENT '表名'
)
WITH (
    'connector.type' = 'kafka', -- 使用 kafka connector
    'connector.version' = '0.9',  -- kafka 版本
    'connector.topic' = 'bigfrog',  -- kafka topic
    'connector.startup-mode' = 'earliest-offset', -- 从最新的 offset 开始读取
    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
 'connector.properties.0.value' = 'node-3,node-4,node-5:2181/kafka02',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'node-3:9092,node-4:9092,node-5:9092',

'update-mode' = 'append',
 'format.type' = 'json',  -- 数据源格式为 json
 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)

上述基本语法是 create table () with ()
with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据
connector.version 描述了 使用的是哪个版本的 kafka,如果设置universal支持0.11以上的版本

connector.topic 描述了 从 哪个 topic 中读取数据
connector.startup-mode 描述了 从 哪个位置开始读取数据 分为 earliest-offset 、latest-offset等。

三、实例代码

     package com.china.qjd.DataSet.flinksql
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object FlinkKafkaDDLDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(3)
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)
    val createTable =
      """
        |CREATE TABLE KAFKA_DATA (
        |    content VARCHAR COMMENT '具体信息',
        |    eventType VARCHAR COMMENT '数据类型',
        |    id VARCHAR COMMENT 'ID号',
        |    schemaName VARCHAR COMMENT '库名',
        |    tableName VARCHAR COMMENT '表名'
        |)
        |WITH (
        |    'connector.type' = 'kafka', -- 使用 kafka connector
        |    'connector.version' = '0.9',  -- kafka 版本
        |    'connector.topic' = 'bigfrog',  -- kafka topic
        |    'connector.startup-mode' = 'earliest-offset', -- 从最新的 offset 开始读取
        |    'connector.properties.0.key' = 'zookeeper.connect',  -- 连接信息
   |    'connector.properties.0.value' = 'node-3,node-4,node-5:2181/kafka02',
        |    'connector.properties.1.key' = 'bootstrap.servers',
     |    'connector.properties.1.value' = 'node-3:9092,node-4:9092,node-5:9092',
        |    'update-mode' = 'append',
        |    'format.type' = 'json',  -- 数据源格式为 json
        |    'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
        |)
            """.stripMargin
    tEnv.sqlUpdate(createTable)
    val query =
      """
        |SELECT content FROM KAFKA_DATA
            """.stripMargin
    val result = tEnv.sqlQuery(query)
    result.toRetractStream[Row].print()
    tEnv.execute("Flink SQL DDL")
  }
}

以下是本地DDL结果

01c0d4aa03e6d641e4e0197b1d3d068a.png

集群测试

75ac8c788572812bf04cd7246035c5a1.png

这个问题一直困扰了我好几天网上一篇答案感觉都不是自己想要的那个答案,那只能自己好好分析一下。重新拷贝相关jar包到flink安装目录lib下

0fff7c5d6df27244fe7e02ce875d7561.png

IDEA打包

28d4b734cc09cb6880e46fd31b8fa752.png

或许这两个jar包也不需要,重新导入flink安装目录lib下的jar包后,重新再yarn上申请运行环境 

yarn-session.sh -d -n 2  -nm QJD_FLINK

flink  run -yid application_1603198127218_59530  --class com.china.qjd.DataSet.flinksql.FlinkKafkaDDLDemo /opt/learning_flink.jar

接下来flink WebUi查看运行结果

c77d5b54faba13215f0006b3cd1c133f.png

困扰几天的问题终于迎刃而解了,新版本的升级后API确实有很大的改动,虽然历程很心酸,但是还是很开心0417f246000b389fe0f69b5e85348a2c.png,时间仓促如有错误恳请大家原谅c95b95cc39f51314587b587e6b07d41e.png