淘先锋技术网

首页 1 2 3 4 5 6 7

Flink 中的EventTime详细概念 及示例代码

gym02

Flink时间窗口的计算中,支持多种时间的概念:Processsing,IngestionTime,EventTime。

如果在Flink中用户不做任何设置,默认使用的是ProcesssingTime,其中ProcesssingTime,IngestionTime都是由计算节点产生。不同的是IngestionTime是DataSource组件在产生记录的时候指定时间,而ProcesssingTime记录抵达计算算子的时间,由于以上两种时间都是系统自动产生,因此使用起来难度较低,用户无需关心时钟问题,也不会出现迟到的数据。

但是以上两种时间不能够准确的表达数据产生的实际时间,因此一般来说如果系统对事件的概念比较苛刻,这个时候就推荐使用EventTime。所谓的EventTime指的是数据产生的实际时间,系统不在参考计算机节点的系统时钟。

虽然EventTime可以准确地表达事件所在的窗口,但是由于提取的是事件时间是嵌入在数据记录内部的时间,因此可能会因为网络延迟或者故障等因素导致数据不能按照产生的时间顺序抵达计算节点,这就为窗口关闭产生问题。因为计算节点并不知道何时该关闭该窗口。

Watermarker
因此在基于EventTime语义的窗口计算提出Watermarker概念,该概念用于告知计算节点目前的系统时钟,一旦水位线越过该窗口endtime,则系统就会认定该窗口的是ready的就可对窗口实施WindowFunction计算。当水位线的时间越过时间窗口的endtime + 允许迟到的时间,则窗口会被消亡。

watermarker(T) = 计算节点所获取的最大时间 - 最大乱序时间

Watermarker实现有两种方式:固定频次(推荐),Per Event计算

固定频次:

class UserDefineAssignerWithPeriodicWatermarks(maxOrdernessTime:Long) extends AssignerWithPeriodicWatermarks[(String,Long)]{
  private var maxSeenEventTime=0L
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  //系统会定期调用该方法,实现watermarker计算
  override def getCurrentWatermark: Watermark = {
    return new Watermark(maxSeenEventTime-maxOrdernessTime)
  }
  //每收到一条记录系统就会调用该方法,提取当前事件时间
  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    //将事件的最大值,赋值给maxSeenEventTime
    maxSeenEventTime=Math.max(maxSeenEventTime,element._2)

    println(s"事件时间:${sdf.format(element._2)} 水位线:${sdf.format(maxSeenEventTime-maxOrdernessTime)}")

    return element._2
  }
}

Per Event计算:

class UserDefineAssignerWithPunctuatedWatermarks(maxOrdernessTime:Long)  extends AssignerWithPunctuatedWatermarks[(String,Long)]{
  private var maxSeenEventTime=0L
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  //每接收一条记录就会计算一次
  override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {
    return new Watermark(maxSeenEventTime-maxOrdernessTime)
  }

  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    //将事件的最大值,赋值给maxSeenEventTime
    maxSeenEventTime=Math.max(maxSeenEventTime,element._2)

    println(s"事件时间:${sdf.format(element._2)} 水位线:${sdf.format(maxSeenEventTime-maxOrdernessTime)}")

    return element._2
  }
}

Allowed Lateness
在Flink中默认情况下水位线一旦没过窗口的EndTime,这个时候窗口就被理解为就绪状态,系统会调用WindowFunction实现对窗口元素的聚合运算,然后丢弃窗口,原因是当用户不设置late时间默认值0。

窗口删除条件: Watermarker >= Window EndTime +Allow Late Time

窗口触发条件: Watermarker >= Window EndTime

因此flink可以通过设置Allow LateTime来处理迟到的时间,只要窗口还没有被删除。迟到的数据则可以再次加入窗口计算。

示例代码:

