flink1.15新特性

以下是几个关键的变化领域,我将使用 Java 8 语法进行对比展示。


1. 批执行的明确性:BATCH 执行模式成为首选

在 Flink 1.15 之前,虽然已经引入了流批一体,但执行模式(ExecutionMode)的设置相对繁琐。新版本更加强调和简化了这一点。

旧方式 (Flink 1.14 及之前):
主要通过 ExecutionEnvironment(DataSet API)或配置 RuntimeExecutionMode 来处理批数据。

java
 
// 旧方式 - 使用已不推荐使用的 DataSet API (Java 8)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> data = env.fromElements("a", "b", "c");
data.print();

新方式/最佳实践 (Flink 1.15+):

  1. 使用 DataStream API 处理有界流(批数据)。

  2. 通过 env.setRuntimeMode(RuntimeExecutionMode.BATCH) 或在提交作业时通过命令行参数 -Dexecution.runtime-mode=BATCH 明确指定执行模式。BATCH 模式会触发许多针对批处理的优化。

java
 
// 新方式 - 使用 DataStream API 处理批数据 (Java 8)
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 关键设置:明确指定为 BATCH 模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// 读取有界数据源(例如文件)作为流
DataStream<String> data = env.readTextFile("path/to/your/batch/data.txt");

data.print();
env.execute("Batch Job with DataStream API");

变化解读:

  • DataSet API 已被标记为 @Deprecated 并计划移除。所有批处理任务都应迁移到 DataStream API 或 Table API。

  • 明确设置 BATCH 模式至关重要,因为 Flink 会为流和批应用不同的优化策略(例如,批处理下的 Join 和聚合操作会更高效)。


2. Sink API 的重构:全新且统一的 Sink

这是一个非常重要的变化。旧的 addSink 方法虽然灵活但过于底层且容易误用。Flink 1.15+ 引入了 Sink V2 API,提供了更统一、更安全、更易于实现恰好一次(exactly-once)语义的抽象。

旧方式 (使用旧的 SinkFunction):

java
 
// 旧方式 - 使用不推荐的自定义 SinkFunction (Java 8)
data.addSink(new SinkFunction<String>() {
    @Override
    public void invoke(String value, Context context) throws Exception {
        // 简单直接地输出,难以保证端到端的恰好一次语义
        System.out.println(value);
    }
});

新方式 (使用新的 Sink API):
新的 sinkTo 方法配合 Sink 实现(如 FileSinkKafkaSinkCassandraSink 等)。

java
 
// 新方式 - 使用新的 SinkTo API (Java 8)
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

// 1. 定义一个新的 Sink,例如 FileSink
FileSink<String> fileSink = FileSink
    .forRowFormat(new Path("output/path"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(
        DefaultRollingPolicy.builder()
            .withRolloverInterval(Duration.ofMinutes(15).toMillis())
            .build()
    )
    .build();

// 2. 使用新的 sinkTo 方法
data.sinkTo(fileSink);


// 另一个例子:使用 KafkaSink
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka-broker:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("output-topic")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build()
    )
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // 轻松设置交付保证
    .setTransactionalIdPrefix("my-app-")
    .build();

data.sinkTo(kafkaSink);

变化解读:

  • addSink(SinkFunction) 已被标记为 @Deprecated

  • 新的 Sink API (sinkTo(Sink)) 是推荐的做法。

  • 新 API 将连接器属性、序列化器和交付保证(DeliveryGuarantee)清晰地分离开,使得配置更加直观和安全。

  • 它内置支持了端到端的恰好一次语义,与两阶段提交协议集成得更好。


3. Source API 的演进

与 Sink API 类似,Source API 也进行了现代化改造,但变化相对 Sink 较小。新的 Source API 在更早的版本(1.13+)就已引入,并在 1.15+ 后持续优化。

旧方式 (使用已弃用的 SourceFunction):

java
 
// 旧方式 - 已弃用
DataStream<String> stream = env.addSource(new SourceFunction<String>() {
    private volatile boolean isRunning = true;
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            // ... 产生数据
        }
    }
    @Override
    public void cancel() {
        isRunning = false;
    }
});

新方式 (使用新的 Source 接口):
新的 API 更模块化,支持更复杂的特性如动态分区发现、水印对齐等。

java
 
// 新方式 - 使用新的 Source 实现 (例如 Kafka)
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("kafka-broker:9092")
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStream<String> data = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(), // 指定水印策略
    "Kafka Source"
);

变化解读:

  • addSource(SourceFunction) 已被标记为 @Deprecated

  • 推荐使用 env.fromSource(Source, WatermarkStrategy, String) 方法。

  • 新的 Source API 提供了更好的可扩展性和监控支持。


4. Table API & SQL 的持续增强

Table API 和 SQL 是 Flink 流批一体的核心,每个版本都在不断增强。虽然语法本身变化不大,但其在架构中的地位空前提高。

  • Catalogs 和 Hive 集成:变得更稳定和成熟,是生产环境管理元数据的标准方式。

  • CDC (Change Data Capture) 集成:对 MySQL, PostgreSQL 等数据库的 CDC 连接器支持越来越好,使得分析实时数据库日志变得非常简单。

  • 窗口 TVF (Table-Valued Functions):语法更加标准化(遵循 SQL 标准),取代了旧的 Grouped Window 语法。

旧窗口语法 (已过时):

sql
 
-- 旧 Group Window 语法 (已过时)
SELECT
  TUMBLE_START(ts, INTERVAL '1' HOUR),
  user,
  COUNT(url)
FROM clicks
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR), user

新窗口 TVF 语法 (推荐):

sql
 
-- 新 Window TVF 语法 (推荐)
SELECT
  window_start,
  window_end,
  user,
  COUNT(url)
FROM TABLE(
  TUMBLE(TABLE clicks, DESCRIPTOR(ts), INTERVAL '1' HOUR)
)
GROUP BY window_start, window_end, user

变化解读:

  • 强烈建议所有新项目优先考虑使用 Table API / SQL 进行开发,特别是批处理任务。

  • 新的 TVF 语法更强大、更符合标准,是未来的方向。

总结与建议

领域旧方式 (已弃用/不推荐)新方式/最佳实践 (Flink 1.15+)
批处理 ExecutionEnvironment + DataSet API StreamExecutionEnvironment + setRuntimeMode(BATCH)
Sink addSink(SinkFunction) sinkTo(Sink) (e.g., FileSinkKafkaSink)
Source addSource(SourceFunction) fromSource(Source, WatermarkStrategy, ...)
统一API 流和批两套API (DataStream/DataSet) 一套 DataStream API 或 Table API / SQL

给你的建议:

  1. 新项目:直接使用 DataStream API (BATCH/STREAMING 模式) 和 新的 Sink/Source API。对于ETL或分析类任务,首选 Table API / SQL。

  2. 老项目迁移:制定计划,将旧的 DataSet 程序、SourceFunction 和 SinkFunction 逐步迁移到新的 API 上,为未来版本升级扫清障碍。

  3. 关注执行模式:处理有界数据源时,务必显式设置 env.setRuntimeMode(RuntimeExecutionMode.BATCH) 以获得最佳性能。

这些变化标志着 Flink 在 API 层面已经真正成熟和统一,为未来的稳定发展奠定了坚实的基础。

 

https://blog.csdn.net/qq_34295546/article/details/124847880

posted @ 2025-09-03 11:18  ---江北  阅读(44)  评论(1)    收藏  举报
TOP