Scenario
Under normal conditions a record arrives every few seconds. To detect problems in the upstream data interface, we can monitor whether the Kafka topic has produced any data within a specified time window and raise an alert when it has not. The code below is essentially pseudocode; readers can adapt it to their own needs.
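In the real scenario the source is the Kafka topic itself. Below is a minimal sketch of wiring it in, assuming Flink 1.14+ and the flink-connector-kafka dependency (BROKER_LIST, TOPIC, and GROUP_ID are the constants from the full listing below; the demo code instead uses a mock source so it can run without Kafka, and with a Kafka source the element type becomes String rather than Integer):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// Build a Kafka source that reads the monitored topic as plain strings.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(BROKER_LIST)                // broker list
        .setTopics(TOPIC)                                // monitored topic
        .setGroupId(GROUP_ID)                            // consumer group
        .setStartingOffsets(OffsetsInitializer.latest()) // only new data matters for a liveness check
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

// Replace the mock source with the Kafka source; the rest of the pipeline stays the same.
DataStreamSource<String> s = env.fromSource(
        kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-liveness-source");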
Logic Analysis
- Technique
Use ValueState together with a processing-time timer. Timers are only available inside a process function (here a KeyedProcessFunction), which in turn requires a keyed stream (keyBy).
- Logic
- When the first record arrives, store its processing time in state and register a processing-time timer at that time plus the alert interval.
- When a later record arrives: if the gap between the current processing time and the time in state exceeds the interval, do nothing, because the timer registered in the first step will fire and the alert goes out. If the gap is within the interval, delete the timer from the first step and clear the state, so the next record is treated as a fresh "first" record and registers a new timer.
- When the timer fires, emit the alert and clear the state.
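Condensed, these steps map onto the process function roughly as in the sketch below (lastSeen is a placeholder name and 8000 ms is the interval used in this demo; the full runnable listing follows in the Code section):

// Inside processElement of the KeyedProcessFunction; lastSeen is a ValueState<Long>
// that holds the processing time cached in the first step.
long now = ctx.timerService().currentProcessingTime();
if (lastSeen.value() == null) {
    lastSeen.update(now);                                                  // step 1: remember the first record
    ctx.timerService().registerProcessingTimeTimer(now + 8000);            // step 1: arm the alert timer
} else if (now - lastSeen.value() <= 8000) {
    ctx.timerService().deleteProcessingTimeTimer(lastSeen.value() + 8000); // step 2: data arrived in time, cancel
    lastSeen.clear();                                                      // the next record re-arms the timer
}
// step 3: onTimer() emits the alert and clears lastSeen.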
Code
package com.check.alarm;

import org.apache.commons.lang.time.FastDateFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.concurrent.TimeUnit;
// Mock source used for local testing: emits an increasing integer every second,
// and pauses for 10 seconds after the 10th element to simulate a data gap that
// should trigger the alert.
class MySource implements SourceFunction<Integer> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        int i = 1;
        while (running) {
            if (i == 10) {
                Thread.sleep(10000); // simulate the upstream going silent
            } else {
                Thread.sleep(1000);  // normal cadence: one record per second
            }
            ctx.collect(i);
            i++;
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
public class CheckAlarm {

    /*
     * Monitor Kafka data and alert when no data has arrived for one minute
     * (the demo below uses an 8-second interval so the effect shows up quickly).
     * Packaging note: when building for production, enable the <scope> entries in
     * pom.xml to exclude the dependency jars, since all jars are managed centrally
     * by streamx; only comment the <scope> out for local testing.
     */
    private static final String BROKER_LIST = "alikafka-pre-cn-7mz2urb69009-1-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-7mz2urb69009-2-vpc.alikafka.aliyuncs.com:9092,alikafka-pre-cn-7mz2urb69009-3-vpc.alikafka.aliyuncs.com:9092";
    private static final String TOPIC = "dsj_game_log";
    private static final String defaultCheckpointPath = "hdfs://flinkmaster:8020/remote-default-checkpoints/penggan/" + CheckAlarm.class.getSimpleName();
    private static final String GROUP_ID = "dsj_game_log_flink";
    public static void main(String[] args) throws Exception {
        // Build the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Enable checkpointing: required in production, can stay disabled for local tests.
        // setStateBackendAndCheckpoint(env, defaultCheckpointPath);

        DataStreamSource<Integer> s = env.addSource(new MySource());

        SingleOutputStreamOperator<String> alarmStream = s.keyBy(new KeySelector<Integer, String>() {
            @Override
            public String getKey(Integer value) throws Exception {
                return ""; // all records share one key, i.e. effectively no partitioning
            }
        }).process(new KeyedProcessFunction<String, Integer, String>() {

            // Holds the processing time cached when the pending alert timer was registered.
            private ValueState<Long> clockTime;

            @Override
            public void open(Configuration parameters) throws Exception {
                // Initialize the timer state.
                clockTime = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG));
            }

            @Override
            public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
                long time = ctx.timerService().currentProcessingTime();
                if (clockTime.value() == null) {
                    System.out.println("initializing");
                    clockTime.update(time); // cache the arrival time of the first record
                    ctx.timerService().registerProcessingTimeTimer(time + 8000); // register the alert timer
                } else {
                    if (time - clockTime.value() > 8000) {
                        // Do nothing: the alert timer is about to fire.
                        System.out.println("the alert timer will fire");
                    } else {
                        // Data arrived within the interval: cancel the pending alert timer
                        // and clear the state so the next record registers a fresh timer.
                        ctx.timerService().deleteProcessingTimeTimer(clockTime.value() + 8000);
                        clockTime.clear();
                    }
                }
            }

            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                // System.out.println(FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(new Date(clockTime.value())));
                System.out.println("alert timer fired, clearing state");
                out.collect("alert message");
                clockTime.clear();
            }
        });

        alarmStream.print();
        env.execute();
    }
    public static void setStateBackendAndCheckpoint(StreamExecutionEnvironment env, String checkpointPath) {
        System.out.println("enabling state backend");
        // Use RocksDB with incremental checkpoints as the state backend.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        // Trigger a checkpoint every 30 seconds.
        env.enableCheckpointing(30000);
        // Allow up to 3 checkpoint failures.
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // Store checkpoints on HDFS.
        env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(checkpointPath));
        // Checkpointing mode.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between two checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(1));
        // Checkpoint timeout.
        env.getCheckpointConfig().setCheckpointTimeout(TimeUnit.MINUTES.toMillis(5));
        // Maximum number of concurrent checkpoints; with the minimum pause set above,
        // this parameter effectively has no impact.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Retain checkpoints when the job is cancelled so it can be recovered from them.
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Restart strategy: 3 attempts, 30 seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(30, TimeUnit.SECONDS)));
    }
}