一、简介
Flink 1.9以后新增一个SQL DDL 的新特性,本文介绍从kafka消费基本操作如下。
二、版本及创建表
插件版本: flink 1.10.1 kafka 0.9
DDL建表:
CREATE TABLE source_table (
id BIGINT,
name STRING,
score BIGINT
) WITH (
'connector.type' = 'kafka', 使用kafka connector
'connector.version' = 'universal', kafka版本,universal支持0.11以上的版本
'connector.topic' = 'flink-ddl-test', topic
'connector.properties.zookeeper.connect' = 'localhost:2181', zookeeper地址
'connector.properties.bootstrap.servers' = 'localhost:6667', broker service的地址
'format.type' = 'json' 数据格式
);
建表实例:
CREATE TABLE KAFKA_DATA (
content VARCHAR COMMENT '具体信息',
eventType VARCHAR COMMENT '数据类型',
id VARCHAR COMMENT 'ID号',
schemaName VARCHAR COMMENT '库名',
tableName VARCHAR COMMENT '表名'
)
WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = '0.9', -- kafka 版本
'connector.topic' = 'bigfrog', -- kafka topic
'connector.startup-mode' = 'earliest-offset', -- 从最新的 offset 开始读取
'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息
'connector.properties.0.value' = 'node-3,node-4,node-5:2181/kafka02',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'node-3:9092,node-4:9092,node-5:9092',
'update-mode' = 'append',
'format.type' = 'json', -- 数据源格式为 json
'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
)
上述基本语法是 create table () with ()
with 后面是一些基本的属性,比如 connector.type 描述了 从 kafka 中读取数据
connector.version 描述了 使用的是哪个版本的 kafka,如果设置universal支持0.11以上的版本
connector.topic 描述了 从 哪个 topic 中读取数据
connector.startup-mode 描述了 从 哪个位置开始读取数据 分为 earliest-offset 、latest-offset等。
三、实例代码
package com.china.qjd.DataSet.flinksql
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
object FlinkKafkaDDLDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
val tEnv = StreamTableEnvironment.create(env, settings)
val createTable =
"""
|CREATE TABLE KAFKA_DATA (
| content VARCHAR COMMENT '具体信息',
| eventType VARCHAR COMMENT '数据类型',
| id VARCHAR COMMENT 'ID号',
| schemaName VARCHAR COMMENT '库名',
| tableName VARCHAR COMMENT '表名'
|)
|WITH (
| 'connector.type' = 'kafka', -- 使用 kafka connector
| 'connector.version' = '0.9', -- kafka 版本
| 'connector.topic' = 'bigfrog', -- kafka topic
| 'connector.startup-mode' = 'earliest-offset', -- 从最新的 offset 开始读取
| 'connector.properties.0.key' = 'zookeeper.connect', -- 连接信息
| 'connector.properties.0.value' = 'node-3,node-4,node-5:2181/kafka02',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'node-3:9092,node-4:9092,node-5:9092',
| 'update-mode' = 'append',
| 'format.type' = 'json', -- 数据源格式为 json
| 'format.derive-schema' = 'true' -- 从 DDL schema 确定 json 解析规则
|)
""".stripMargin
tEnv.sqlUpdate(createTable)
val query =
"""
|SELECT content FROM KAFKA_DATA
""".stripMargin
val result = tEnv.sqlQuery(query)
result.toRetractStream[Row].print()
tEnv.execute("Flink SQL DDL")
}
}
以下是本地DDL结果
集群测试
这个问题一直困扰了我好几天网上一篇答案感觉都不是自己想要的那个答案,那只能自己好好分析一下。重新拷贝相关jar包到flink安装目录lib下
IDEA打包
或许这两个jar包也不需要,重新导入flink安装目录lib下的jar包后,重新再yarn上申请运行环境
yarn-session.sh -d -n 2 -nm QJD_FLINK
flink run -yid application_1603198127218_59530 --class com.china.qjd.DataSet.flinksql.FlinkKafkaDDLDemo /opt/learning_flink.jar
接下来flink WebUi查看运行结果
困扰几天的问题终于迎刃而解了,新版本的升级后API确实有很大的改动,虽然历程很心酸,但是还是很开心,时间仓促如有错误恳请大家原谅