class UserDefineAssignerWithPunctuatedWatermarks(maxOrdernessTime:Long)  extends AssignerWithPunctuatedWatermarks[(String,Long)]{
  private var maxSeenEventTime=0L
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  //每接收一条记录就会计算一次
  override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {

    return new Watermark(maxSeenEventTime-maxOrdernessTime)
  }

  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    //将事件的最大值,赋值给maxSeenEventTime
    maxSeenEventTime=Math.max(maxSeenEventTime,element._2)

    println(s"事件时间:${sdf.format(element._2)} 水位线:${sdf.format(maxSeenEventTime-maxOrdernessTime)}")

    return element._2
  }
}

class UserDefineEventWindowFunction extends WindowFunction[(String,Long),String,String,TimeWindow]{
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Long)],
                     out: Collector[String]): Unit = {

   println(s"【${sdf.format(window.getStart)} - ${sdf.format(window.getEnd)}】")
   out.collect( input.map(t=> sdf.format(t._2)).mkString(" | ")  )
  }
}

object FlinkTumblingWindowsLateData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //默认Flink用的是处理时间,必须设置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)//并行度设置为1,方便测试和观察

    //key 时间
    env.socketTextStream("CentOS", 9999)
        .map(_.split("\\s+"))
        .map(ts=>(ts(0),ts(1).toLong))
        .assignTimestampsAndWatermarks(new UserDefineAssignerWithPunctuatedWatermarks(2000))//设置水位线计算策略
        .keyBy(t =>t._1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(2)) //允许迟到2s
        .apply(new UserDefineEventWindowFunction)
        .print()


    env.execute("FlinkTumblingWindowsLateData")
  }
}

Getting late data as a side output
使用flink的SideOut功能,可以获取最近被丢弃的数据流。首先,需要使用窗口流上的sideOutputLateData(OutputTag)指定要获取的最新数据。然后,可以根据窗口化操作的结果获取SideOut流。

示例代码:

class UserDefineEventWindowFunction extends WindowFunction[(String,Long),String,String,TimeWindow]{
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  override def apply(key: String, window: TimeWindow,
                     input: Iterable[(String, Long)],
                     out: Collector[String]): Unit = {

   println(s"【${sdf.format(window.getStart)} - ${sdf.format(window.getEnd)}】")
   out.collect( input.map(t=> sdf.format(t._2)).mkString(" | ")  )
  }
}
class UserDefineAssignerWithPunctuatedWatermarks(maxOrdernessTime:Long)  extends AssignerWithPunctuatedWatermarks[(String,Long)]{
  private var maxSeenEventTime=0L
  private var sdf:SimpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  //每接收一条记录就会计算一次
  override def checkAndGetNextWatermark(lastElement: (String, Long), extractedTimestamp: Long): Watermark = {

    return new Watermark(maxSeenEventTime-maxOrdernessTime)
  }

  override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
    //将事件的最大值,赋值给maxSeenEventTime
    maxSeenEventTime=Math.max(maxSeenEventTime,element._2)

    println(s"事件时间:${sdf.format(element._2)} 水位线:${sdf.format(maxSeenEventTime-maxOrdernessTime)}")

    return element._2
  }
}

object FlinkTumblingWindowsLateData {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    //默认Flink用的是处理时间,必须设置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)//并行度设置为1,方便测试和观察

    val lateTag = new OutputTag[(String,Long)]("latedata")
    //key 时间
   var stream= env.socketTextStream("CentOS", 9999)
        .map(_.split("\\s+"))
        .map(ts=>(ts(0),ts(1).toLong))
        .assignTimestampsAndWatermarks(new UserDefineAssignerWithPunctuatedWatermarks(2000))//设置水位线计算策略
        .keyBy(t =>t._1)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .allowedLateness(Time.seconds(2)) //允许迟到2s
        .sideOutputLateData(lateTag)
        .apply(new UserDefineEventWindowFunction)


    stream.print("窗口输出")
    stream.getSideOutput(lateTag).printToErr("迟到数据")

    env.execute("FlinkTumblingWindowsLateData")
  }
}
 

————————————————
版权声明:本文为CSDN博主「gym02」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/gym02/article/details/105849358