Connecting Flink to Kafka from Scala

Add the following dependencies to pom.xml (this example uses Flink 1.11.0 built against Scala 2.11):
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>
</dependencies>
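If the project is built with sbt rather than Maven, the equivalent dependencies would look like the following sketch; with scalaVersion set to a 2.11 release, the %% operator resolves to the same _2.11 artifacts listed above:

// build.sbt -- assumed sbt equivalent of the Maven dependencies above
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % "1.11.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.11.0",
  "org.apache.flink" %% "flink-clients"         % "1.11.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.11.0"
)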
With the dependencies in place, the following program consumes the test topic, splits each line into words, and prints a running word count:

package flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object flink_kafka {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Kafka consumer configuration. The universal connector only needs
    // bootstrap.servers and group.id; zookeeper.connect was required by the
    // legacy 0.8 consumer and is ignored by FlinkKafkaConsumer.
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "10.21.13.181:9092")
    properties.setProperty("group.id", "consume_id")

    // Read the "test" topic as plain strings.
    val kafkaSource = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
    val testDS: DataStream[String] = env.addSource(kafkaSource)

    // Word count: split each line on spaces, pair each word with 1,
    // key by the word, and keep a running sum.
    testDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()

    env.execute("job")
  }
}
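By default the consumer starts from the committed group offsets. As a minimal sketch of two common extensions, the snippet below (inserted into the same main method, after kafkaSource and testDS are created) pins the start position for replays and writes the counts back to Kafka with FlinkKafkaProducer, which ships in the same flink-connector-kafka artifact. The output topic test-out is a hypothetical name chosen for illustration:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

// Replay the topic from the beginning instead of the committed group offsets
// (handy during development).
kafkaSource.setStartFromEarliest()

// Assumed sink: write each (word, count) pair, rendered as a string, to the
// hypothetical "test-out" topic on the same broker. This simple constructor
// gives at-least-once delivery.
val kafkaSink = new FlinkKafkaProducer[String]("test-out", new SimpleStringSchema(), properties)

testDS
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1)
  .sum(1)
  .map(_.toString)   // FlinkKafkaProducer[String] expects String records
  .addSink(kafkaSink)

To feed test input, kafka-console-producer.sh --broker-list 10.21.13.181:9092 --topic test lets you type lines into the topic and watch the counts update.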