淘先锋技术网

首页 1 2 3 4 5 6 7

# Flink 连接 Kafka 并读取数据

前提条件

flink版本1.11.2

直接上代码

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

/**
 * Minimal Flink 1.11 example: registers a Kafka-backed table through SQL DDL,
 * selects every row from it, converts the result to an append-only
 * {@code DataStream<Row>} and prints each row to stdout.
 *
 * <p>The Kafka broker address, topic and consumer group are hard-coded in the
 * DDL inside {@link #main(String[])}; adjust them for your environment.
 */
public class TestMain {
	/** Streaming execution environment shared by the job; assigned in {@link #main(String[])}. */
	protected static StreamExecutionEnvironment env;
	/** Table environment bound to {@link #env}; assigned in {@link #main(String[])}. */
	protected static StreamTableEnvironment tEnv;
	/** Not used anywhere in this class; kept so existing references keep compiling. */
	protected static int ff = 0;

	/**
	 * Builds the Kafka -> print pipeline and submits it.
	 *
	 * @param args command-line arguments (ignored)
	 * @throws Exception if the DDL, the query, or the job execution fails
	 */
	public static void main(String[] args) throws Exception {
		env = StreamExecutionEnvironment.getExecutionEnvironment();

		tEnv = StreamTableEnvironment.create(
			env,
			EnvironmentSettings.newInstance()
				// Watermark is only supported in blink planner
				.useBlinkPlanner()
				.inStreamingMode()
				.build()
		);
		env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
		env.enableCheckpointing(1000);
		env.setParallelism(1);

		// Plain concatenation instead of String.format: there are no format
		// arguments, and format() would throw if the SQL ever contained a '%'.
		// Optional connector settings that can be added to the WITH clause:
		//   'scan.startup.timestamp-millis' = '1605147648000'
		//   'csv.field-delimiter' = '\t'   (only meaningful with 'format' = 'csv')
		String createTable =
			"CREATE TABLE UserScores (name STRING)\n" +
				"WITH (\n" +
				"  'connector' = 'kafka',\n" +
				"  'topic' = 'nima',\n" +
				"  'properties.bootstrap.servers' = '192.168.120.130:9092',\n" +
				"  'properties.group.id' = 'testGroup',\n" +
				"  'format' = 'json',\n" +
				"  'scan.startup.mode' = 'group-offsets'\n" +
				")";

		// DDL statements are executed eagerly by executeSql; the returned
		// TableResult carries nothing this program needs.
		tEnv.executeSql(createTable);

		Table table = tEnv.sqlQuery("SELECT * FROM UserScores");

		DataStream<Row> infoDataStream1 = tEnv.toAppendStream(table, Row.class);
		infoDataStream1.print();

		// Once the table is converted to a DataStream, the topology is only
		// submitted by env.execute(); without this call main() returns and
		// nothing is ever read from Kafka or printed.
		env.execute("TestMain");
	}
}

以上是读取 Kafka 的代码

具体说明参考官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html