淘先锋技术网

首页 1 2 3 4 5 6 7

flink中的状态:算子状态(Operatior)、键控状态(Keyed State)、状态后端(State Backends)

状态的定义:
1、有一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
2、可以任务状态是一个本地变量,可以被任务的业务逻辑访问
3、Flink的状态管理主要是状态一致性、故障处理以及高效储存和访问。

注意:
1、在Flink中,状态始终与特定算子相关联
2、运行时的Flink了解算子的状态,算子需要预先注册其状态

算子状态(Operatior):算子状态的作用范围限定为算子任务。主要针对分区中某个task的数据进行处理
1、列表状态(List state):将状态表示为一组数据的列表
2、联合列表状态(Unionlist state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点savepoint启动应用程序是如何恢复
3、广播状态(Broadcast state):如果一个算子有多项任务,而他的每项任务状态相同,那么这种特殊情况最适合应用广播状态。

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;
import java.util.List;

public class OperateState_Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[3]));
        });

        // 定义一个有状态的map操作,统计当前分区数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream.map(new myCountMapper());

        resultStream.print();

        env.execute();
    }

    // 自定义MapFunction
    public static class myCountMapper implements MapFunction<SensorReading,Integer>, ListCheckpointed<Integer> {
        //  定义一个本地变量,作为算子状态
        private Integer count = 0;

        @Override
        public Integer map(SensorReading value) throws Exception {
            count ++;
            return count;
        }

        //  状态的快照
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }
 
        //  故障时将快照的数据恢复
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer num:state){
                count += num;
            }
        }
    }
}

键控状态(Keyed State):根据输入数据中定义的键(key)来维护和访问。Flink为每一个key维护一个状态实例,并将具有相同键的所有数据都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,他会自动将状态的访问范围限定为当前数据的key。
1、值状态(ValueState):将状态表示为单个的值
2、列表状态(List Value):将状态表示为一组数据的列表
3、映射状态(MapSate):将状态表示为一组Key-Value对
4、聚合状态(Reducing state&Aggregating State):将状态表示为一个用于聚合操作的列表

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyedState_Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        //  定义一个有状态的map操作,统计当前sensor数据个数
        SingleOutputStreamOperator<Integer> resultStream = dataStream
                .keyBy("id")
                .map(new MyKeyCountMapper());

        resultStream.print("a");

        env.execute();
    }

    // 自定义RichMapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading,Integer>{
        //  声明一个键控状态
        private ValueState<Integer> keyCountState;

        //  其他状态类型的声明
        private ListState<String> myListState;
        private MapState<String,Double> myMapState;
        private ReducingState<SensorReading> myReducingState;

        @Override
        public Integer map(SensorReading value) throws Exception {

            //  其他状态api调用
            //  liststate
            Iterable<String> iterable = myListState.get();
            for (String s :iterable) {
                System.out.println(s);
            }
            myListState.add(value.getTemperature().toString());

            myMapState.get("1");
            myMapState.put("2",12.3);

            // reduce state
            myReducingState.add(value);
            myReducingState.clear();

            //  valuestate
            Integer count = keyCountState.value();
            count++;
            //  键控状态的赋值
            keyCountState.update(count);
            return count;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            //  键控状态的声明
            keyCountState = getRuntimeContext()
                    .getState(new ValueStateDescriptor<Integer>("key-count",Integer.class,0));

            myListState = getRuntimeContext()
                    .getListState(new ListStateDescriptor<String>("my-list",String.class));
            myMapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, Double>("my-map",String.class,Double.class));
//            myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>("my-reduce",));
        }
    }
}

状态后端(State Backends):
1、MemoryStateBackend(内存级的状态后端):将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上,而将checkpoint储存在JobManager的内存中
2、FsStateBackend:将checkpoint存在远程的持久化文件系统上(FileSystem),二队本地状态跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。同时拥有内存级的本地访问速度,和更好的容错保证
3、RocksDBStateBackend:将所有状态序列化,存入本地的RocksDB中储存
注意:引入依赖

<!-- 状态后端statebackend -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>1.10.1</version>
        </dependency>
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import scala.Tuple3;

public class StateBackend_FaultTolerance {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //  状态后端编程
        env.setStateBackend( new MemoryStateBackend());
        //  需要输入检查点路劲
        env.setStateBackend( new FsStateBackend(""));
        env.setStateBackend( new RocksDBStateBackend(""));


        DataStreamSource<String> inputStream = env.socketTextStream("127.0.0.1", 9999);

        SingleOutputStreamOperator<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        dataStream.print("dataStream");
        env.execute();
    }
}