sum
Aggregate with the sum function after calling keyBy on a DataStream.
package com.stanley.wordcount

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._

/**
  * Created by admin on 2020/7/2.
  */
object SumWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set parallelism to 1
    env.setParallelism(1)
    // read the text stream from a socket
    val inputDataStream: DataStream[String] = env.socketTextStream("node1", 9999)
    val outputDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0) // key by the word (tuple position 0)
      .sum(1)   // rolling sum over the count (tuple position 1)
    outputDataStream.print("sum_wordcount")
    env.execute("wc test")
  }
}
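To try this out, feed the socket with netcat on node1 (nc -lk 9999, assuming netcat is available; any line-based TCP source works) and type hello world hello. With parallelism 1 the output should look like:

sum_wordcount> (hello,1)
sum_wordcount> (world,1)
sum_wordcount> (hello,2)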
ProcessFunction
Call the lowest-level ProcessFunction and keep the count in a keyed state (ValueState).
package com.stanley.wordcount

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

/**
  * Created by admin on 2020/7/2.
  */
object ProcessWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputDataStream: DataStream[String] = env.socketTextStream("node1", 9999)
    val outputDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      // apply the custom MyProcessFunction
      .process(new MyProcessFunction)
    outputDataStream.print("process_wordcount")
    env.execute("wc test")
  }
}

class MyProcessFunction extends KeyedProcessFunction[Tuple, (String, Int), (String, Int)] {
  // per-key count state
  private var countState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    // initialize countState in the open() lifecycle method
    countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count", classOf[Int]))
  }

  override def processElement(i: (String, Int), context: KeyedProcessFunction[Tuple, (String, Int), (String, Int)]#Context, collector: Collector[(String, Int)]): Unit = {
    // read the current count (0 on the first element for a key)
    var count = countState.value()
    count += 1
    // update countState
    countState.update(count)
    collector.collect((i._1, count))
  }
}
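A small aside: because keyBy(0) keys by tuple position, the key type is Flink's Tuple, and the current key is also available inside processElement through the Context. A sketch of an equivalent emit, assuming (as holds here) that field 0 of the key is the word:

// equivalent emit, reading the word from the runtime key rather than the element
collector.collect((context.getCurrentKey.getField[String](0), count))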
RichMapFunction
Like ProcessFunction, RichMapFunction extends AbstractRichFunction, so it likewise has lifecycle methods, a runtime context, and access to keyed state.
package com.stanley.wordcount

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
  * Created by admin on 2020/7/2.
  */
object RichWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val inputDataStream: DataStream[String] = env.socketTextStream("node1", 9999)
    val outputDataStream: DataStream[(String, Int)] = inputDataStream
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      // apply the custom MyRichMapFunction
      .map(new MyRichMapFunction)
    outputDataStream.print("rich_wordcount")
    env.execute("wc test")
  }
}

class MyRichMapFunction extends RichMapFunction[(String, Int), (String, Int)] {
  // per-key count state
  private var countState: ValueState[Int] = _

  override def open(parameters: Configuration): Unit = {
    countState = getRuntimeContext.getState[Int](new ValueStateDescriptor[Int]("count", classOf[Int]))
  }

  override def map(in: (String, Int)): (String, Int) = {
    var count = countState.value()
    count += 1
    countState.update(count)
    (in._1, count)
  }
}
Summary
The sum method suits simple aggregation logic. In real deployments, ProcessFunction and RichMapFunction persist their keyed state to a state backend, and if a failure occurs the state can be restored from a checkpoint.
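For completeness, a minimal sketch of that setup, assuming Flink's filesystem state backend and a placeholder HDFS path (adjust both to the actual cluster):

import org.apache.flink.runtime.state.filesystem.FsStateBackend

// in main(), right after creating env:
env.enableCheckpointing(10000) // take a checkpoint every 10 seconds
env.setStateBackend(new FsStateBackend("hdfs://node1:9000/flink/checkpoints")) // placeholder checkpoint path

With this enabled, the count ValueState in the examples above is snapshotted periodically and restored automatically when the job recovers from a failure.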