
1. Why define a custom connector

A common requirement: after aggregating metrics with Flink SQL, we want to write the results to some middleware, for example the OpenTSDB time-series database, only to find that Flink does not ship an official OpenTSDB connector. In that case we have to define the connector ourselves.

2. Steps to define a custom connector

  1. Create a TableFactory; the relevant sub-interfaces are StreamTableSourceFactory and StreamTableSinkFactory
  2. Create a TableSource/TableSink; the relevant sub-interfaces are StreamTableSource and AppendStreamTableSink
  3. Define a custom validator by extending ConnectorDescriptorValidator
  4. Create a subclass of RichSinkFunction
  5. Under the resources directory, create META-INF/services and add a file named org.apache.flink.table.factories.TableFactory

3. Explanation of each step

Here we walk through the steps from the sink side; the source side is analogous.

3.1 StreamTableSinkFactory

We create our own factory that implements StreamTableSinkFactory. The methods to focus on are:

createStreamTableSink(Map): creates the StreamTableSink

requiredContext(): uniquely identifies this connector type, i.e. connector.type

supportedProperties(): declares the configuration keys this connector supports

3.2 AppendStreamTableSink

This is an append-only sink; there are also UpsertStreamTableSink and RetractStreamTableSink (those two can additionally handle updating results), so pick whichever fits your needs. Their differences are not covered here.

consumeDataStream(): the method we care about most. It consumes the data stream and, via addSink, attaches the RichSinkFunction that actually writes the data out.

3.3 RichSinkFunction

Our own sink implementation; three methods matter:

invoke(Row, Context): the key method; this is where each record arrives and gets processed

open: usually performs initialization, e.g. creating clients such as an HTTP client or a Kafka client

close: called on shutdown, usually to release resources, e.g. closing those clients; a minimal lifecycle sketch follows
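
A minimal sketch of this lifecycle (assuming Apache HttpClient is on the classpath; the class name and endpoint field are purely illustrative and not part of the article's code):

package com.cxc.flink.extend;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class HttpReportingSinkSketch extends RichSinkFunction<Row> {

    // hypothetical target address, for illustration only
    private final String endpoint;
    private transient CloseableHttpClient httpClient;

    public HttpReportingSinkSketch(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // heavyweight resources are created once per parallel instance
        this.httpClient = HttpClients.createDefault();
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        // per-record logic goes here; a real sink would build and send a request
        System.out.println("would send " + value + " to " + endpoint);
    }

    @Override
    public void close() throws Exception {
        // release resources on shutdown
        if (httpClient != null) {
            httpClient.close();
        }
    }
}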

3.4 ConnectorDescriptorValidator

The validator checks the connector properties (connector.type plus our custom keys) before the sink is created; the factory calls its validate(DescriptorProperties) method, as shown in section 4.2.4.

4. Hands-on code

4.1 Generating data

First, push some test data into Kafka:

Create a JavaBean:

public class KafkaTestBean {

    private Double bandWidth;

    private Long app_time;

    private Double packet;

    private String networkLineId;


    public KafkaTestBean(Double bandWidth, Long app_time, Double packet, String networkLineId) {
        this.bandWidth = bandWidth;
        this.app_time = app_time;
        this.packet = packet;
        this.networkLineId = networkLineId;
    }

    public Double getPacket() {
        return packet;
    }

    public void setPacket(Double packet) {
        this.packet = packet;
    }

    public KafkaTestBean() {
    }

    public Double getBandWidth() {
        return bandWidth;
    }

    public void setBandWidth(Double bandWidth) {
        this.bandWidth = bandWidth;
    }

    public Long getApp_time() {
        return app_time;
    }

    public void setApp_time(Long app_time) {
        this.app_time = app_time;
    }

    public String getNetworkLineId() {
        return networkLineId;
    }

    public void setNetworkLineId(String networkLineId) {
        this.networkLineId = networkLineId;
    }
}

Kafka message generator:

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.UUID;

public class KafkaMessageGenerator {

    public static void main(String[] args) throws Exception{
        //configuration
        Properties props = new Properties();
        //Kafka broker address
        props.put("bootstrap.servers", "192.168.245.11:9092");
        //serializers for the record key and value
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //create the producer instance
        KafkaProducer<String,String> producer = new KafkaProducer<>(props);

        while (true){
            KafkaTestBean bean = new KafkaTestBean();
            bean.setNetworkLineId(UUID.randomUUID().toString());
            bean.setBandWidth(generateBandWidth());
            bean.setApp_time(System.currentTimeMillis() / 1000);
            bean.setPacket(generateBandWidth());
            ProducerRecord<String,String> record = new ProducerRecord<>("firsttopic", JSONObject.toJSONString(bean));
            producer.send(record);
            Thread.sleep(1000);
        }

    }
    private static Double generateBandWidth() {
        String s1 = String.valueOf((int) ((Math.random()) * 10));
        String s2 = String.valueOf((int) ((Math.random()) * 10));
        return Double.parseDouble(s1.concat(".").concat(s2));
    }
}
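
For reference, each record sent above is the fastjson serialization of KafkaTestBean, i.e. a JSON object with the fields app_time, bandWidth, networkLineId and packet (for example {"app_time":1650000000,"bandWidth":3.7,"networkLineId":"...","packet":5.2}, values arbitrary). These field names are exactly the column names used in the input table DDL in section 4.2.6.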

4.2 Implementing the feature

The goal: consume data from Kafka, run an aggregation, and write the result to OpenTSDB.

4.2.1 Create the TableFactory

package com.cxc.flink.extend;

import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.FormatDescriptorValidator;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.cxc.flink.extend.CustomizedConnectorDescriptorValidator.*;
import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;

/**
 * create by chenxichao
 */
public class CustomizedTableSourceSinkFactory implements StreamTableSinkFactory<Row>, StreamTableSourceFactory<Row> {



    private CustomizedConnectorDescriptorValidator customizedConnectorDescriptorValidator;

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        customizedConnectorDescriptorValidator = new CustomizedConnectorDescriptorValidator();
        final DescriptorProperties descriptorProperties = new DescriptorProperties(true);
        descriptorProperties.putProperties(properties);
        //validate the properties
        customizedConnectorDescriptorValidator.validate(descriptorProperties);
        final TableSchema schema = TableSchemaUtils.getPhysicalSchema(
                descriptorProperties.getTableSchema(Schema.SCHEMA));
        String job = descriptorProperties.getString(CONNECTOR_JOB);
        String metrics = descriptorProperties.getString(CONNECTOR_METRICS);
        String address = descriptorProperties.getString(CONNECTOR_ADDRESS);
        String format = descriptorProperties.getString(FORMAT_TYPE);
        return new CustomizedTableSink(job, metrics, address, schema,format);

    }

    @Override
    public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
        return null;
    }

    @Override
    public Map<String, String> requiredContext() {
        /**
         * The connector type: with this context Flink can discover one and only one matching connector
         */
        Map<String,String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE,CONNECTOR_TYPE_VALUE_CUSTOMIZE);
        return context;
    }

    /**
     * Configuration keys supported by this custom connector
     * @return
     */
    @Override
    public List<String> supportedProperties() {
        List<String> supportProperties = new ArrayList<>();
        supportProperties.add(CONNECTOR_JOB);
        supportProperties.add(CONNECTOR_METRICS);

        supportProperties.add(CONNECTOR_ADDRESS);
        //schema
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_DATA_TYPE);
        supportProperties.add(Schema.SCHEMA + ".#." + Schema.SCHEMA_NAME);
        supportProperties.add(FormatDescriptorValidator.FORMAT_TYPE);
        return supportProperties;
    }

}

4.2.2 Create the StreamTableSink

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema,String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType,"formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // deprecated predecessor of consumeDataStream; intentionally left empty
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new CustomizedSinkFunction(this.address)).setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(),getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}

4.2.3 Create the RichSinkFunction

package com.cxc.flink.extend;

import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    public CustomizedSinkFunction(String address){
        this.address  = address;
    }

    @Override
    public void invoke(Row value, Context context) {
        //just print for now
        System.out.println("send to " + address + "---" + value);
    }
}

4.2.4 Create the ConnectorDescriptorValidator

package com.cxc.flink.extend;

import org.apache.flink.table.descriptors.ConnectorDescriptorValidator;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * create by chenxichao
 */
public class CustomizedConnectorDescriptorValidator extends ConnectorDescriptorValidator {
    public static final String CONNECTOR_JOB = "connector.job";
    public static final String CONNECTOR_METRICS = "connector.metrics";
    public static final String CONNECTOR_ADDRESS = "connector.address";
    public static final String CONNECTOR_TYPE_VALUE_CUSTOMIZE = "customize";
    public static final String FORMAT_TYPE = "format.type";
  
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void validate(DescriptorProperties properties) {
        /**
         * Validate the connector properties here
         */
        logger.info("validating connector properties");
        super.validate(properties);
        logger.info("connector properties validated");
    }
}
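
The validator above only delegates to super.validate(properties). To fail fast when one of the custom keys is missing, validate() could also check them explicitly; a minimal sketch of a stricter version that would drop into the class above (same constants, using DescriptorProperties' built-in string checks):

    @Override
    public void validate(DescriptorProperties properties) {
        super.validate(properties);
        // require each custom key to be present and non-empty
        properties.validateString(CONNECTOR_JOB, false, 1);
        properties.validateString(CONNECTOR_METRICS, false, 1);
        properties.validateString(CONNECTOR_ADDRESS, false, 1);
        properties.validateString(FORMAT_TYPE, false, 1);
    }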

4.2.5 Create META-INF/services

TableFactory implementations are discovered via Java's SPI mechanism; see the source of TableFactoryService#discoverFactories() for the details.

Create META-INF/services/org.apache.flink.table.factories.TableFactory under the resources directory

with the content:

com.cxc.flink.extend.CustomizedTableSourceSinkFactory

If this file is missing, the ServiceLoader cannot find the factory, and using the connector will fail with an error.
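
To see the discovery mechanism in isolation, here is a small sketch of what the SPI lookup boils down to (plain java.util.ServiceLoader, not Flink's actual lookup code; the class name is illustrative):

package com.cxc.flink.extend;

import org.apache.flink.table.factories.TableFactory;

import java.util.ServiceLoader;

public class SpiDiscoveryDemo {
    public static void main(String[] args) {
        // ServiceLoader reads META-INF/services/org.apache.flink.table.factories.TableFactory
        // and instantiates every class listed in it
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            System.out.println("discovered factory: " + factory.getClass().getName());
        }
    }
}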

4.2.6 SQL code

Write a Flink SQL test program that runs the aggregation and sends the result to our custom sink:

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * create by chenxichao
 */
public class SQLExtendTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,settings);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        //DDL for the input table
        StringBuilder INPUT_SQL = new StringBuilder();
        INPUT_SQL.append("CREATE TABLE bandWidthInputTable (")
                .append("networkLineId VARCHAR,bandWidth DOUBLE,app_time BIGINT,packet DOUBLE,")
                .append("ts as TO_TIMESTAMP(FROM_UNIXTIME(app_time,'yyyy-MM-dd HH:mm:ss')),")
                .append("WATERMARK FOR ts AS ts - INTERVAL '5' second) ")
                .append("WITH(")
                .append("'connector.type' = 'kafka',")
                .append("'connector.version' = 'universal',")
                .append("'connector.topic' = 'firsttopic',")
                .append("'connector.properties.group.id' = 'start_log_group',")
                .append("'connector.properties.zookeeper.connect' = '192.168.245.11:2181',")
                .append("'connector.properties.bootstrap.servers' = '192.168.245.11:9092',")
                .append("'format.type' = 'json'")
                .append(")");

        tableEnv.sqlUpdate(INPUT_SQL.toString());

        //DDL for the output table
        StringBuilder OUT_TABLE_SQL = new StringBuilder();
        OUT_TABLE_SQL.append("CREATE TABLE bandWidthOutputTable (")
                .append("metricset VARCHAR,`value` DOUBLE,`timestamp` BIGINT,networkLineId VARCHAR)")
                .append("WITH(")
                .append("'connector.type' = 'customize',")
                .append("'connector.address' = '192.168.245.138:8081',")
                .append("'connector.job' = 'testextendjob',")
                .append("'connector.metrics' = 'testmetric',")
                .append("'format.type' = 'json'")
                .append(")");

        tableEnv.sqlUpdate(OUT_TABLE_SQL.toString());

        String sql = "INSERT INTO bandWidthOutputTable SELECT 'nmct_line_metric_bandwidth' as metricset,sum(bandWidth),avg(app_time),networkLineId FROM bandWidthInputTable GROUP BY tumble(ts,interval '5' second),networkLineId";
        tableEnv.sqlUpdate(sql);

        env.execute("window sql job");
    }
}
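
To test end to end, start KafkaMessageGenerator first, then run this job: for every 5-second tumbling window the query emits one row per networkLineId to the custom sink, which prints it to the console.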

5. Extending the feature

With the code above we have a small working feature: the rows are printed to the console. However, the printed data is not standard JSON, and in many scenarios we need the data to be JSON. How can we achieve that?

Browsing the Flink Kafka connector source turns up the SerializationSchema interface, which makes this easy: its implementation JsonRowSerializationSchema uses Jackson internally for JSON serialization, so we can use it directly.

5.1 Modify the RichSinkFunction

Serialize the data inside the invoke method:

package com.cxc.flink.extend;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

/**
 * create by chenxichao
 */
public class CustomizedSinkFunction extends RichSinkFunction<Row> {

    private String address;

    private SerializationSchema<Row> serializationSchema;

    public CustomizedSinkFunction(TypeInformation<Row> rowType,String address,String formatType){
        this.address  = address;
        if(formatType.equals("json")){
            this.serializationSchema = new JsonRowSerializationSchema.Builder(rowType).build();
        }else{
            throw new RuntimeException("current custom format only support json serializer");
        }
    }

    @Override
    public void invoke(Row value, Context context) {
        //serialize the row to JSON and print it
        byte[] serialize = this.serializationSchema.serialize(value);
        String jsonValue = new String(serialize);
        System.out.println("send to " + address + "---" + jsonValue);
    }

}

5.2 Modify the StreamTableSink

Pass the table schema in when creating the SinkFunction, so that the serializer can build the JSON nodes:

package com.cxc.flink.extend;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.TableConnectorUtils;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.util.Arrays;

/**
 * create by chenxichao
 */
public class CustomizedTableSink implements AppendStreamTableSink<Row> {

    private final String job;
    private final String metrics;
    private final String address;
    private final TableSchema schema;
    private final String formatType;

    public CustomizedTableSink(String job, String metrics, String address, TableSchema schema,String formatType) {
        this.job = Preconditions.checkNotNull(job, "job must not be null.");
        this.metrics = Preconditions.checkNotNull(metrics, "metrics must not be null.");
        this.address = Preconditions.checkNotNull(address, "address must not be null.");
        this.schema = TableSchemaUtils.checkNoGeneratedColumns(schema);
        this.formatType = Preconditions.checkNotNull(formatType,"formatType must not be null");
    }

    @Override
    public void emitDataStream(DataStream<Row> dataStream) {
        // deprecated predecessor of consumeDataStream; intentionally left empty
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
        return dataStream.addSink(new CustomizedSinkFunction(this.schema.toRowType(),this.address,formatType)).setParallelism(dataStream.getParallelism())
                .name(TableConnectorUtils.generateRuntimeName(this.getClass(),getFieldNames()));
    }

    @Override
    public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
            throw new ValidationException("Reconfiguration with different fields is not allowed. " +
                    "Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
                    "But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
        }
        return this;
    }

    @Override
    public TypeInformation<Row> getOutputType() {
        return schema.toRowType();
    }

    @Override
    public String[] getFieldNames() {
        return schema.getFieldNames();
    }

    @Override
    public TypeInformation<?>[] getFieldTypes() {
        return schema.getFieldTypes();
    }
}
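
The sink still only prints the JSON. To actually deliver the data to OpenTSDB, as motivated in section 1, invoke() would need to POST data points to OpenTSDB's HTTP /api/put endpoint. A minimal sketch of such a writer (assuming Apache HttpClient on the classpath; the class is illustrative, and mapping the row fields into OpenTSDB's {metric, timestamp, value, tags} body is left to the caller):

package com.cxc.flink.extend;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

public class OpenTsdbHttpWriter implements AutoCloseable {

    private final String putUrl;
    private final CloseableHttpClient httpClient = HttpClients.createDefault();

    public OpenTsdbHttpWriter(String address) {
        // OpenTSDB's standard HTTP write endpoint
        this.putUrl = "http://" + address + "/api/put";
    }

    /** Sends one data point; the caller supplies the OpenTSDB-formatted JSON body. */
    public void write(String dataPointJson) throws Exception {
        HttpPost post = new HttpPost(putUrl);
        post.setEntity(new StringEntity(dataPointJson, ContentType.APPLICATION_JSON));
        httpClient.execute(post).close();
    }

    @Override
    public void close() throws Exception {
        httpClient.close();
    }
}

A real CustomizedSinkFunction would create this writer in open(), call write() from invoke(), and close it in close(); batching and error handling are omitted here.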