Flink execution environment utility class
public class ExecutionEnvUtil {
/**
 * Reads configuration from the properties file (effective priority: properties file < command-line arguments < system properties)
 *
 * @param args command-line arguments
 * @return org.apache.flink.api.java.utils.ParameterTool
 * @date 2023/8/4 - 10:05 AM
 */
public static ParameterTool createParameterTool(final String[] args) throws Exception {
return ParameterTool
.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME))
.mergeWith(ParameterTool.fromArgs(args))
.mergeWith(ParameterTool.fromSystemProperties());
}
/**
 * Prepares the Flink stream execution environment
 *
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 * @date 2023/8/4 - 11:10 AM
 */
public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// default parallelism: 12 unless overridden in the configuration
env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));
// restart strategy: at most 4 restarts, 60 seconds apart
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));
if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {
CheckPointUtil.setCheckpointConfig(env,parameterTool);
// retain externalized checkpoint data when the job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
env.getConfig().setGlobalJobParameters(parameterTool);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
return env;
}
}
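Putting the two helpers together, a job entry point typically looks like the following. This is a minimal sketch (the class name and the pipeline placeholder are illustrative, and imports are omitted as in the snippets above):

public class DemoJob {
    public static void main(String[] args) throws Exception {
        // merge the properties file, command-line arguments and system properties
        ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        // apply parallelism, restart strategy, checkpointing and global job parameters
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        // ... build the actual pipeline here (sources, transformations, sinks) ...

        env.execute("demo-job");
    }
}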
Checkpoint utility class
public class CheckPointUtil {
private static final String CHECKPOINT_MEMORY = "memory";
private static final String CHECKPOINT_FS = "fs";
private static final String CHECKPOINT_ROCKSDB = "rocksdb";
/**
 * Default checkpoint storage type (keeps the cluster's default state backend)
 */
private static final String CHECKPOINT_DEFAULT = "default";
/**
 * Configures Flink checkpointing
 *
 * @param env stream execution environment
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 * @date 2023/8/4 - 10:49 AM
 */
public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{
// choose a suitable state backend according to the configured type
String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);
if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {
// 1. State is kept on the JobManager heap; the default maximum state size is 5 MB, raised here to 500 MB
StateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);
env.setStateBackend(stateBackend);
}
else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {
// 2. State snapshots are written asynchronously to the configured file-system checkpoint directory
StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);
env.setStateBackend(stateBackend);
}
else if (CHECKPOINT_ROCKSDB.equalsIgnoreCase(stateBackendType)) {
// 3. State is kept in embedded RocksDB; incremental checkpoints are written to the checkpoint directory
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);
env.setStateBackend(rocksDBStateBackend);
}
// checkpoint interval (default: 60 s)
env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));
// Advanced settings (these values would also be better read from the configuration file, with environment overrides taking precedence)
// exactly-once checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// minimum pause between checkpoints: 2 minutes
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);
// a checkpoint must complete within 15 minutes, otherwise it is discarded
env.getCheckpointConfig().setCheckpointTimeout(15*60000);
// do not fail the job on a checkpoint failure; tolerate up to 3 checkpoint failures
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
// allow at most 1 checkpoint in flight at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
return env;
}
}
Building the Kafka source and sink
/**
 * Builds the Kafka source
 *
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>
 * @date 2023/8/4 - 2:41 PM
 */
private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){
Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);
// subscribe to topics by regular expression
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),
new SimpleStringSchema(),
props);
// commit the consumed offsets back to Kafka when a checkpoint completes
kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
// start from the earliest offset when configured to do so, otherwise resume from the committed group offsets
if (parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)) {
kafkaConsumer.setStartFromEarliest();
} else {
kafkaConsumer.setStartFromGroupOffsets();
}
return kafkaConsumer;
}
/**
 * Builds the Kafka sink
 *
 * @param parameterTool merged job parameters
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>
 * @date 2023/8/16 - 11:38 AM
 */
private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){
Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);
return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC),
// route each record to the topic named by the element's log-type field
(KafkaSerializationSchema<JSONObject>) (element, timestamp) ->
new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes(StandardCharsets.UTF_8)),
props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
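Wired into a job (with env and parameterTool coming from ExecutionEnvUtil), the two builders above can be used roughly as follows. This is a sketch: the parse step from the raw String into a fastjson JSONObject via JSON.parseObject is illustrative and not part of the original code.

env.addSource(buildSourceKafka(parameterTool))
   .map(value -> JSON.parseObject(value))   // parse each record into a fastjson JSONObject
   .returns(JSONObject.class)               // type hint for the lambda
   .addSink(buildSinkKafka(parameterTool)); // route to the topic named by the log-type field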
Kafka configuration utility class
public class KafkaConfigUtil {
/**
 * Builds the Kafka consumer (source) configuration
 *
 * @param parameterTool merged job parameters
 * @return java.util.Properties
 * @date 2023/8/4 - 2:39 PM
 */
public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {
Properties props = parameterTool.getProperties();
props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
props.put("flink.partition-discovery.interval-millis", "10000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
// 0817 - request timeout and retry count when talking to Kafka
props.put("request.timeout.ms", "30000");
props.put("retries", "5");
return props;
}
/**
 * Builds the Kafka producer (sink) configuration
 *
 * @param parameterTool merged job parameters
 * @return java.util.Properties
 * @date 2023/8/14 - 5:54 PM
 */
public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {
Properties props = parameterTool.getProperties();
props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
return props;
}
}
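Both property builders hard-code the SASL/PLAIN credentials inside the JAAS string. A common variation is to pull them from the same ParameterTool so they follow the properties-file / command-line / system-property precedence. A sketch (the kafka.sasl.username and kafka.sasl.password keys are hypothetical and not defined in the code above):

String jaasTemplate = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";";
String jaasConfig = String.format(jaasTemplate,
        parameterTool.get("kafka.sasl.username"),    // hypothetical key
        parameterTool.get("kafka.sasl.password"));   // hypothetical key
props.put("sasl.jaas.config", jaasConfig);
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");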
JDBC utility class
public class JdbcDatasourceUtils {
/**
 * Cache of connection pools, keyed by the MD5 of the connection parameters
 */
public static final Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();
/**
 * Returns a cached HikariCP connection pool, creating it on first use
 *
 * @param jdbcUrl  JDBC URL
 * @param dsUname  username
 * @param dsPwd    password
 * @param dsDriver JDBC driver class name
 * @return com.zaxxer.hikari.HikariDataSource
 * @date 2023/8/9 - 2:23 PM
 */
public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);
// double-checked locking: each distinct connection configuration creates exactly one pool
if (!DATASOURCES.containsKey(md5Key)) {
synchronized (JdbcDatasourceUtils.class) {
if (!DATASOURCES.containsKey(md5Key)) {
DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));
}
}
}
return DATASOURCES.get(md5Key);
}
/**
 * Creates a HikariCP connection pool
 *
 * @param jdbcUrl  JDBC URL
 * @param dsUname  username
 * @param dsPwd    password
 * @param dsDriver JDBC driver class name
 * @return com.zaxxer.hikari.HikariDataSource
 * @date 2023/8/9 - 2:14 PM
 */
private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(jdbcUrl);
config.setUsername(dsUname);
config.setPassword(dsPwd);
config.setDriverClassName(dsDriver);
// auto-commit for connections returned from the pool (default: true)
config.setAutoCommit(true);
// read-only connections: this pool is only used for queries
config.setReadOnly(true);
// connection timeout in milliseconds (default 30 s, lowered to 10 s here)
config.setConnectionTimeout(10000);
// maximum pool size
config.setMaximumPoolSize(32);
// minimum number of idle connections
config.setMinimumIdle(16);
// idle connection timeout (10 minutes)
config.setIdleTimeout(600000);
// maximum connection lifetime (9 minutes); note that HikariCP disables idleTimeout when it is not shorter than maxLifetime
config.setMaxLifetime(540000);
// validation query used to test connections
config.setConnectionTestQuery("SELECT 1");
return new HikariDataSource(config);
}
/**
 * Runs a query and returns each row as a column-label -> value map
 *
 * @param dataSource connection pool
 * @param sql        query to execute
 * @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>
 * @date 2023/8/15 - 6:03 PM
 */
public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {
return loadSql(dataSource, sql, resultSet -> {
List<Map<String, Object>> datas = new ArrayList<>();
try {
if (null == resultSet){
return datas;
}
ResultSetMetaData metaData = resultSet.getMetaData();
// assemble the result rows
Map<String, Object> entry;
while (resultSet.next()) {
entry = new LinkedHashMap<>();
// getColumnLabel returns the column alias (if any); getColumnName returns the original column name
for (int i = 1; i <= metaData.getColumnCount(); i++) {
entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));
}
datas.add(entry);
}
} catch (Exception e) {
e.printStackTrace();
}
return datas;
});
}
/**
 * Runs a query and collects every column value of every row into a set, applying a mapping function
 *
 * @param dataSource connection pool
 * @param sql        query to execute
 * @param function   maps each raw column value to the element type
 * @return java.util.Set<R>
 * @date 2023/8/15 - 6:03 PM
 */
public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {
return loadSql(dataSource, sql, resultSet -> {
Set<R> datas = new LinkedHashSet<>();
try {
if (null == resultSet){
return datas;
}
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
for (int i = 1; i <= metaData.getColumnCount(); i++) {
datas.add(function.apply(resultSet.getObject(i)));
}
}
} catch (Exception e) {
e.printStackTrace();
}
return datas;
});
}
/**
 * Executes a query and passes the ResultSet to the given mapping function
 *
 * @param dataSource connection pool
 * @param sql        query to execute
 * @param function   maps the ResultSet (or null on failure) to the result
 * @return R
 * @date 2023/8/15 - 6:03 PM
 */
private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {
// try-with-resources closes the ResultSet, PreparedStatement and Connection in the correct order
try (Connection connection = dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery()) {
return function.apply(resultSet);
} catch (Exception e) {
e.printStackTrace();
}
// on failure the mapping function receives null and should return an empty result
return function.apply(null);
}
}
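A typical call site for these helpers looks like the following sketch (the MySQL URL, credentials, driver and table are placeholders):

HikariDataSource ds = JdbcDatasourceUtils.getHikariDataSource(
        "jdbc:mysql://127.0.0.1:3306/demo",     // placeholder URL
        "user", "password",                     // placeholder credentials
        "com.mysql.cj.jdbc.Driver");            // placeholder driver class

// each row as a column-label -> value map
List<Map<String, Object>> rows = JdbcDatasourceUtils.loadDatas(ds, "SELECT id, name FROM t_demo");

// collect a single column into a set, converting each value to String
Set<String> names = JdbcDatasourceUtils.loadSetDatas(ds, "SELECT name FROM t_demo", String::valueOf);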