flink1.15新特性
以下是几个关键的变化领域,我将使用 Java 8 语法进行对比展示。
1. 批执行的明确性:BATCH 执行模式成为首选
在 Flink 1.15 之前,虽然已经引入了流批一体,但执行模式(ExecutionMode)的设置相对繁琐。新版本更加强调和简化了这一点。
旧方式 (Flink 1.14 及之前):
主要通过 ExecutionEnvironment(DataSet API)或配置 RuntimeExecutionMode 来处理批数据。
// 旧方式 - 使用已不推荐使用的 DataSet API (Java 8)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> data = env.fromElements("a", "b", "c");
data.print();
新方式/最佳实践 (Flink 1.15+):
-
使用 DataStream API 处理有界流(批数据)。
-
通过
env.setRuntimeMode(RuntimeExecutionMode.BATCH)或在提交作业时通过命令行参数-Dexecution.runtime-mode=BATCH明确指定执行模式。BATCH模式会触发许多针对批处理的优化。
// 新方式 - 使用 DataStream API 处理批数据 (Java 8)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 关键设置:明确指定为 BATCH 模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 读取有界数据源(例如文件)作为流
DataStream<String> data = env.readTextFile("path/to/your/batch/data.txt");
data.print();
env.execute("Batch Job with DataStream API");
变化解读:
-
DataSet API已被标记为@Deprecated并计划移除。所有批处理任务都应迁移到 DataStream API 或 Table API。 -
明确设置
BATCH模式至关重要,因为 Flink 会为流和批应用不同的优化策略(例如,批处理下的 Join 和聚合操作会更高效)。
2. Sink API 的重构:全新且统一的 Sink
这是一个非常重要的变化。旧的 addSink 方法虽然灵活但过于底层且容易误用。Flink 1.15+ 引入了 Sink V2 API,提供了更统一、更安全、更易于实现恰好一次(exactly-once)语义的抽象。
旧方式 (使用旧的 SinkFunction):
// 旧方式 - 使用不推荐的自定义 SinkFunction (Java 8)
data.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
// 简单直接地输出,难以保证端到端的恰好一次语义
System.out.println(value);
}
});
新方式 (使用新的 Sink API):
新的 sinkTo 方法配合 Sink 实现(如 FileSink, KafkaSink, CassandraSink 等)。
// 新方式 - 使用新的 SinkTo API (Java 8)
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
// 1. 定义一个新的 Sink,例如 FileSink
FileSink<String> fileSink = FileSink
.forRowFormat(new Path("output/path"), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(Duration.ofMinutes(15).toMillis())
.build()
)
.build();
// 2. 使用新的 sinkTo 方法
data.sinkTo(fileSink);
// 另一个例子:使用 KafkaSink
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("output-topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 轻松设置交付保证
.setTransactionalIdPrefix("my-app-")
.build();
data.sinkTo(kafkaSink);
变化解读:
-
addSink(SinkFunction)已被标记为@Deprecated。 -
新的
SinkAPI (sinkTo(Sink)) 是推荐的做法。 -
新 API 将连接器属性、序列化器和交付保证(
DeliveryGuarantee)清晰地分离开,使得配置更加直观和安全。 -
它内置支持了端到端的恰好一次语义,与两阶段提交协议集成得更好。
3. Source API 的演进
与 Sink API 类似,Source API 也进行了现代化改造,但变化相对 Sink 较小。新的 Source API 在更早的版本(1.13+)就已引入,并在 1.15+ 后持续优化。
旧方式 (使用已弃用的 SourceFunction):
// 旧方式 - 已弃用
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
// ... 产生数据
}
}
@Override
public void cancel() {
isRunning = false;
}
});
新方式 (使用新的 Source 接口):
新的 API 更模块化,支持更复杂的特性如动态分区发现、水印对齐等。
// 新方式 - 使用新的 Source 实现 (例如 Kafka)
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("kafka-broker:9092")
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStream<String> data = env.fromSource(
source,
WatermarkStrategy.noWatermarks(), // 指定水印策略
"Kafka Source"
);
变化解读:
-
addSource(SourceFunction)已被标记为@Deprecated。 -
推荐使用
env.fromSource(Source, WatermarkStrategy, String)方法。 -
新的
SourceAPI 提供了更好的可扩展性和监控支持。
4. Table API & SQL 的持续增强
Table API 和 SQL 是 Flink 流批一体的核心,每个版本都在不断增强。虽然语法本身变化不大,但其在架构中的地位空前提高。
-
Catalogs 和 Hive 集成:变得更稳定和成熟,是生产环境管理元数据的标准方式。
-
CDC (Change Data Capture) 集成:对 MySQL, PostgreSQL 等数据库的 CDC 连接器支持越来越好,使得分析实时数据库日志变得非常简单。
-
窗口 TVF (Table-Valued Functions):语法更加标准化(遵循 SQL 标准),取代了旧的
Grouped Window语法。
旧窗口语法 (已过时):
-- 旧 Group Window 语法 (已过时)
SELECT
TUMBLE_START(ts, INTERVAL '1' HOUR),
user,
COUNT(url)
FROM clicks
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR), user
新窗口 TVF 语法 (推荐):
-- 新 Window TVF 语法 (推荐)
SELECT
window_start,
window_end,
user,
COUNT(url)
FROM TABLE(
TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user
变化解读:
-
强烈建议所有新项目优先考虑使用 Table API / SQL 进行开发,特别是批处理任务。
-
新的 TVF 语法更强大、更符合标准,是未来的方向。
总结与建议
| 领域 | 旧方式 (已弃用/不推荐) | 新方式/最佳实践 (Flink 1.15+) |
|---|---|---|
| 批处理 | ExecutionEnvironment + DataSet API |
StreamExecutionEnvironment + setRuntimeMode(BATCH) |
| Sink | addSink(SinkFunction) |
sinkTo(Sink) (e.g., FileSink, KafkaSink) |
| Source | addSource(SourceFunction) |
fromSource(Source, WatermarkStrategy, ...) |
| 统一API | 流和批两套API (DataStream/DataSet) |
一套 DataStream API 或 Table API / SQL |
给你的建议:
-
新项目:直接使用 DataStream API (BATCH/STREAMING 模式) 和 新的
Sink/SourceAPI。对于ETL或分析类任务,首选 Table API / SQL。 -
老项目迁移:制定计划,将旧的
DataSet程序、SourceFunction和SinkFunction逐步迁移到新的 API 上,为未来版本升级扫清障碍。 -
关注执行模式:处理有界数据源时,务必显式设置
env.setRuntimeMode(RuntimeExecutionMode.BATCH)以获得最佳性能。
这些变化标志着 Flink 在 API 层面已经真正成熟和统一,为未来的稳定发展奠定了坚实的基础。
https://blog.csdn.net/qq_34295546/article/details/124847880

浙公网安备 33010602011771号