流式计算分为无状态和有状态两种情况。无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如:流处理应用程序从传感器接收温度读数,并在温度超过90度时发出警告。有状态的计算则会基于多个事件输出结果。例子如下:
①所有类型的窗口。例如:计算过去一小时的平均温度,就是有状态的计算。
②所有用于复杂事件处理的状态机。例如:若在一分钟内收到两个相差20度以上的温度读数,则发出警告,这是有状态的计算。
③流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
下图展示了无状态流处理与有状态流处理的主要区别。无状态流处理分别接收每条数据记录(图中的黑条),然后,根据最新输入的数据生成输出数据(图中的白条)。有状态流处理会维护状态(根据每条数据记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(图中的灰条)。
上图中输入数据由黑条表示。无状态流处理每次只转换一跳输入记录,并且仅根据最新的输入记录输出结果(图中的白条)。有状态流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(图中的灰条)反映的是综合考虑多个事件之后的结果。
尽管无状态的计算很重要,但是流处理对有状态的计算更感兴趣。事实上,正确地实现有状态的计算比实现无状态的计算难得多。旧的流处理系统并不支持有状态的计算,而新一代的流处理系统则将状态及其正确性视为重中之重。
Flink内置的很多算子,数据源Source,数据存储Sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如:ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。下面介绍一个Flink中的状态,如下图:
从图2中可以看出:
①状态由一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态。
②可以认为状态就是一个本地变量,可以被任务的业务逻辑访问。
③Flink会进行状态管理,包括状态一一致性、故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑。
在Flink中,状态始终与特定算子相关联。为了使运行时的Flink了解算子的状态,算子都需要预先注册其状态。
总的来说,有两种类型的状态:
①算子状态(operator state)
②键控状态(keyed state)
1 算子状态(Operator State)
1.1基本概念
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态。
状态对于同一任务而言是共享的。
算子状态不能由相同或不同的算子的另一个任务访问。
1.2 数据结构
Flink为算子状态提供三种基本数据结构。
①列表状态(List State)
将状态表示为一组数据的列表
②联合列表状态(Union List State)
也将状态表示为数据的列表。它与常规列表状态的区别在于:在发生故障时,或者保存点(savepoint)启动应用程序时如何恢复。
③广播状态
如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
2 键控状态(Keyed State)
2.1基本概念
键控状态是根据输入数据流中定义的键(key)来维护和访问的。
Flink为每个key维护一个状态实例,并将具有相同键的所有数据,都分区到同一
个算子任务中,这个任务会维护和处理这个key对应的状态。
当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。
KeyedState很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
2.2 数据结构
键控状态数据结构如下:
①值状态(ValueState)
将状态表示为单个的值
②列表状态(ListState)
将状态表示为一组数据的列表
③映射状态(MapState)
将状态表示为一组key-value对。
④聚合状态(ReducingState和AggregatingState)
将状态表示为一个用于聚合操作的列表。
2.3 使用
需求:监控温度传感器的温度值,如果相邻两条数据的温度差大于10,则报警。
object ProcessFunctionTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val inputStream = env.socketTextStream(IP地址,7777)
val dataStream = inputStream.map(
data =>{
val dataArr = data.split(",")
SensorReading(dataArr(0).trim,dataArr(1).trim.toLong,dataArr(2).trim.toDouble)
}
)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timeStamp * 1000L
}
})
dataStream.print("dataStream")
val flatMapWithStateStream = dataStream.keyBy(_.id)
.flatMapWithState[(String, Double, Double), Double] {
// 如果没有状态的话,也就是没有数据来过,那么就将当前数据温度值存入状态
case (input: SensorReading, None) => (List.empty, Some(input.temperature))
// 如果有状态,就应该与上次的温度值比较差值,如果大于阈值就输出报警
case (input: SensorReading,lastTemp: Some[Double]) =>
val dif = (input.temperature - lastTemp.get).abs
if(dif > 10.0){
( List((input.id, lastTemp.get, input.temperature)), Some(input.temperature) )
}else{
( List.empty, Some(input.temperature) )
}
}
flatMapWithStateStream.print("flatMapWithStateStream")
env.execute("process function test")
}
}
3 状态后端(State Backends)
3.1 基本概念
每传入一条数据,有状态的算子任务都会读取和更新状态。
由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。
状态存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端。
状态后端主要负责:本地的状态管理和将检查点(checkpoint)状态写入远程存储。
3.2 状态后端的类型
(1)MemoryStateBackend
内存级的状态后端,会将键控状态作为内存汇总的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint存储在JobManager的内存中。
特点:快速、低延迟,但不稳定
(2)FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上,而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
特点:拥有内存级的本地访问速度,以及更好的容错保证。
(3)RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB存储。
注:RocksDB需要引入依赖才可使用:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.7.0</version>
</dependency>
代码中可以直接设置:
但是这种方法已经不推荐使用了,我在源码中也未找到关于另一种推荐使用方法,去网上查了一下,貌似推荐使用配置文件,截图如下: