淘先锋技术网

首页 1 2 3 4 5 6 7

flink自定义source与自定义sink

Flink 的 source 和 sink 即数据源和数据接收器。原生的 Flink 提供了一些常用的数据源连接器,但在日常开发中我们所使用的数据源和持久化工具多种多样,Flink 自带的 source 和 sink 往往不能满足我们的需求,这时就需要使用 Flink 提供的接口自定义 source 和 sink。

flink 提供的source 和 sink
flink 支持的source 和 sink
自定义MongoDB source

以鸢尾花数据集作为 MongoDB 中的数据源
鸢尾花数据集
定义一个Iris数据类做数据转换

import com.alibaba.fastjson.JSON
 
/**
  * One record of the iris data set.
  *
  * @param category    label of the record (the companion factory fills it from the JSON field "class")
  * @param sepalLength sepal length measurement
  * @param sepalWidth  sepal width measurement
  * @param petalLength petal length measurement
  * @param petalWidth  petal width measurement
  */
case class Iris(category: String,
                sepalLength: Double,
                sepalWidth: Double,
                petalLength: Double,
                petalWidth: Double)

/** Companion of [[Iris]] that adds a factory building a record from a JSON object string. */
object Iris {

  /**
    * Parses a JSON object string into an [[Iris]].
    *
    * The string field "class" becomes `category`; the numeric fields
    * "sepalLength", "sepalWidth", "petalLength" and "petalWidth" fill the
    * measurements of the same name.
    *
    * NOTE(review): no field is checked for presence here; a missing numeric
    * field presumably fails while the boxed value is unboxed - confirm.
    *
    * @param jsonString JSON text describing a single object
    * @return the record assembled from the five fields above
    */
  def apply(jsonString: String): Iris = {
    val fields = JSON.parseObject(jsonString)

    // Read the fields in declaration order, one named value per constructor argument.
    val category = fields.getString("class")
    val sepalLength: Double = fields.getDouble("sepalLength")
    val sepalWidth: Double = fields.getDouble("sepalWidth")
    val petalLength: Double = fields.getDouble("petalLength")
    val petalWidth: Double = fields.getDouble("petalWidth")

    new Iris(category, sepalLength, sepalWidth, petalLength, petalWidth)
  }

}

自定义 Mongodb source
mongodb-driver 的pom依赖

        <dependency>
            <groupId>org.mongodb.scala</groupId>
            <artifactId>mongo-scala-driver_2.11</artifactId>
            <version>2.5.0</version>
        </dependency>
import bean.Iris
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.mongodb.scala.{Document, MongoClient, MongoCollection, MongoDatabase}
import Helpers._
// https://github.com/mongodb/mongo-scala-driver/blob/master/examples/src/test/scala/tour/Helpers.scala
 
/**
  * Flink source that reads every document of a MongoDB collection, converts
  * each one to an [[Iris]] and emits the whole result as a single `Seq[Iris]`
  * element.
  *
  * The defaults reproduce the values that used to be hard-coded, so
  * `new MongodbSource()` behaves as before.
  *
  * NOTE(review): the default URL embeds a user name and password; pass a URL
  * taken from configuration for anything other than a local demo.
  *
  * @param mongoUrl       MongoDB connection string
  * @param databaseName   database that holds the collection
  * @param collectionName collection to read
  */
