Flink 不同环境切换

代码:

package com.yourcompany.flink;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Minimal, single-file Flink job whose settings are chosen per environment.
 *
 * <p>The environment name comes from the {@code env} JVM system property and selects the
 * classpath resource {@code config/application-<env>.properties}.
 */
public class MinimalFlinkJob {

    /**
     * Validates the environment, loads its configuration, configures the Flink
     * execution environment and submits the job under the name {@code Job-<env>}.
     *
     * <p>Terminates the JVM with exit code 1 when the {@code env} property is missing or
     * blank, when the configuration cannot be loaded, or when {@code parallelism} is invalid.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Validate the environment parameter (a blank value is as unusable as a missing one)
        String env = System.getProperty("env");
        if (env == null || env.trim().isEmpty()) {
            System.err.println("错误: 使用-Denv=dev|test|prod");
            System.exit(1);
        }

        // 2. Load the configuration for that environment
        Properties config = loadConfig(env);

        // 3. Create the Flink environment
        StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setParallelism(readParallelism(config));

        // 4. Run the job
        // NOTE(review): no source/sink is defined before execute(); Flink is expected to
        // reject an empty topology, so the actual pipeline has to be added here - confirm.
        System.out.println("启动Flink作业,环境: " + env);
        flinkEnv.execute("Job-" + env);
    }

    /**
     * Reads the {@code parallelism} setting, defaulting to 1 when it is absent.
     * Exits with code 1 when the value is not a positive integer.
     */
    private static int readParallelism(Properties config) {
        // Properties.load keeps trailing whitespace in values, so trim before parsing.
        String raw = config.getProperty("parallelism", "1").trim();
        try {
            int parallelism = Integer.parseInt(raw);
            if (parallelism > 0) {
                return parallelism;
            }
        } catch (NumberFormatException ignored) {
            // Falls through to the shared error report below.
        }
        System.err.println("错误: parallelism 配置无效: " + raw);
        System.exit(1);
        return 1; // unreachable; keeps the compiler satisfied after System.exit
    }

    /**
     * Loads {@code config/application-<env>.properties} from the classpath.
     * Exits with code 1 when the resource is missing or cannot be read.
     *
     * @param env environment name used to build the resource path
     * @return the loaded properties
     */
    private static Properties loadConfig(String env) {
        String resource = "config/application-" + env + ".properties";
        Properties props = new Properties();
        // try-with-resources closes the stream; a null resource is simply not closed.
        try (InputStream in = MinimalFlinkJob.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                // getResourceAsStream signals "not found" with null rather than an exception.
                System.err.println("加载配置失败: 找不到配置文件 " + resource);
                System.exit(1);
            }
            props.load(in);
        } catch (IOException | IllegalArgumentException e) {
            // IllegalArgumentException: malformed Unicode escape in the properties file.
            System.err.println("加载配置失败: " + e.getMessage());
            System.exit(1);
        }
        return props;
    }
}

 

工具类:

/**
 * Holds the properties of the environment selected by the {@code env} JVM system
 * property (default {@code dev}), read from the classpath resource
 * {@code application-<env>.properties}.
 *
 * <p>NOTE(review): {@code MinimalFlinkJob} in the same article loads
 * {@code config/application-<env>.properties}; confirm which location is intended.
 */
public class EnvironmentConfig {
    private static final String DEFAULT_ENV = "dev";
    private final Properties properties = new Properties();

    /** Loads the configuration for the environment named by {@code -Denv}, or {@code dev}. */
    public EnvironmentConfig() {
        String env = System.getProperty("env", DEFAULT_ENV);
        loadProperties(env);
    }

    /**
     * Reads {@code application-<env>.properties} into {@link #properties}.
     * A missing file leaves the configuration empty and is reported on stderr.
     *
     * @throws RuntimeException if the file exists but cannot be read
     */
    private void loadProperties(String env) {
        String configFile = "application-" + env + ".properties";
        // try-with-resources closes the stream; a null resource is simply not closed.
        try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(configFile)) {
            if (inputStream == null) {
                // Stay tolerant of a missing file, but no longer silently.
                System.err.println("Config file not found on classpath: " + configFile);
                return;
            }
            properties.load(inputStream);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load config for env: " + env, e);
        }
    }

    /**
     * Returns the value configured for {@code key}.
     *
     * @param key property name
     * @return the configured value, or {@code null} when the key is absent
     */
    public String getProperty(String key) {
        return properties.getProperty(key);
    }

    /**
     * Returns the value configured for {@code key}, or {@code defaultValue} when absent.
     *
     * @param key property name
     * @param defaultValue value returned when the key is not configured
     * @return the configured value or {@code defaultValue}
     */
    public String getProperty(String key, String defaultValue) {
        return properties.getProperty(key, defaultValue);
    }
}

部署时使用参数:

# Submit the job to the Flink cluster in detached mode.
# NOTE(review): the job reads `env` via System.getProperty("env"); `flink run -D<key>=<value>`
# is documented as a Flink configuration option - verify it reaches the JVM system
# properties, otherwise pass it through the JVM options instead.
# NOTE(review): -Dparallelism=8 is not read by the job code shown, which takes
# `parallelism` from the environment's properties file.
flink run -d \
-c com.yourcompany.flink.MinimalFlinkJob \
-Denv=prod \
-Dparallelism=8 \
your-job.jar

posted @ 2025-09-24 17:22  ---江北  阅读(7)  评论(0)    收藏  举报
TOP