8 Multi-Stream Transformations
8.1 Splitting a Stream
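- Simple implementation
Apply the filter operator to the stream three times to split it. This works, but every record is sent to all three filters and tested three times.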
// Select Mary's page views into MaryStream
DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>()
{
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Mary");
}
});
// Select Bob's page views into BobStream
DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.user.equals("Bob");
}
});
// Select everyone else's page views into elseStream
DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>()
{
@Override
public boolean filter(Event value) throws Exception {
return !value.user.equals("Mary") && !value.user.equals("Bob");
}
});
MaryStream.print("Mary pv");
BobStream.print("Bob pv");
elseStream.print("else pv");
- Using side output streams
- Code
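import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
// Event and ClickSource are the custom classes defined in earlier chapters (not shown here)
public class SplitStreamTest {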
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
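// Define the side output tags
// A side output may have a different type from the main stream, so Tuple3 is used here
// The trailing {} creates an anonymous subclass so Flink can capture the generic type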
OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary") {};
OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob") {};
SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {// the main stream keeps the Event type
@Override
public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
if (value.user.equals("Mary")) {
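// Write to a side output: the first argument is the tag, the second is the value to emit
ctx.output(maryTag, Tuple3.of(value.user, value.url, value.timestamp));
} else if (value.user.equals("Bob")) {
// Write to a side output: the first argument is the tag, the second is the value to emit
ctx.output(bobTag, Tuple3.of(value.user, value.url, value.timestamp));
} else {// everything else goes to the main stream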
out.collect(value);
}
}
});
processStream.print("else");
processStream.getSideOutput(maryTag).print("Mary");
processStream.getSideOutput(bobTag).print("Bob");
env.execute();
}
}
- Result
else> Event{user='Alice', url='./home', timestamp=2022-11-25 21:56:01.958}
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:02.971}
Mary> (Mary,./home,1669384564001)
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:05.004}
Bob> (Bob,./prod?id=100,1669384566019)
Bob> (Bob,./cart,1669384567024)
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:08.027}
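8.2 Merging Streams
8.2.1 Overview
- Meaning
Requirement (for union): all streams must have the same data type
Feature: more than two streams can be merged at once
- Usage
stream1.union(stream2, stream3, ...)
8.2.2 Union
- Code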
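import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
// Event is the custom class defined in earlier chapters (not shown here)
public class UnionTest {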
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop2",7777)
.map(data->{
String[] field = data.split(",");
return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream1.print("stream1");
SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop2",8888)
.map(data->{
String[] field = data.split(",");
return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})
);
stream2.print("stream2");
// Merge the two streams
stream1.union(stream2)
.process(new ProcessFunction<Event, String>() {
@Override
public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
out.collect("watermark: "+ctx.timerService().currentWatermark());
}
})
.print();
env.execute();
}
}
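- Results and analysis
(The run's screenshots are placed one level up in these notes, for side-by-side comparison.)
Port 7777 receives an event with timestamp 2000. With a 2 s delay, that stream's watermark becomes 2000 - 2000 - 1 = -1, which is what the next processed event sees.
However, port 8888 has not received any data yet. The watermark of a union is the minimum of its inputs' watermarks, so no matter how far port 7777's timestamps advance, the merged watermark is held back by port 8888 and stays at a huge negative number (Long.MIN_VALUE).
Once data is entered on port 8888, the merged watermark starts following that stream. Because it has a 5 s delay, only the event with timestamp 5000 raises its watermark to 5000 - 5000 - 1 = -1 (visible when the next event, 6000, is processed), bringing it level with port 7777.
When port 8888 reaches timestamp 7000, the merged watermark does not become 999 (6000 - 5000 - 1): port 7777's watermark is still -1, and the lower stream wins, so -1 is shown.
Entering Mary,./home,6000 on port 7777 raises that stream's watermark to 6000 - 2000 - 1 = 3999. Port 8888's latest timestamp is 7000, so its watermark is 7000 - 5000 - 1 = 1999. The merged watermark is the smaller value, min(3999, 1999) = 1999, which is what is finally shown.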
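The arithmetic above follows two rules, which the plain-Java sketch below replays: each input's watermark is its largest timestamp so far minus its delay minus 1 ms (Long.MIN_VALUE before any data), and the union's watermark is the minimum over all inputs. This is a simplified model, not Flink code; the class UnionWatermarkModel is hypothetical, and it ignores the periodic watermark emission (every 200 ms by default) and the one-event lag visible in the printed values.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified plain-Java model (not Flink code) of how a union's watermark is derived. */
public class UnionWatermarkModel {
    // Out-of-orderness bound (delay) of each input, in milliseconds
    private final Map<String, Long> delays = new HashMap<>();
    // Largest timestamp seen so far on each input
    private final Map<String, Long> maxTimestamps = new HashMap<>();

    public void addInput(String input, long delayMs) {
        delays.put(input, delayMs);
    }

    public void onEvent(String input, long timestamp) {
        maxTimestamps.merge(input, timestamp, Long::max);
    }

    /** One input's watermark: maxTimestamp - delay - 1, or Long.MIN_VALUE before any data. */
    public long inputWatermark(String input) {
        Long maxTs = maxTimestamps.get(input);
        return maxTs == null ? Long.MIN_VALUE : maxTs - delays.get(input) - 1;
    }

    /** The union's watermark: the minimum over all inputs. */
    public long mergedWatermark() {
        long min = Long.MAX_VALUE;
        for (String input : delays.keySet()) {
            min = Math.min(min, inputWatermark(input));
        }
        return min;
    }

    public static void main(String[] args) {
        UnionWatermarkModel model = new UnionWatermarkModel();
        model.addInput("7777", 2000);
        model.addInput("8888", 5000);
        model.onEvent("7777", 2000);
        System.out.println(model.mergedWatermark()); // -9223372036854775808: port 8888 has no data yet
        model.onEvent("8888", 5000);
        System.out.println(model.mergedWatermark()); // -1: both inputs are at -1
        model.onEvent("8888", 7000);
        System.out.println(model.mergedWatermark()); // -1: 8888 is at 1999 but 7777 is still at -1
        model.onEvent("7777", 6000);
        System.out.println(model.mergedWatermark()); // 1999 = min(3999, 1999)
    }
}
```

Feeding the same events as the walkthrough reproduces its sequence of merged watermarks: Long.MIN_VALUE, -1, -1, 1999.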
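8.2.3 Connect
- Overview
Connecting two DataStreams, which may have different types, with .connect() yields a ConnectedStreams; you only get a DataStream back after applying a transformation to it.
The map(), flatMap() and process() used so far take function classes that handle a single stream. To handle two streams, the function names gain a Co prefix: CoMapFunction, CoFlatMapFunction, CoProcessFunction and so on. CoMapFunction, for example, has two methods, map1() and map2(), one per input stream.
- Analysis
Calling connect() on stream2 (a DataStream) returns a ConnectedStreams.
ConnectedStreams does not extend DataStream. Its two type parameters are the types of the two input streams, and it offers methods such as map() and process(); process() takes a CoProcessFunction.
CoMapFunction extends Function and declares two mapping methods, map1() and map2(). Its three type parameters are the first input type, the second input type, and the output type (the type of the merged stream). CoProcessFunction likewise has processElement1() and processElement2().
- Code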
- demo
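import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
public class ConnectTest {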
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
DataStreamSource<Long> stream2 = env.fromElements(4L,5L,6L,7L);
stream2.connect(stream1)
.map(new CoMapFunction<Long, Integer, String>() {
@Override
public String map1(Long value) throws Exception {
return "Long:"+value.toString();
}
@Override
public String map2(Integer value) throws Exception {
return "Integer:"+value.toString();
}
})
.print();
env.execute();
}
}
Result
Integer:1
Integer:2
Integer:3
Long:4
Long:5
Long:6
Long:7
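- Real-time reconciliation example
Two streams: one carries the payment requests users submit in the app, the other carries the payment confirmations returned by the third-party payment platform.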
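import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class BillCheckExample {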
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Payment log from the app
SingleOutputStreamOperator<Tuple3<String,String,Long>> appStream = env.fromElements(
Tuple3.of("order-1", "app", 1000L),
Tuple3.of("order-2", "app", 2000L),
Tuple3.of("order-3", "app", 3500L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
<Tuple3<String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
@Override
public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
return element.f2;
}
})
);
// Payment log from the third-party platform
SingleOutputStreamOperator<Tuple4<String,String,String,Long>> thirdpartStream = env.fromElements(
Tuple4.of("order-1", "third-party", "success", 3000L),
Tuple4.of("order-3", "third-party", "success", 4000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.
<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String,String,String,Long>>() {
@Override
public long extractTimestamp(Tuple4<String,String,String,Long> element, long recordTimestamp) {
return element.f3;
}
})
);
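// Check whether each order appears in both streams; raise an alert if no match arrives within the waiting time
// // Keying each stream first and then connecting also works: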
// appStream.keyBy(data->data.f0)
// .connect(thirdpartStream.keyBy(data -> data.f0));
//
appStream.connect(thirdpartStream)
.keyBy(data->data.f0,data-> data.f0)
.process(new OrderMatchResult())
.print();
env.execute();
}
// Custom CoProcessFunction
public static class OrderMatchResult extends CoProcessFunction<Tuple3<String,String,Long>,
Tuple4<String,String,String,Long>,String>{
// State variables that hold events which have already arrived
private ValueState<Tuple3<String, String, Long>> appEventState;
private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;
// Obtain the state handles from the runtime context
@Override
public void open(Configuration parameters) throws Exception {
appEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
);
thirdPartyEventState = getRuntimeContext().getState(
new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
);
}
@Override
public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
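// An app event arrived: check whether the matching third-party event has already arrived
if(thirdPartyEventState.value()!=null){
out.collect("Reconciliation succeeded: "+value+" "+thirdPartyEventState.value());
// Clear the state
thirdPartyEventState.clear();
}else{
// Not yet: store this event and wait
appEventState.update(value);
// Register a timer 5 s later to wait for the other stream's event
ctx.timerService().registerEventTimeTimer(value.f2+5000L);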
}
}
@Override
public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
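// A third-party event arrived: check whether the matching app event has already arrived
if(appEventState.value()!=null){
out.collect("Reconciliation succeeded: "+appEventState.value()+" "+value);
// Clear the state
appEventState.clear();
}else{
// Not yet: store this event and wait
thirdPartyEventState.update(value);
// Register a timer 5 s later to wait for the other stream's event
ctx.timerService().registerEventTimeTimer(value.f3+5000L);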
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
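// Timer fired: if a state is still non-empty, the matching event from the other stream never arrived
// Both states are never non-empty at the same time, because a match clears the stored event
// So any leftover state means reconciliation failed
if(appEventState.value()!=null){
out.collect("Reconciliation failed: "+appEventState.value()+" "+"third-party payment info not received");
}
if(thirdPartyEventState.value()!=null){
out.collect("Reconciliation failed: "+thirdPartyEventState.value()+" "+"app payment info not received");
}
// Clear all state
appEventState.clear();
thirdPartyEventState.clear();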
}
}
}
- Result
Reconciliation succeeded: (order-1,app,1000) (order-1,third-party,success,3000)
Reconciliation succeeded: (order-3,app,3500) (order-3,third-party,success,4000)
Reconciliation failed: (order-2,app,2000) third-party payment info not received
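8.2.4 Broadcast Connected Streams (broadcast)
- Overview
A DataStream's connect() method can also take a BroadcastStream.
Connecting a BroadcastStream returns a BroadcastConnectedStream, which suits scenarios where a configuration or rule set changes dynamically at runtime.
Every element of a BroadcastStream is sent to all parallel instances downstream, where it is kept in broadcast state.
DataStream has a broadcast() method that takes one or more MapStateDescriptors; they declare the map-shaped broadcast state in which downstream functions keep the broadcast data. The method returns a BroadcastStream.
- Usage
Create a MapStateDescriptor with new MapStateDescriptor<>(...), pass it to broadcast(), and get a BroadcastStream back.
Pass that BroadcastStream to connect() on the data stream to obtain a BroadcastConnectedStream.
BroadcastConnectedStream also has a process() method. It takes a BroadcastProcessFunction, or a KeyedBroadcastProcessFunction if the data stream was keyed with keyBy() first. Both have two methods: processElement() for the data stream and processBroadcastElement() for the broadcast stream. process() returns a SingleOutputStreamOperator.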
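To picture what "broadcast" means here: every parallel instance of the downstream operator keeps its own copy of the broadcast state, each broadcast element reaches all instances, and each ordinary element goes to just one instance, which reads its local copy. The plain-Java sketch below models only that behavior; it is not Flink code, and the class name, the parallelism of 3, and the "threshold" configuration key are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Flink code) of broadcast state shared by parallel instances. */
public class BroadcastStateModel {
    // One map per parallel instance, standing in for the broadcast MapState
    private final List<Map<String, String>> instanceStates = new ArrayList<>();

    public BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            instanceStates.add(new HashMap<>());
        }
    }

    /** Counterpart of processBroadcastElement(): the update reaches every instance. */
    public void onBroadcast(String configKey, String configValue) {
        for (Map<String, String> state : instanceStates) {
            state.put(configKey, configValue);
        }
    }

    /** Counterpart of processElement(): one instance, picked by the record's key, reads its local copy. */
    public String onData(String recordKey, String configKey) {
        int instance = Math.floorMod(recordKey.hashCode(), instanceStates.size());
        String value = instanceStates.get(instance).get(configKey);
        return recordKey + " -> " + configKey + "=" + value;
    }

    public static void main(String[] args) {
        BroadcastStateModel model = new BroadcastStateModel(3);
        System.out.println(model.onData("Mary", "threshold")); // Mary -> threshold=null (no config yet)
        model.onBroadcast("threshold", "10");
        System.out.println(model.onData("Mary", "threshold")); // Mary -> threshold=10
        model.onBroadcast("threshold", "20"); // a live configuration change
        System.out.println(model.onData("Bob", "threshold"));  // Bob -> threshold=20
    }
}
```

In Flink itself, processElement() only gets read-only access to the broadcast state (through a ReadOnlyContext); only processBroadcastElement() may modify it, which is what keeps the copies identical across instances.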
8.3 Two-Stream Joins
8.3.1 Overview
- The two streams may have different types
- A join can be seen as a special form of connect
8.3.2 Window Join
- Analysis
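Calling join() directly on a DataStream returns a JoinedStreams.
On JoinedStreams, call where() with the KeySelector for the first stream; it returns a Where.
Where is an inner class of JoinedStreams; its equalTo() takes the KeySelector for the second stream and returns the inner class EqualTo.
EqualTo's window() takes a WindowAssigner just like the earlier window API, so tumbling windows (TumblingEventTimeWindows), sliding windows or session windows can be used; it returns the static nested class WithWindow.
WithWindow offers what the earlier window API offers, such as apply(), which accepts either a JoinFunction or a FlatJoinFunction.
JoinFunction's type parameters are analogous to CoMapFunction's (first input, second input, output). Its only method, join(), combines one element from each stream into one OUT. FlatJoinFunction is similar but emits through a Collector, so it can output zero or more results.
- Summary
- Usage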
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
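This closely resembles SQL's JOIN ... WHERE a.key = b.key. The result is an inner join: only pairs with equal keys that fall into the same window are emitted.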
(Figure: the orange stream joined with the green stream)
- Code
- demo
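import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
public class WindowJoinTest {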
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String,Long>> stream1 = env.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
}));
SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = env.fromElements(
Tuple2.of("a", 3000),
Tuple2.of("b", 4000),
Tuple2.of("a", 4500),
Tuple2.of("b", 5500)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1;
}
}));
stream1.join(stream2)
.where(data->data.f0)
.equalTo(data->data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
@Override
public String join(Tuple2<String, Long> first, Tuple2<String, Integer> second) throws Exception {
return first+" -> "+second;
}
}).print();
env.execute();
}
}
Result
(a,1000) -> (a,3000)
(a,1000) -> (a,4500)
(a,2000) -> (a,3000)
(a,2000) -> (a,4500)
(b,1000) -> (b,4000)
(b,2000) -> (b,4000)
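Why these six pairs appear, and why (b,5500) is missing, can be checked with a plain-Java model of the window join: a pair is emitted only when both elements have the same key and fall into the same tumbling window. This is not the Flink API; the class WindowJoinModel and its helpers are hypothetical, and window starts are computed as ts - (ts mod size), i.e. with zero offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not the Flink API) of an inner join over tumbling event-time windows. */
public class WindowJoinModel {
    /** A (key, timestamp) record, printed like a Flink Tuple2. */
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    /** Start of the tumbling window that contains ts (zero offset). */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Pairs every left/right record that share a key and fall into the same window. */
    static List<String> join(List<Rec> left, List<Rec> right, long size) {
        List<String> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = windowStart(l.ts, size) == windowStart(r.ts, size);
                if (sameKey && sameWindow) {
                    out.add(l + " -> " + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> stream1 = List.of(new Rec("a", 1000), new Rec("b", 1000), new Rec("a", 2000), new Rec("b", 2000));
        List<Rec> stream2 = List.of(new Rec("a", 3000), new Rec("b", 4000), new Rec("a", 4500), new Rec("b", 5500));
        // Same six pairs as the Flink job, though not necessarily in the same order;
        // (b,5500) lies in window [5000, 10000), which has no stream1 element, so it is never paired
        join(stream1, stream2, 5000).forEach(System.out::println);
    }
}
```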
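8.3.3 Interval Join
- Overview
The interval has a lower bound (lowerBound) and an upper bound (upperBound). An element b of the second stream is joined with an element a of the first stream that has the same key when:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound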
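The condition can be checked by hand with a small plain-Java model. It is not Flink code: the class and method names are hypothetical, and it joins two in-memory lists instead of keyed streams. Both bounds are inclusive, matching Flink's default (lowerBoundExclusive()/upperBoundExclusive() would turn the corresponding <= into <). Run on the order and page-view data of the case study in this section with bounds of -5 s and +10 s, it yields the same nine pairs.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not the Flink API) of an interval join between two keyed streams. */
public class IntervalJoinModel {
    /** A (key, timestamp) record. */
    static final class Rec {
        final String key;
        final long ts;

        Rec(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }

        @Override
        public String toString() {
            return "(" + key + "," + ts + ")";
        }
    }

    /** a.ts + lowerBound <= b.ts <= a.ts + upperBound, with both bounds inclusive. */
    static boolean inInterval(long aTs, long bTs, long lowerBound, long upperBound) {
        return aTs + lowerBound <= bTs && bTs <= aTs + upperBound;
    }

    /** Pairs each a with every b of the same key whose timestamp lies in a's interval. */
    static List<String> join(List<Rec> as, List<Rec> bs, long lowerBound, long upperBound) {
        List<String> out = new ArrayList<>();
        for (Rec a : as) {
            for (Rec b : bs) {
                if (a.key.equals(b.key) && inInterval(a.ts, b.ts, lowerBound, upperBound)) {
                    out.add(b + " => " + a);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same keys and timestamps as the order / page-view case study in this section
        List<Rec> orders = List.of(new Rec("Mary", 1000), new Rec("Alice", 1000), new Rec("Bob", 2000),
                new Rec("Alice", 2000), new Rec("Cary", 2000));
        List<Rec> clicks = List.of(new Rec("Bob", 2000), new Rec("Alice", 3000), new Rec("Bob", 3300),
                new Rec("Alice", 3000), new Rec("Bob", 3500), new Rec("Bob", 3800), new Rec("Bob", 4200));
        List<String> result = join(orders, clicks, -5000, 10000);
        result.forEach(System.out::println);
        // 9 pairs: 4 for Alice, 5 for Bob; Mary and Cary have no page views, so nothing is emitted for them
        System.out.println(result.size());
    }
}
```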
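- Analysis
Key the first DataStream with keyBy() to get a KeyedStream, then call its intervalJoin() with another KeyedStream; this returns an IntervalJoin.
IntervalJoin lets you choose event time (inEventTime()) or processing time (inProcessingTime()) and provides between(). Interval joins currently support only event time, so between() throws an exception for processing time.
between() takes the lower and upper bounds and returns an IntervalJoined.
IntervalJoined provides lowerBoundExclusive() and upperBoundExclusive() to exclude the lower or upper bound (both are inclusive by default), and process(), which takes a ProcessJoinFunction.
ProcessJoinFunction is an abstract class with an inner abstract Context class and a processElement() method whose parameters are the element from the first stream, the element from the second stream, the context, and the output Collector.
- Usage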
stream1
.keyBy(<KeySelector>)
.intervalJoin(stream2.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process(new ProcessJoinFunction<Integer, Integer, String>() {
....
});
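- Example
- Scenario
Two streams: one of orders, one of page views. Join each user's order event with that user's page views from the last ten minutes. (The code below uses a shorter, demo-sized interval of -5 s to +10 s around the order.)
- Code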
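import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
// Event is the custom class defined in earlier chapters (not shown here)
public class IntervalJoinTest {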
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String,Long>> orderStream = env.fromElements(
Tuple2.of("Mary", 1000L),
Tuple2.of("Alice", 1000L),
Tuple2.of("Bob", 2000L),
Tuple2.of("Alice", 2000L),
Tuple2.of("Cary", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
}));
SingleOutputStreamOperator<Event> clickStream = env.fromElements(
new Event("Bob", "./cart", 2000L),
new Event("Alice", "./prod?id=100", 3000L),
new Event("Bob", "./prod?id=1", 3300L),
new Event("Alice", "./prod?id=200", 3000L),
new Event("Bob", "./home", 3500L),
new Event("Bob", "./prod?id=2", 3800L),
new Event("Bob", "./prod?id=3", 4200L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
// Interval-join the two streams
orderStream.keyBy(data->data.f0)
.intervalJoin(clickStream.keyBy(data->data.user))
.between(Time.seconds(-5),Time.seconds(10))
.process(new ProcessJoinFunction<Tuple2<String, Long>, Event, String>(){
@Override
public void processElement(Tuple2<String, Long> left, Event right, ProcessJoinFunction<Tuple2<String, Long>, Event, String>.Context ctx, Collector<String> out) throws Exception {
out.collect(right+" => "+left);// the page view that led to the order
}
})
.print();
env.execute();
}
}
- Result
Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,1000)
Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0} => (Bob,2000)
Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3} => (Bob,2000)
Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,2000)
Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,2000)
Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,1000)
Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5} => (Bob,2000)
Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8} => (Bob,2000)
Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2} => (Bob,2000)
8.3.4 Window CoGroup
- Usage
stream1.coGroup(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.apply(<CoGroupFunction>)
Compare with the window join:
stream1.join(stream2)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)
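The only changes are that join becomes coGroup and JoinFunction becomes CoGroupFunction.
- Analysis
DataStream has a coGroup() method that takes another DataStream and returns a CoGroupedStreams.
On CoGroupedStreams, where() returns a Where, equalTo() on it returns an EqualTo, and window() on that sets the window and returns a WithWindow, on which apply() can then be called.
apply() takes a CoGroupFunction, an interface with a single abstract method, coGroup(). Its first two parameters are Iterables holding all elements of the current key in the current window from each stream (a group, not a single element); the third is the output Collector. One side of the group may be empty, as the []=>[(b,5500)] line in the results below shows.
Key point: because coGroup() sees both groups, even when one is empty, it can implement joins other than inner joins, such as left and right outer joins.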
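As a sketch of that key point, here is the kind of logic a coGroup() body could use for a left outer join. It is plain Java, not a Flink CoGroupFunction: the two Iterable parameters stand in for the groups coGroup() receives for one key and one window, and a returned List stands in for the Collector (in Flink you would call out.collect(...) instead). The method name and the (c,1000) sample group are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of a coGroup() body that yields a left outer join for one key in one window. */
public class LeftOuterCoGroup {
    /**
     * first/second play the role of the two Iterables that CoGroupFunction.coGroup() receives;
     * the returned List stands in for the Collector.
     */
    static <L, R> List<String> leftOuterJoin(Iterable<L> first, Iterable<R> second) {
        List<String> out = new ArrayList<>();
        for (L left : first) {
            boolean matched = false;
            for (R right : second) {
                out.add(left + " -> " + right);
                matched = true;
            }
            if (!matched) {
                // No element from the second stream for this key in this window: keep the left one with null
                out.add(left + " -> null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The b-group of the first window in the results: [(b,1000), (b,2000)] vs [(b,4000)]
        System.out.println(leftOuterJoin(List.of("(b,1000)", "(b,2000)"), List.of("(b,4000)")));
        // [(b,1000) -> (b,4000), (b,2000) -> (b,4000)]
        // The group [] vs [(b,5500)] from the second window: no left element, so nothing is emitted
        System.out.println(leftOuterJoin(List.of(), List.of("(b,5500)")));
        // []
        // A hypothetical left-only group: kept with null
        System.out.println(leftOuterJoin(List.of("(c,1000)"), List.of()));
        // [(c,1000) -> null]
    }
}
```

A right outer join swaps the roles of the two loops; a full outer join additionally emits the right-hand elements when the left group is empty.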
- Code
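import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.time.Duration;
public class CoGroupTest {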
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Tuple2<String,Long>> stream1 = env.fromElements(
Tuple2.of("a", 1000L),
Tuple2.of("b", 1000L),
Tuple2.of("a", 2000L),
Tuple2.of("b", 2000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
@Override
public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
return element.f1;
}
}));
SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = env.fromElements(
Tuple2.of("a", 3000),
Tuple2.of("b", 4000),
Tuple2.of("a", 4500),
Tuple2.of("b", 5500)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
@Override
public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
return element.f1;
}
}));
stream1.coGroup(stream2)
.where(data->data.f0)
.equalTo(data->data.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Integer>> second, Collector<String> out) throws Exception {
out.collect(first+"=>"+second);
}
}).print();
env.execute();
}
}
- Result
[(a,1000), (a,2000)]=>[(a,3000), (a,4500)]
[(b,1000), (b,2000)]=>[(b,4000)]
[]=>[(b,5500)]