1. Why build a custom connector
A common requirement looks like this: after aggregating metrics with Flink SQL, we want to write the results to a specific piece of middleware, for example the OpenTSDB time-series database, only to find that Flink does not ship an official OpenTSDB connector. In that case we have to define the connector ourselves.
2. Steps to define a custom connector
- Create a TableFactory; the stream-specific variants are StreamTableSourceFactory and StreamTableSinkFactory
- Create a TableSink / TableSource; for example AppendStreamTableSink and StreamTableSource
- Define a custom validator extending ConnectorDescriptorValidator
- Create a subclass of RichSinkFunction
- Create META-INF/services under the resources directory and add a file named org.apache.flink.table.factories.TableFactory
3. What each step does
The steps below are explained from the sink side; the source side is analogous.
3.1 StreamTableSinkFactory
We create our own factory that implements StreamTableSinkFactory. The methods to focus on are createStreamTableSink(Map), requiredContext() and supportedProperties() (a minimal sketch follows after this list):
- supportedProperties(): declares the configuration keys the connector supports
- createStreamTableSink(Map): creates the StreamTableSink
- requiredContext(): uniquely identifies the connector type, i.e. the value of connector.type
3.2 AppendStreamTableSink
This is an append-only stream sink; there are also UpsertStreamTableSink and RetractStreamTableSink, so pick whichever matches your needs (their differences are not covered here).
consumeDataStream(): the method we care about most. It consumes the rows of the DataStream and, via addSink, hands them to our RichSinkFunction; the full implementation is shown in section 4.2.2.
3.3 RichSinkFunction
This is the sink function we implement ourselves; three methods matter (a hedged sketch follows after this list):
- invoke(Row, Context): the key method, where we receive each row and process it
- open(): usually used for initialization, e.g. creating clients such as an HttpClient or a Kafka client
- close(): called on shutdown, usually used for cleanup such as closing those clients
3.4 ConnectorDescriptorValidator
The validator receives all of the connector properties before the sink is built, so this is the place to check them. The implementation in section 4.2.4 simply delegates to the built-in checks; a sketch of adding stricter checks follows.
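This is only a sketch, under the assumption that the standard DescriptorProperties validation helpers are used, of how per-property checks could be added; the class name is hypothetical and the keys mirror the constants defined in section 4.2.4:
import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;

public class CustomizedConnectorValidatorSketch extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_JOB = "connector.job";

    @Override
    public void validate(DescriptorProperties properties) {
        // built-in checks for connector.type and connector.property-version
        super.validate(properties);
        // fail fast if required keys are missing or empty
        properties.validateString(CONNECTOR_ADDRESS, false, 1);
        properties.validateString(CONNECTOR_JOB, false, 1);
    }
}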
4. Hands-on code
4.1 Generating data
First, push some test data into Kafka.
Create a JavaBean:
public class KafkaTestBean {
    private Double bandWidth;
    private Long app_time;
    private Double packet;
    private String networkLineId;

    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getPacket() {
        return packet;
    }

    public void setPacket(Double packet) {
        this.packet = packet;
    }

    public KafkaTestBean() {
    }

    public Double getBandWidth() {
        return bandWidth;
    }

    public void setBandWidth(Double bandWidth) {
        this.bandWidth = bandWidth;
    }

    public Long getApp_time() {
        return app_time;
    }

    public void setApp_time(Long app_time) {
        this.app_time = app_time;
    }

    public String getNetworkLineId() {
        return networkLineId;
    }

    public void setNetworkLineId(String networkLineId) {
        this.networkLineId = networkLineId;
    }
}
Kafka message generator:
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.UUID;

public class KafkaMessageGenerator {
    public static void main(String[] args) throws Exception {
        // producer configuration
        Properties props = new Properties();
        // Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        // serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // create the producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        while (true) {
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String, String> record = new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }
    }

    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) ((Math.random()) * 10));
        String s2 = String.valueOf((int) ((Math.random()) * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}
4.2 Implementing the feature
The goal: consume data from Kafka, run an aggregation over it, and write the results to OpenTSDB.
4.2.1 Create the TableFactory
package com.cxc.flink.extend;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {
    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        // validate the connector properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(
                descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema, format);
    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        // the source side is not implemented in this example
        return null;
    }

    @Override
    public Map<String, String> requiredContext() {
        // the connector type; Flink discovers exactly one connector via this context
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * Configuration keys supported by this custom connector.
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);
        supportProperties.add(CONNECTOR_ADDRESS);
        // schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }
}
4.2.2 Create the StreamTableSink
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {
    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // legacy method; consumeDataStream below does the actual work
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new CustomizedSinkFunction(this.address)).setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
4.2.3 Create the RichSinkFunction
package com.cxc.flink.extend;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {
    private String address;

    public CustomizedSinkFunction(String address) {
        this.address = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        // just print the row for now
        System.out.println("send to " + address + "---" + value);
    }
}
4.2.4 Create the ConnectorDescriptorValidator
package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void validate(DescriptorProperties properties) {
        // validate the connector properties here
        logger.info("validating connector properties");
        super.validate(properties);
        logger.info("connector properties validated");
    }
}
4.2.5 Create META-INF/services
TableFactory implementations are discovered through Java's SPI mechanism; you can look at the discoverFactories() method of TableFactoryService to see how this works.
Under the resources directory, create META-INF/services/org.apache.flink.table.factories.TableFactory with the following content:
com.cxc.flink.extend.CustomizedTableSourceSinkFactory
Without this file the ServiceLoader cannot find the factory, and using the connector will fail with an error. A quick verification snippet follows.
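As an optional check that the registration works (this snippet is not part of the original post), you can ask TableFactoryService to discover the factory directly, passing the same properties the DDL in section 4.2.6 uses; this assumes the SPI file and the factory are on the classpath:
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableFactoryService;
import java.util.HashMap;
import java.util.Map;

public class FactoryDiscoveryCheck {
    public static void main(String[] args) {
        // the same keys the WITH clause in section 4.2.6 provides
        Map<String, String> props = new HashMap<>();
        props.put("connector.type", "customize");
        props.put("connector.address", "192.168.245.138:8081");
        props.put("connector.job", "testextendjob");
        props.put("connector.metrics", "testmetric");
        props.put("format.type", "json");

        // fails with a "no matching factory" error if the SPI file is missing or wrong
        TableFactory factory = TableFactoryService.find(StreamTableSinkFactory.class, props);
        System.out.println("discovered factory: " + factory.getClass().getName());
    }
}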
4.2.6 SQL code
Write a Flink SQL test program that runs the aggregation and sends the result to our custom sink:
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // DDL for the input table, backed by the Kafka connector
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(INPUT_SQL.toString());

        // DDL for the output table, backed by our custom connector
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");
        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);
        env.execute("window sql job");
    }
}
5. Extending the functionality
The code above prints each row to the console, but the output is not standard JSON, and in many scenarios we do need JSON. How can we produce it?
Browsing the Flink Kafka connector turns up the SerializationSchema interface, which makes this easy: its implementation JsonRowSerializationSchema serializes rows to JSON internally using Jackson, so we can use it directly.
5.1 Modify the RichSinkFunction
Serialize the row inside the invoke method:
package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {
    private String address;
    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType, String address, String formatType) {
        this.address = address;
        if (formatType.equals("json")) {
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        } else {
            throw new RuntimeException("the custom connector currently only supports the json format");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        // serialize the row to JSON and print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize);
        System.out.println("send to " + address + "---" + jsonValue);
    }
}
5.2 Modify the StreamTableSink
Pass the table's row type into the sink function when it is created, so the serializer can build the JSON nodes from the field names and types:
package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {
    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema, String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType, "formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // legacy method; consumeDataStream below does the actual work
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new CustomizedSinkFunction(this.schema.toRowType(), this.address, formatType)).setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}