一、简介
大家知道,Flink用水位线和窗口机制配合来处理乱序事件,保证窗口计算数据的正确性,当水位线超过窗口结束时间的时候,就会触发窗口计算
- 水位线是动态生成的,根据进入窗口的最大事件时间-允许延迟时间
那么窗口的开始时间和结束时间是怎么计算的呢?这里不讨论计数窗口,因为数量统计很容易知道,只针对时间窗口的计算
- 滚动时间窗口:按照固定的时间长度对数据进行分组,窗口之间没有重叠,例如,5秒的滚动窗口。开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,比如
- 滑动时间窗口:按照固定的时间长度对数据进行分组,窗口之间有重叠,例如,5秒的滑动窗口,每2秒钟滑动一次。开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,结束时间为开始时间加上窗口大小。
- 会话窗口:按照数据的时间间隔进行分组,当两个数据之间的时间间隔大于指定的间隔时间时,就认为前一个窗口结束,后一个窗口开始。开始时间为第一个数据的时间戳,结束时间为最后一个数据的时间戳加上间隔时间
看完上面可能还有些疑惑,没关系,下面会有具体示例
二、代码实现
自定义一个Mywindowfunction继承WindowFunction,可以在流上调用这个function从而打印
class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {
val startTime = window.getStart
val endTime = window.getEnd
val result = s"Window start time: $startTime, end time: $endTime"
out.collect(result)
}
}
三、测试
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.api.java.tuple._
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.{TableEnvironment, Types}
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
case class User(id:Int,name:String,age:Int,timestamp:Long)
object WaterMarkTest {
def main(args: Array[String]): Unit = {
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(streamEnv)
//指定时间类型为事件时间
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = streamEnv.fromElements(
User(1, "nie", 22, 1511658001000L),
User(2, "xiao", 19, 1511658005000L)
).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[User](Time.seconds(2)) {
override def extractTimestamp(t: User): Long = t.timestamp
}
)
stream.keyBy(0)
.timeWindow(Time.seconds(2))
.apply(new MyWindowFunction)
.print()
streamEnv.execute("test")
}
}
class MyWindowFunction extends WindowFunction[User, String, Tuple, TimeWindow] {
override def apply(key: Tuple, window: TimeWindow, input: Iterable[User], out: Collector[String]): Unit = {
val startTime = window.getStart
val endTime = window.getEnd
val result = s"Window start time: $startTime, end time: $endTime"
out.collect(result)
}
}
比如上面的两条数据时间戳分别为1511658001000L、1511658005000L,转换成日期格式为2017-11-26 9:0:1,2017-11-26 9:0:5,设置为滚动窗口,窗口间隔为2s,上面说到滚动窗口窗口开始时间为当前窗口大小的整数倍,结束时间为开始时间加上窗口大小,那么可以推断窗口为2017-11-26 9:0:0(整数倍) ~ 2017-11-26 9:0:2、2017-11-26 9:0:4 ~ 2017-11-26 9:0:6,运行下代码试下:
确实是一样的,窗口开始时间为事件时间往下推,直到找到窗口大小的整数倍,窗口结束时间就是开始时间加上时间间隔
这时候我们改成滑动窗口,窗口间隔时间为5s,滑动距离为2s,这时候修改一下上述代码
stream.keyBy(0)
.timeWindow(Time.seconds(5),Time.seconds(3))
.apply(new MyWindowFunction)
.print()
再运行一下:
上面说到滑动窗口开始时间为当前窗口大小的整数倍加上窗口滑动步长的整数倍,可以理解为窗口大小为5s,滑动距离为2s,事件事件分别为1511658001000L、1511658005000L,相对于1511658001000L,5s的整数倍可以是1511657995000,1511657990000,这时候加上滑动距离的整数倍,可以是1511657992000、1511657994000、1511657998000、1511657997000,为什么窗口最开始为1511657998000,肯定是综合考量的结果