State in Flink: operator state (Operator State), keyed state (Keyed State), and state backends (State Backends).
Definition of state:
1. All the data maintained by a task and used to compute some result belongs to that task's state.
2. State can be thought of as a local variable accessible to the task's business logic.
3. Flink's state management is mainly about state consistency, failure handling, and efficient storage and access.
Notes:
1. In Flink, state is always associated with a specific operator.
2. For the Flink runtime to be aware of an operator's state, the operator must register its state in advance.
Operator State: its scope is limited to one operator task; each parallel subtask processes the data of its own partition with its own copy of the state.
1. List state: represents the state as a list of elements.
2. Union list state: also represents the state as a list of elements. It differs from regular list state in how the state is redistributed on recovery after a failure or when restarting the application from a savepoint.
3. Broadcast state: if an operator has multiple subtasks and every subtask must hold identical state, this special case is best served by broadcast state.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Collections;
import java.util.List;
public class OperateState_Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // A stateful map operation: count the records in the current partition
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new MyCountMapper());
        resultStream.print();
        env.execute();
    }

    // Custom MapFunction carrying operator state
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        // A local variable acting as operator state
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }

        // Snapshot the state
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }

        // On failure, restore the state from the snapshot
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer num : state) {
                count += num;
            }
        }
    }
}
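The snapshotState/restoreState contract above can be exercised outside Flink with a plain-Java sketch (no Flink runtime; the class and method names here are illustrative, not Flink API). The point is the restore loop: on recovery, list-state entries, possibly snapshots collected from several parallel subtasks, are summed back into the local counter, exactly as restoreState does above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ListStateRestoreSketch {
    // Mirrors the counter field in MyCountMapper
    private int count = 0;

    // Mirrors map(): increment and return the running count
    public int map() { return ++count; }

    // Snapshot: the operator state is a one-element list
    public List<Integer> snapshot() { return Collections.singletonList(count); }

    // Restore: sum all redistributed list entries, like restoreState
    public void restore(List<Integer> state) {
        count = 0;
        for (int n : state) count += n;
    }

    public static void main(String[] args) {
        ListStateRestoreSketch op = new ListStateRestoreSketch();
        op.map(); op.map(); op.map();                    // processed 3 records
        List<Integer> snap = new ArrayList<>(op.snapshot());
        snap.add(2);                                     // pretend another subtask snapshotted 2
        ListStateRestoreSketch restored = new ListStateRestoreSketch();
        restored.restore(snap);                          // count = 3 + 2 = 5
        System.out.println(restored.map());              // prints 6
    }
}
```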
Keyed State: maintained and accessed according to a key defined in the input data. Flink keeps one state instance per key and routes all records with the same key to the same operator task, which maintains and processes the state for that key. When the task processes a record, it automatically scopes state access to the current record's key.
1. Value state (ValueState): represents the state as a single value.
2. List state (ListState): represents the state as a list of elements.
3. Map state (MapState): represents the state as a set of key-value pairs.
4. Reducing state & aggregating state (ReducingState & AggregatingState): represents the state as a list used for aggregation operations.
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyedState_Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // A stateful map operation: count the records per sensor
        SingleOutputStreamOperator<Integer> resultStream = dataStream
                .keyBy("id")
                .map(new MyKeyCountMapper());
        resultStream.print("a");
        env.execute();
    }

    // Custom RichMapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
        // Declare a keyed state
        private ValueState<Integer> keyCountState;
        // Declarations for the other state types
        private ListState<String> myListState;
        private MapState<String, Double> myMapState;
        private ReducingState<SensorReading> myReducingState;

        @Override
        public Integer map(SensorReading value) throws Exception {
            // Calls on the other state APIs
            // ListState
            Iterable<String> iterable = myListState.get();
            for (String s : iterable) {
                System.out.println(s);
            }
            myListState.add(value.getTemperature().toString());
            // MapState
            myMapState.get("1");
            myMapState.put("2", 12.3);
            // ReducingState
            myReducingState.add(value);
            myReducingState.clear();
            // ValueState
            Integer count = keyCountState.value();
            count++;
            // Update the keyed state
            keyCountState.update(count);
            return count;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // Register the keyed states
            keyCountState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<Integer>("key-count", Integer.class, 0));
            myListState = getRuntimeContext()
                    .getListState(new ListStateDescriptor<String>("my-list", String.class));
            myMapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
            // The descriptor needs a ReduceFunction; as an example, simply keep the latest reading
            myReducingState = getRuntimeContext()
                    .getReducingState(new ReducingStateDescriptor<SensorReading>("my-reduce", (r1, r2) -> r2, SensorReading.class));
        }
    }
}
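Conceptually, keyed state behaves like a map from key to an independent state instance: each key's count evolves separately even though one function instance handles many keys. A minimal plain-Java sketch of that mental model (illustrative only, not the Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class KeyedStateSketch {
    // One state instance per key, as Flink maintains for a ValueState<Integer>
    private final Map<String, Integer> keyCountState = new HashMap<>();

    // Analogous to MyKeyCountMapper.map(): access is scoped to the current record's key
    public int map(String key) {
        int count = keyCountState.getOrDefault(key, 0) + 1;  // default 0, like the descriptor's default value
        keyCountState.put(key, count);                       // like keyCountState.update(count)
        return count;
    }

    public static void main(String[] args) {
        KeyedStateSketch op = new KeyedStateSketch();
        System.out.println(op.map("sensor_1")); // prints 1
        System.out.println(op.map("sensor_1")); // prints 2
        System.out.println(op.map("sensor_2")); // prints 1 (separate state instance)
    }
}
```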
State Backends:
1. MemoryStateBackend (in-memory state backend): manages keyed state as objects on the TaskManager's JVM heap, and stores checkpoints in the JobManager's memory.
2. FsStateBackend: stores checkpoints in a remote durable file system (FileSystem); local state, as with MemoryStateBackend, still lives on the TaskManager's JVM heap. It combines memory-speed local access with stronger fault-tolerance guarantees.
3. RocksDBStateBackend: serializes all state and stores it in a local RocksDB instance.
Note: add the dependency:
<!-- state backend: RocksDB -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class StateBackend_FaultTolerance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Configuring the state backend (only the last call takes effect; pick one)
        env.setStateBackend(new MemoryStateBackend());
        // A checkpoint path must be passed in
        env.setStateBackend(new FsStateBackend(""));
        env.setStateBackend(new RocksDBStateBackend(""));
        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);
        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        dataStream.print("dataStream");
        env.execute();
    }
}
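Besides choosing the backend in code, a cluster-wide default can be set in flink-conf.yaml; the per-job setStateBackend() call overrides it. A sketch for Flink 1.10 (the HDFS path is a placeholder):

```yaml
# flink-conf.yaml: cluster-wide default state backend
# Accepted values: jobmanager (memory), filesystem, rocksdb
state.backend: rocksdb
# Durable location for checkpoint data (placeholder path)
state.checkpoints.dir: hdfs://namenode:8020/flink/checkpoints
# RocksDB only: snapshot incremental diffs instead of full state
state.backend.incremental: true
```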