class MongodbSource(
    mongoUrl: String = "mongodb://root:123456@localhost:27017/admin",
    databaseName: String = "datatest",
    collectionName: String = "iris")
  extends RichSourceFunction[Seq[Iris]] {

  // Both handles are assigned in open(); @transient keeps them out of the
  // serialized form of this function object.
  @transient var mongoClient: MongoClient = null
  @transient var collection: MongoCollection[Document] = null

  /** Nothing to interrupt: run() emits one element and returns without looping. */
  override def cancel(): Unit = {}

  /**
    * Creates the client and resolves the collection once, so that run() does
    * not have to set up a connection itself.
    */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    mongoClient = MongoClient(mongoUrl)
    val database: MongoDatabase = mongoClient.getDatabase(databaseName)
    collection = database.getCollection(collectionName)
  }

  /** Closes the client if open() managed to create it. */
  override def close(): Unit = {
    super.close()
    Option(mongoClient).foreach(_.close())
  }

  /**
    * Queries the whole collection, maps each document through its JSON form to
    * an [[Iris]] and collects the resulting sequence as one element.
    *
    * `results()` comes from the imported `Helpers`; presumably it waits for
    * the query to finish before returning the documents - confirm against the
    * linked Helpers source.
    *
    * @param ctx context used to emit the sequence
    */
  override def run(ctx: SourceFunction.SourceContext[Seq[Iris]]): Unit = {
    val irises: Seq[Iris] = collection.find().results()
      .map(document => Iris(document.toJson()))

    ctx.collect(irises)
  }

}

自定义 MySQL sink


import java.sql.{Connection, DriverManager, PreparedStatement}
import bean.Iris
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 
/**
  * Flink sink that inserts each [[Iris]] into the MySQL table `tb_iris`.
  *
  * One connection and one prepared statement are created in open() and reused
  * by every invoke() call.
  *
  * NOTE(review): the JDBC URL, user name and password are hard-coded in
  * open(); move them to configuration outside of a local demo.
  */
class MysqlSink extends RichSinkFunction[Iris] {

  // Assigned in open(); @transient keeps the JDBC handles out of the
  // serialized form of this function object.
  @transient private var connection: Connection = null
  @transient private var ps: PreparedStatement = null

  /**
    * Loads the JDBC driver, opens the connection and prepares the insert
    * statement once, so invoke() does not reconnect for every element.
    */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/bike?characterEncoding=utf-8&useSSL=true"
    val username = "root"
    val password = "123456"
    // 1. load the driver
    Class.forName(driver)
    // 2. open the connection
    connection = DriverManager.getConnection(url, username, password)

    val sql = "insert into tb_iris(category,sepalLength,sepalWidth,petalLength,petalWidth,insertTime) values(?,?,?,?,?,?);"

    // 3. prepare the statement reused by invoke()
    ps = connection.prepareStatement(sql)
  }

  /**
    * Inserts one record; called once per element.
    *
    * The sixth column, insertTime, receives the current time in epoch
    * milliseconds. Failures are reported on stdout and the element is dropped
    * (best effort, as before). Only non-fatal errors are handled, so
    * interruption and JVM errors now propagate instead of being swallowed.
    *
    * @param value record to insert
    */
  override def invoke(value: Iris): Unit = {
    import scala.util.control.NonFatal

    try {
      ps.setString(1, value.category)
      ps.setDouble(2, value.sepalLength)
      ps.setDouble(3, value.sepalWidth)
      ps.setDouble(4, value.petalLength)
      ps.setDouble(5, value.petalWidth)
      ps.setLong(6, System.currentTimeMillis())
      ps.executeUpdate()
    } catch {
      case NonFatal(e) => println(e.getMessage)
    }
  }

  /**
    * Releases the JDBC resources: the statement first, then the connection
    * that owns it. The connection is closed even when closing the statement
    * throws.
    */
  override def close(): Unit = {
    super.close()
    try {
      if (ps != null) {
        ps.close()
      }
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }
}

运行主程序

  /** Entry point wiring the MongoDB source to the MySQL sink. */
  object mongoSourceTest {

    /**
      * Builds and runs the job: read the iris batch from MongoDB, flatten it
      * into individual records and write each record to MySQL.
      *
      * @param args command-line arguments (not used)
      */
    def main(args: Array[String]): Unit = {
      val environment = StreamExecutionEnvironment.getExecutionEnvironment

      // The source emits Seq[Iris] elements; flatten them into single Iris records.
      val irisBatches = environment.addSource(new MongodbSource())
      val irisRecords = irisBatches.flatMap(_.toSeq)

      // Intermediate processing logic is omitted here.

      irisRecords.addSink(new MysqlSink)

      environment.execute("Flink add mongo data sourc")
    }
  }

数据结果
Mysql 数据结果