淘先锋技术网

首页 1 2 3 4 5 6 7

场景

,正常来说几秒钟就会有数据过来,为了监控上游数据接口是不是有问题,可通过监控kafa数据在指定时间内是否有数据产生,没有数据则进行告警。本代码是伪代码,读者可自行修改成自己想要 的。

逻辑分析

  • 技术应用

借助状态ValueState 和定时器,这要求必须是process方法,且必须是keyByStream.

  • 逻辑分析
    -第一条数据到来的时候,将当前processTime存储到状态中。并注册定时器
    后续数据到来,判断当前时间和状态中时间差值大于指定间隔,则不做处理。此时意味着第一步定义的定时器会发生,也就是会告警。 若差值小于指定间隔则 删除第一步的定时器,并更新状态,新的状=现在的处理时间+间隔
    定时器 执行的时候,可发出告警信息,同事清除定时器,和状态

代码

package com.check.alarm;



import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.scala.typeutils.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.concurrent.TimeUnit;


class MySource implements SourceFunction<Integer> {
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int i = 1;
        while(true)
        {
            if(i==10){
                Thread.sleep(10000);
            }
            {
                Thread.sleep(1000);
            }
            ctx.collect(i);
            i++;
        }
    }

    @Override
    public void cancel() {

    }
}

public class CheckAlarm {
    /*
    *
    * 监控kafka数据,一分钟没数据告警
    * 注意线上打包的时候,需要将pom.xml中的  <scope>打开,排除依赖的jar,  目前所有的jar都由streamx 统一管理,只有测试的时候需要将<scope>注释掉
    **/
    private static final String BROKER_LIST = "alikafka-pre-cn-7mz2urb69009-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-7mz2urb69009-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-7mz2urb69009-3-vpc.alikafka.aliyuncs.com:9092";
    private static final String TOPIC = "dsj_game_log";
    private static final String defaultCheckpointPath = "hdfs://flinkmaster:8020/remote-default-checkpoints/penggan/" + CheckAlarm.class.getSimpleName();
    private static final String GROUP_ID = "dsj_game_log_flink";
    public static void main(String[] args) throws Exception {
        //                              构造执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //启动检查点,线上要开启,测试可关闭
//        setStateBackendAndCheckpoint(env, defaultCheckpointPath);



        DataStreamSource<Integer> s =  env.addSource(new MySource());
        SingleOutputStreamOperator<String> alarmStream = s.keyBy(new KeySelector<Integer, String>() {
            @Override
            public String getKey(Integer value) throws Exception {
                return ""; //意思是不分组
            }
        }).process(new KeyedProcessFunction<String,Integer, String>() {
            private ValueState<Long> clockTime;
            @Override
            public void open(Configuration parameters) throws Exception {
                //初始化闹钟状态, 初始化温度状态
                clockTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG()));

            }
            @Override
            public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                long time = ctx.timerService().currentProcessingTime();
                if(clockTime.value()==null) {
                    System.out.println("初始化");
                    clockTime.update(time);//缓存第一条数据到来的时间
                    ctx.timerService().registerProcessingTimeTimer(time + 8000);//注册告警定时器
                }else {
                    if(time-clockTime.value()>8000){
                        System.out.println("闹钟将会响起");
                        //闹钟将会响起
                    }else {
                        ctx.timerService().deleteProcessingTimeTimer(clockTime.value()+8000);//删除告警定时器
                        clockTime.clear();
                    }
                }
            }


            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//                System.out.println(FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(new Date(clockTime.value())));
                System.out.println("闹钟响起,清空状态");
                out.collect("告警内容");
                ctx.timerService().deleteProcessingTimeTimer(clockTime.value()+8000);
                clockTime.clear();
            }
        });
        alarmStream.print();







        env.execute();

    }
    public static void setStateBackendAndCheckpoint(StreamExecutionEnvironment env, String checkpointPath) {
        System.out.println("启用状态后端");
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));   // 设置状态后端
        //每30秒启动一个检查点
        env.enableCheckpointing(30000);
        //允許几次檢查點失敗
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // 设置状态后端
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));

        //检查点保存模式
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        //设置最小间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(1));

        //设置超时时长
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(5));

        // 最大并发检查点数量,如果上面设置了 最小间隔,其实这个参数已经不起作用了
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        //可恢复
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //重试机制
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(30, TimeUnit.SECONDS)));
    }
}