[Flink] Apache Flink 概述

0 缘起

  • 工作中接触和使用Flink,也有2年多了。是时候沉淀些这方面的实践总结了。

1 概述:Apache Flink 的定义、架构及原理

image

  • Flink 官方的定义
  • Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
  • Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
  • Apache Flink是一个①用于对无界有界数据流进行【状态计算】的②框架和分布式处理引擎;其应用于【分布式】、【高性能】、【高可用】的【数据流应用程序】。
  • Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。
  • 总结:
  • Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态或无状态的计算,能够部署在各种集群环境,对各种规模大小的数据进行快速计算。
  • Flink 是一个以 为核心的高可用高性能分布式计算引擎
  • 其具备 流批一体高吞吐低延迟容错能力大规模复杂计算等特点,在数据流上提供 数据分发、通信等功能。
Flink可以处理【有限数据流】和【无限数据】,即能够处理有边界和无边界的数据流。 
  a、【无边界的数据流】就是真正意义上的【流数据】。所以,Flink是支持流计算的。 
  b、【有边界的数据流】就是【批数据】。所以,也支持批处理的。 

不过Flink在流处理上的应用比在批处理上的应用更加广泛;【统一批处理】和【流处理】也是Flink目标之一。
Flink可以部署在各种集群环境,可以对各种大小规模的数据进行快速计算。 


随着大数据技术在各行各业的广泛应用,要求能对海量数据进行实时处理的需求越来越多,同时数据处理的业务逻辑也越来越复杂,【传统的批处理方式】和【早期的流式处理框架】也越来越难以在【延迟性】、【吞吐量】、【容错能力】以及【使用便捷性】等方面满足业务日益苛刻的要求。
  其中【流式计算】的典型代表是Storm和Flink技术。
  它们数据处理的延迟都是【亚秒级低延迟】,但是Flink相比Storm还有其他的一些优势,比如支持exactly once语义,确保数据不会重复。
  Storm支持at least once语义,保证数据不会丢失。
  保证数据不会重复的代价很高,比如数据下游操作属于幂等操作。
  另外从测试结果来看,Flink在【低延迟】的基础上还能保证高吞吐,优势明显。 

在这种形势下,新型流式处理框架Flink通过创造性地把现代大规模并行处理技术应用到流式处理中来,极大地改善了以前的流式处理框架所存在的问题。 

1.2 计算引擎的发展沿革

  • 大数据计算引擎分为【离线计算】和【实时计算】。
  • 【离线计算】就是我们通常说的【批计算】,代表是Hadoop MapReduceHive等大数据技术。
  • 【实时计算】也被称作【流计算】,代表是StormSpark StreamingFlink等大数据技术。

【计算引擎】也在不断更新迭代,下图展示的是每一代计算引擎的代表,从第1代的Hadoop MapReduce,到第2代的Tez、第3代的Spark,再到第4代的Flink技术,从【批处理】(MR)到【微批】(Spark),再到真正的【流式计算】(Flink)。

image

  • 计算引擎的发展代际
  • 第1代:以hadoop承载的Map Reduce为标志,开启了大数据处理的先河。

mapreduce将处理过程分为map和reduce两个阶段,对于复杂应用,需要拆分为多个mapreduce的job, 并有应用层去管理各个job的依赖关系。

  • 第2代:以支持DAG框架为标志,也是以批处理为主,具体如Tez,以及更上层的Oozie。
  • 第3代:以spark为代表,开启了【内存计算】的时代,支持job内部DAG管理,同时支持流处理、批处理,并提供SQL高层API支持。
  • 第4代:本系列要介绍的Flink,主要是由于Flink对流计算的支持。
  • Flink 于2014年12月成为Apache软件基金会的顶级项目
  • 2015年9月发布第一个稳定版本0.9
  • 2020年12月7日发布Flink 1.12.0版本
  • 至2024年3月15日已发布Flink 1.19.0,社区非常活跃。

全球有越来越多的公司在使用Flink, 国内主流互联网公司都在大规模使用Flink作为企业分布式大数据处理引擎
Flink之所以如此受到青睐,除了其提供【高吞吐】、【低延迟】和【Exactly-once一致性语义】支持外,更重要的是它能以【流数据的处理方式】来处理【批数据】,可以真正意义上实现【批流处理的统一】。
正是由于这些特性,使Flink逐渐成为主流的大数据处理框架,并将在不久的将来成为下一代大数据处理的标准

实时计算

  • 实时计算是相对【离线计算】的概念,重要是【时效性】。

举个例子,我们知道【离线计算】通常是【天级别】的计算任务,比如统计一天的新增用户,商品销量,销售收入等。
但是【实时计算】是只要【有事件发生】,统计结果就会发生变化,比如有一个新用户注册登录了,那么我们的新增用户数就发生了变化,商品只要新增一个销售,销量就会发生变化,销售收入也会变化。
所以,【实时计算】让我们能更及时了解我们的现状,以及根据实时的统计结果做出决策,决策也更加具有时效性。

1.3 Flink的发展历程

1.3.1 时间脉络

  • 诞生于2009年,原来叫Strato Sphere,是柏林工业大学的一个大数据研究性项目中的博士生开发,早期专注于【批计算】。

image

  • 2014年,孵化出Flink并捐给Apache
  • 2014年8月,Flink第1个版本0.6正式发布,与此同时Flink的几位核心开发者创办了Data Artisans公司。
  • 2014 年:Flink 0.6.0 版本发布,标志着 Flink 的正式推出
  • 同年,Flink 0.7 版本发布,推出了 DataStream API,这是目前 Flink 应用最广泛的 API。
  • 2014年12月,Flink项目完成【孵化】。
  • 2015年,Flink开始崭露头角,引起大家注意。
  • Flink 0.9 版本引入了内置的 State 支持和 Global Checkpoint 机制,解决了【流计算系统】中【状态管理】和【一致性】的问题。
    它不仅是一个【高吞吐】,【低延迟】的计算引擎,同时还提供【有状态的计算】,支持【状态管理】,支持【强一致性的语义】,以及支持EventTimewaterMark 对【消息乱序】的处理。
    这也是阿里看上Flink的原因,并决心投入重金研究基于Flink的blink
  • 2015年4月,Flink发布了里程碑式的重要版本0.9.0。
  • 2016年,Flink在阿里得到大规模应用。

  • 2019年1月,长期对Flink投入研发的阿里巴巴,以9000万欧元的价格收购了Data Artisans公司。
  • 2019年8月,阿里巴巴将内部版本Blink开源,合并入Flink 1.9.0版本。

1.9.0版本之后,Flink的能力被极大丰富和改变。

  • 2020年12月7日,发布 Flink 1.12.0 版本
  • 2022年6月17日,发布 Flink 1.15.0 版本
  • 至2024年3月15日,已发布Flink 1.19.0,社区非常活跃。

1.3.2 关键版本功能

  • Flink 0.6.0 版本,于2014年发布,标志着 Flink 的正式推出

  • Flink 0.7 版本,于2014年发布,推出了 DataStream API,这是目前 Flink 应用最广泛的 API。

  • Flink 1.0:引入了基于【事件时间】的计算支持和 Watermark 机制、窗口机制,能够高效处理乱序和迟到数据。

还内置了各种窗口支持,如滚动窗口、滑动窗口和会话窗口。

  • Flink 1.9:进行了较大的架构调整Table APIDataStream API 成为同级 API,并引入了统一的 Catalog APIBlink planner,增强了 SQL 的执行能力。

1.3.3 阿里巴巴的贡献

  • 阿里巴巴在 2015 年开始使用 Flink,并对其进行了大量改进,包括重构分布式架构、引入增量 Checkpoint 机制和基于信用的流控机制。
  • 阿里还在 2019 年将内部的 Blink SQL 贡献给了 Flink 社区,推动了 Flink SQL 的发展。

1.3.4 全球化和未来发展

  • Flink 已成为全球范围内实时流计算的事实标准,吸引了众多企业和开发者的参与。
  • 阿里巴巴和 Ververica 主导了 Flink 的许多核心改进,并推出了全球统一的 Flink 企业版平台 Ververica Platform
  • 未来,Flink 将继续在流批一体流式数仓方向上发展,进一步提升实时计算的能力。

image

1.3.X 推荐文献

1.4 Flink特征

  • 批处理和流处理
  • 流数据更真实地反映了我们的生活方式
  • Flink 的目标
  • 低延迟
  • 高吞吐
  • 结果的准确性和良好的容错性

1. 一切皆为流 => 事件驱动应用

  • 事件驱动应用(Event-driven Applications)

2.正确性保证 | 高容错性/准确性

  • 精确一次状态一致性(Exactly-once state consistency)
  • 基于event time+watermarker,仅处理一次的容错担保
  • 事件-时间处理(Event-time processing)

  • 高超的最近(后期)数据处理(Sophisticated late data handling)

3. 易用性(部署运维)

  • 部署灵活(Flexible deployment)
  • 高可用安装/配置(High-availability setup)
  • 保存点机制(Savepoints) / 检查点机制(Checkpoint)
  • 易用的分层API(Layered API,具体参见:本文档对应专门章节)
  • 语义化窗口:在不同的时间语义(事件时间,摄取时间、处理时间)下支持灵活的窗口(时间,滑动、 翻滚,会话),自定义触发器

4.可扩展性

  • 可扩展架构(Scale-out architecture)
  • 大量状态的支持(Support for very large state)
  • 增量检查点(Incremental checkpointing)

5. 高性能(时效性)

  • 低延迟(Low latency) *
  • 高吞吐量(High throughput) *
  • 内存计算(In-Memory computing): 高效的自定义内存管理,和健壮的切换能力在 in-memoryout-of-core
  • 支持非常大的状态

6. 计算模型

1) 支持批处理

2) 支持流处理

  • 所有流式传输用例
  • 事件驱动的应用程序
  • 流和批处理分析
  • 数据管道和 ETL
  • 支持基于DataFlow模型、SataStream API的事件处理和无序处理

7.扩展性/兼容性

  • 语言支持:优雅流畅地支持java、scala
  • 集成YARN/HDFS/Hbase、MR/Storm和其它hadoop生态组件+丰富的connector
  • 横向扩展架构

8.其他

  • 图处理(批)
  • 机器学习(批)
  • 复杂事件处理(流)
  • 自动背压机制,下游对上游的反压
  • 在dataSet(批处理)API中内置支持迭代程序(BSP)
  • 数据流:所有产生的 【数据】 都天然带有 【时间】概念,把 【事件】 按照时间顺序排列起来,就形成了一个【事件流】,也被称作【数据流】。

  • 流批一体

首先,必须先明白什么是 有界数据无界数据

image

  • 有界数据:就是在一个确定的时间范围内的数据流,有开始、有结束,一旦确定就不会再改变,一般 批处理 用来处理有界数据。(如上图的 bounded stream)
  • 无界数据:就是持续产生的数据流,数据是无限的,有开始、无结束,一般 流处理 用来处理无界数据。(如图 unbounded stream)
  • 容错能力:在分布式系统中,硬件故障、进程异常、应用异常、网络故障等异常无处不在,Flink 引擎必须保证故障发生后 不仅可以 重启应用程序,还要 确保其内部状态保持一致,从最后一次正确的时间点重新出发。
  • Flink 提供 集群级容错 和 应用级容错 能力:
  • 集群级容错: Flink 与 集群管理器 紧密连接,如 YARN、Kubernetes,当进程挂掉后,自动重启新进程接管之前的工作。同时具备 高可用性 ,可消除所有单点故障,
  • 应用级容错:Flink 使用 轻量级分布式快照,设计检查点(checkpoint)实现可靠容错。

Flink 利用检查点特性,在框架层面提供 Exactly-once 语义,即端到端的一致性,确保数据仅处理一次,不会重复也不会丢失,即使出现故障,也能保证数据只写一次。

  • 关键版本和功能
  • Flink 1.0:引入了基于事件时间的计算支持和 Watermark 机制,能够高效处理乱序和迟到数据。还内置了各种窗口支持,如滚动窗口、滑动窗口和会话窗口。

image

  • Flink 和 Spark Sreaming 最大的区别在于:
  • Flink 是标准的实时处理引擎,基于事件驱动,以为核心;
  • 而 Spark Streaming 的 RDD 实际是一组小批次的 RDD 集合,是微批(Micro-Batch)的模型,以为核心。

下面我们介绍两个框架的主要区别:

架构模型

  • Spark Streaming 在运行时的主要角色包括:
  • 服务架构集群和资源管理: Master / Yarn Application Master;
  • 工作节点: Work / Node Manager;
  • 任务调度器: Driver;任务执行器: Executor

image

  • Flink 在运行时主要包含:客户端 Client、作业管理 Jobmanager、任务管理 Taskmanager。

image

数据处理架构

image

  • Spark : (微)批处理
  • 数据模型:采用RDD模型;Spark Streaming 的 DStream 实际上也是一组一组小批数据RDD的集合
  • Flink : 实时处理(流式处理)
  • 数据模型:基本数据模型是数据流(DataStream),及事件(Event)序列

任务调度 / 运行时架构

  • Spark Streaming 连续不断的生成微小的数据批次,构建有向无环图 DAG
  • Spark:批计算,将DAG划分成不同的Stage,一个完成后才可以计算下一个;
  • Spark Streaming : 会依次创建 DStreamGraph、JobScheduler。

image

  • Flink:

标准的流执行模式,一个事件在一个节点处理完成后可以直接发往下一个节点进行处理。

  • 根据用户提交的代码生成 StreamGraph,经过优化生成 JobGraph;然后,提交给 JobManager 进行处理
  • JobManager 会根据 JobGraph 生成 ExecutionGraph
  • ExecutionGraph 是 Flink 调度最核心的数据结构
  • JobManager 根据 ExecutionGraph 对 Job 进行调度,根据物理执行图部署到Taskmanager上形成具体的 Task 执行。

image

时间机制

  • Spark Streaming 支持的时间机制有限,只支持 处理时间。
  • Flink 支持了流处理程序在时间上的三个定义:事件时间 EventTime、摄入时间 IngestionTime 、处理时间 ProcessingTime。同时也支持 watermark 机制来处理滞后数据。

image

容错机制

  • 对于 Spark Streaming 任务,我们可以设置 checkpoint,然后假如发生故障并重启,我们可以从上次 checkpoint 之处恢复,但是这个行为只能使得数据不丢失,可能会重复处理,不能做到恰好一次处理语义。
  • Flink 则使用 两阶段提交协议 来解决这个问题。

2 Flink 工作原理与软件架构

数据架构演变

传统数据处理架构

image

Lambda 架构 := 实时处理 + 批处理

image

图例1

image

图例2

  • 数据摄取层(Ingestion layer)负责收集原始数据并将其复制以分别进行进一步的实时和批处理。

除了数据摄取层以外,Lambda架构还包含其它三个层:

  • 速度或流式层(Speed or Stream): 这里的原始数据是实时传入的,由流式处理框架(例如 Flink)处理,然后传递到服务层(Serving layer)以创建实时视图,以实现接近实时数据访问的低延迟。
  • 批处理层(Batch Layer):使用批处理框架(如 Spark)的对原始数据进行ETL处理,为离线历史数据访问创建可靠的批处理视图。
  • 服务层(Serving Layer):这是处理后的数据暴露给用户的地方。

最新的实时数据可以从实时视图访问或与批处理视图相结合以获得完整的历史记录。历史数据可以通过批处理视图访问。

  • 在批处理层和流式层中,不同技术的代码处理是重复的,会导致逻辑分歧

两套处理的计算资源重复的,这需要让我们管理两个计算基础设施。(存在资源浪费、数据一致性问题)

  • 分布式批处理存储是可靠的和可扩展的,即使系统崩溃,它也很容易恢复且没有错误。

Kappa 架构

  • LambdaKappa架构都是为了解决大量数据的移动,以实现可靠的在线访问而提出的数据架构。

最受欢迎的架构一直是并将继续是Lambda架构。
然而,随着流式处理变得更容易获得,在不久的将来你也会听到更多关于Kappa架构的消息。
让我们看看它们有何不同。

image

  • Kappa架构将批处理实时处理负载都视为【流式处理】的问题。

  • 使用速度层(Speed Layer)仅为了实时和批处理访问提供数据:

主要包含两个层:

  • 速度或流式层(Speed or Stream):类似于Lambda架构,但通常会包含分层存储。这意味着所有进入系统的数据都【无限期地存储】在不同的存储层中。

例如,将S3或GCS用于历史数据,而磁盘日志用于热数据。

  • 服务层(Serving Layer):与Lambda架构相同,但所有转换都在速度层(Speed Layer)中执行。

有些转换很难在速度层(Speed Layer)中执行(例如复杂的连接join),最终会被推送到 批处理存储中进行实现。

  • Kappa架构需要强大的流式处理技能。只需要使用【单个流式处理引擎】处理一次数据。我们也只需要管理一套套基础设施。

Flink系统架构

  • Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。

Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。
在这里,我们解释下 Flink 架构的重要方面。

处理无界和有界数据

  • 任何类型的数据都是作为【事件流】产生的。信用卡交易、传感器测量、机器日志或网站或移动应用程序上的用户交互,所有这些数据都以【流【的形式生成。

  • 【数据】可以作为【无界】或【有界流】进行处理。

1、无界流有一个开始但没有定义的结束。它们不会终止并在生成数据时提供数据。必须连续处理无界流,即事件必须在被摄取后立即处理。不可能等待所有输入数据到达,因为输入是无界的并且不会在任何时间点完成。处理无界数据通常需要以特定顺序(例如事件发生的顺序)摄取事件,以便能够推断结果的完整性。
2、有界流具有定义的开始和结束。可以通过在执行任何计算之前摄取所有数据来处理有界流。处理有界流不需要有序摄取,因为始终可以对有界数据集进行排序。有界流的处理也称为批处理。

image

  • Apache Flink 擅长处理无界和有界数据集。

对【时间】和【状态】的【精确控制】,使 Flink 的【运行时】能够在【无限流】上运行任何类型的应用程序。
【有界流】由专门为固定大小的数据集设计的算法和数据结构在内部进行处理,从而产生出色的性能。

可通过探索基于 Flink 构建的用例来印证。

推荐文献

任务提交过程

  • Flink 作为一个分布式计算系统,它的运行涉及到多个进程,这些进程会分布到集群的多台不同机器上。分布式系统就需要面对这样的几个问题:

1、分配和管理集群计算资源;
2、进程协调;
3、持久且高可用的数据存储及故障恢复;
...

以YARN作为资源管理器为例.

  • Flink 任务提交后,Client向HDFS上传Flink的Jar包和配置,之后向Yarn资源管理器提交任务。
  • 资源管理器分配容器资源并通知对应的NodeManager启动ApplicationMaster
  • ApplicationMaster启动后加载Flink的Jar包和配置构建环境,然后启动JobManager
  • 之后 ApplicationMaster资源管理器申请资源启动TaskManager
  • 资源管理器分配容器资源后,由ApplicationMaster通知资源所在节点的NodeManager启动TaskManager
  • NodeManager加载Flink的Jar包和配置构建环境并启动TaskManager
  • TaskManager在启动后,向JobManager发送心跳包,并等待JobManager向其分配任务。

image

运行架构

image

image

从上面的Flink任务执行图可以看到,Flink运行时有两个主要的进程:JobManagerTaskManager
Client: 不是Flink程序运行时程序执行的一部分,它主要负责准备和提交dataFlowJobManager, 并接收JobManager返回的程序执行结果。

  • JobManager : 负责协调Flink程序的执行,包括:任务的调度、任务运行完成与失败的处理,协调检查点与恢复等,主要包括以下项职能:
  • ResourceManager:负责资源分配,管理任务slot, 这个是flink集群资源管理的单位。
  • Dispatcher:提供应用程序提交的REST接口,对每一个提交的作业启动JobMaster,并运行Flink WebUI提供作业执行的信息。
  • JobMaster:负责单个作业图(JobGraph)的执行。一个集群可以同时运行多个作业,每个作业都有自己的JobMaster。
  • TaskManager : 负责执行dataflow中的任务,缓存和交换数据流。

一个作业执行时,至少要有一个TaskManager,TaskManager资源调度的最小单位slot
一个TaskManager中的slot数表示的是该TaskManager中可以【并行执行任务】的数量。

image

image

任务和算子链 / 子任务(subtask) = Thread

  • 为方便执行,Flink将不同算子子任务subtask)链接到一个【任务】里,每一个【任务】在一个【线程】中执行。

这是一个非常有用的优化方式,它减小了【进程间】数据交换和缓存的开销,而且在减少延迟同时增加了吞吐量
这种链接关系是【可配置】的,下面是一个有5个子任务(subtask)的示例,有5个线程【并行执行】。

image

  • operatr chain : 算子链。显然地,图中的source 算子和 map 算子是合并了算子链的,当然也可选择不合并。
  • condensed view : 压缩后的视图; parallelized view : 并行化的视图

任务 Slot 和(内存)资源

  • 每个 worker(TaskManager)都是一个JVM进程,可以执行一个或多个子任务subtask)。
  • 【任务槽】(task slot)就是为了控制一个worker能同时运行多少个任务的(至少一个)。
  • 每个任务槽(task slot)代表TaskManager一个设定的资源子集。

比如, 一个TaskManager有3个槽,会将其管理的1/3的内存分给每个槽位。
将资源分成不同的槽位意味着一个子任务subtask)不会跟其他作业的子任务竞争资源,而是会拥有一定量的保留资源。
需要注意的是,这里不涉及CPU隔离,目前【任务槽】仅仅分割task管理的【内存】。

  • 为了适配【任务槽】(task slot)的数量,用户可以定义【子任务】(subtask)是如何隔离的。

如果每个TaskManager有一个槽,就意味着: Task组运行在不同的【JVM】里。
如果每个TaskManager有多个槽意味着多个子任务subtask)共享同一个JVM
任务在同一JVM运行可以共享TCP连接和心跳信息。
它们可以共享数据集和数据结构,因此可以减少每个任务的开销。

image

  • 越顶层越抽象,表达含义越简明,使用越方便;
  • 越底层越具体,表达能力越丰富,使用越灵活。

image

Flink提供不同级别的抽象来开发流/批处理应用程序。

  • Flink API 最底层的抽象为【有状态实时流处理】。

其抽象实现是 Process Function,并且 Process Function 被 Flink 框架集成到了 DataStream API 中来为我们使用。
它允许用户在应用程序中自由地处理来自单流或多流的事件(数据),并提供具有全局一致性容错保障的状态。
此外,用户可以在此层抽象中注册【事件时间】(event time)和【处理时间】(processing time)回调方法,从而允许程序可以实现复杂计算

  • Flink API 第二层抽象Core APIs

实际上,许多应用程序不需要使用到上述最底层抽象的 API,而是可以使用 Core APIs 进行编程:
其中包含 DataStream API(应用于有界/无界数据流场景)。
Core APIs 提供的流式 API(Fluent API)为数据处理提供了通用的模块组件,例如各种形式的用户自定义转换(transformations)、联接(joins)、聚合(aggregations)、窗口(windows)和状态(state)操作等。
此层 API 中处理的数据类型在每种编程语言中都有其对应的类。

Process Function 这类底层抽象DataStream API相互集成使得用户可以选择使用更底层的抽象 API 来实现自己的需求。

  • 特别说明:DataSet API

DataSet API(慢慢在被弃用) 还额外提供了一些原语,比如循环/迭代(loop/iteration)操作。
Flink 从 1.12 版本开始逐步弃用 DataSet API,并在 1.14 版本正式宣布废弃。
未来的 Flink 2.0 将完全移除该 API,推荐用户迁移到 DataStream API 或 Table API/SQL 来实现【批处理】和【流处理】的编程模型统一。

  • Flink API 第三层抽象Table API

Table API 是以(Table)为中心的声明式编程(DSL)API,例如在流式数据场景下,它可以表示一张正在动态改变的表。
Table API 遵循(扩展)关系模型:即表拥有 schema(类似于关系型数据库中的 schema),并且 Table API 也提供了类似于关系模型中的操作,比如 select、project、join、group-by 和 aggregate 等。
Table API 程序是以声明的方式定义应执行的逻辑操作,而不是确切地指定程序应该执行的代码。
尽管 Table API 使用起来很简洁并且可以由各种类型的用户自定义函数扩展功能,但还是比 Core API 的表达能力差。
此外,Table API 程序在执行之前还会使用优化器中的优化规则对用户编写的表达式进行优化。

表(Table)和 DataStream/DataSet 可以进行无缝切换,Flink 允许用户在编写应用程序时将 Table APIDataStream/DataSet API 混合使用。

  • Flink API 最顶层抽象SQL

这层抽象在语义和程序表达式上都类似于 Table API,但是其程序实现都是 SQL 查询表达式。
SQL 抽象与 Table API 抽象之间的关联是非常紧密的,并且 SQL 查询语句可以在 Table API 中定义的表上执行。

Stateful Stream Processing

  • 最低级的抽象接口是状态化的数据流接口(stateful streaming)。

这个接口是通过 ProcessFunction 集成到 DataStream API 中的。该接口允许用户自由的处理来自一个或多个流中的事件,并使用一致的容错状态。另外,用户也可以通过注册 event time 和 processing time 处理回调函数的方法来实现复杂的计算。

DataStream/DataSet API [80%的应用程序]

  • 大部分程序,通常会使用以 DataStream API(有界/无界数据流)、DataSet API(有界数据集)为代表的 Core APIs,并不会使用低级抽象接口
  • 这些API为数据处理提供了大量的通用模块(common building block),包括用户定义的各种各样的变换(transformations)、连接(joins)、聚合(aggregations)、窗口(windows)、状态(state)等等。
  • DataStream API 集成了 low level 处理函数,使得对一些特定的操作提供更低层次的抽象。
  • 此外,DataSet API(社区在弃用中) 也为有界数据集提供了一些补充的编程原语,例如循环(loops)、迭代(iterations)等。

Table API

  • Table API 是一种以数据表为核心的声明式 DSL,能够动态地修改表(表示流时)。

Table API 的是一种(扩展的)关系型模型:每个都有一个 schema(类似于关系型数据库中的表结构),API也提供以下操作: select,project,join,group by,aggregate等。

  • Table API 程序定义的是应该执行什么样的逻辑操作,而不是直接准确地指定程序代码运行的具体步骤。尽管 Table API 可以通过各式各样的自定义函数进行扩展,但是它在表达能力上仍然比不上 Core APIs,不过在使用上更简练(可以减少很多代码)。

  • 此外,Table API 程序在运行之前也会使用一个优化器对程序进行优化。用户可以在 tables 与 DataStream/DataSet 之间进行无缝切换,程序也可以混合使用 Table API 和 DataStream/DataSet APIs。

SQL

  • Flink 提供的最高级接口是 SQL
  • 这个层次的抽象接口和 Table API 非常相似,包括语法和接口的表现能力,唯一的区别是通过 SQL 查询语言实现程序。
  • SQL 抽象接口和 Table API 的交互非常紧密,而且 SQL 查询也可以在 Table API 中定义的表上执行。
  • 了解 Flink 应用开发需要先理解 Flink 的 Streams、State、Time 等基础处理语义以及 Flink 兼顾灵活性和方便性的多层次 API。

Streams/数据流

  • Streams,流,分为有限数据流无限数据流
  • unbounded stream 是有始无终的数据流,即无限数据流
  • bounded stream限定大小的有始有终的数据集合,即有限数据流
  • 二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。

State/状态

  • State,状态是计算过程中的数据信息,在容错恢复和 Checkpoint 中有重要的作用,流计算在本质上是 Incremental Processing,因此需要不断查询保持状态;另外,为了确保 Exactly- once 语义,需要数据能够写入到状态中;而持久化存储,能够保证在整个分布式系统运行失败或者挂掉的情况下做到 Exactly- once,这是状态的另外一个价值。

Time/时间

  • Time,分为 Event time、Ingestion time、Processing time
  • Flink 的无限数据流是一个持续的过程,时间是我们判断业务状态是否滞后,数据处理是否及时的重要依据。

API/接口

  • API,API 通常分为三层,由上而下可分为 SQL / Table API、DataStream API、ProcessFunction 三层
  • API 的表达能力及业务抽象能力都非常强大,但越接近 SQL 层,表达能力会逐步减弱,抽象能力会增强;
  • 反之,ProcessFunction 层 API 的表达能力非常强,可以进行多种灵活方便的操作,但抽象能力也相对越小。

image

  • Flink API

  • Flink SQL

  • 不同于诸多开源的提供非常底层的、编程API的流式数据处理系统,实时计算 Flink提供更加高层、更加面向业务化的Flink SQL(标准SQL语法上提供了关于流式处理的语法扩展)。
  • Flink SQL能够方便数据开发人员使用标准化的SQL,完成流式数据计算加工的业务流程。

因此,实时计算 Flink适合更大众的数据分析人员快速、方便地完成一个流式数据处理业务。

  • Flink 程序的基本构建块是转换。从概念上讲,是(可能永无止境的)数据记录流,而转换是将一个或多个流作为一个或多个流的操作。输入,并产生一个或多个输出流。

请注意,Flink 的 DataSet API 中使用的 DataSet 也是内部流 。

image

  • Flink 应用程序结构: (如上图所示)
  • Source: 数据源

Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等,当然你也可以定义自己的 source。

  • Transformation数据转换的各种操作/算子
    有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select等,操作很多,可以将数据转换计算成你想要的数据。
  • Sink接收器(输出器),Flink 将转换计算后的数据发送的地点 ,你可能需要持久化/存储下来
  • Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。
    自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等,同理你也可以定义自己的 sink。

Flink的架构优势

  • 在架构部分,主要分为以下4点:

image

  • 第一, Flink 具备统一的框架处理有界数据流无界数据流2种数据流的能力

  • 第二, 部署灵活。Flink 底层支持多种资源调度器,包括 Yarn、Kubernetes 等。

Flink 自身带的 Standalone 的调度器,在部署上也十分灵活。

  • 第三, 极高的可伸缩性,可伸缩性对于分布式系统十分重要,阿里巴巴双 11 大屏采用 Flink 处理海量数据,使用过程中测得 Flink 峰值可达 17 亿 / 秒。

  • 第四, 极致的流式处理性能。Flink 相对于 Storm 最大的特点是将状态语义完全抽象到框架中,支持本地状态读取,避免了大量网络 IO,可以极大提升状态存取的性能。

总体架构

image

image

https://developer.aliyun.com/article/756370

image

https://developer.aliyun.com/article/756370

任务调度与部署执行模型

Job Client / Job Manager / Task Manager / Actor System / ...

image

  1. 当 Flink 执行 executor 会自动根据程序代码生成 DAG 数据流图;

  2. ActorSystem 创建 Actor 将数据流图发送给 JobManager 中的 Actor;

  3. JobManager 会不断接收 TaskManager 的心跳消息,从而可以获取到有效的 TaskManager;

  4. JobManager 通过调度器在 TaskManager 中调度执行 Task(在 Flink 中,最小的调度单元就是 task,对应就是一个线程);

  5. 在程序运行过程中,task 与 task 之间是可以进行数据传输的。

Job Client

  • 主要职责是提交任务, 提交后可以结束进程, 也可以等待结果返回;

  • Job Client 不是 Flink 程序执行的内部部分,但它是任务执行的起点;

  • Job Client 负责接受用户的程序代码,然后创建数据流,将数据流提交给 Job Manager 以便进一步执行。 执行完成后,Job Client 将结果返回给用户。

Job Manager

  • 主要职责是调度工作并协调任务做检查点;
  • 集群中至少要有一个 master,master 负责调度 task,协调 checkpoints 和容错;
  • 高可用设置的话可以有多个 master,但要保证一个是 leader, 其他是 standby;
  • Job Manager 包含 Actor System、Scheduler、CheckPoint 三个重要的组件;
  • Job Manager 从客户端接收到任务以后, 首先生成优化过的执行计划, 再调度到 TaskManager 中执行。

Task Manager

  • 主要职责是从 Job Manager 处接收任务, 并部署和启动任务, 接收上游的数据并处理;
  • Task Manager 是在 JVM 中的一个或多个线程中执行任务的工作节点;
  • Task Manager 在创建之初就设置好了 Slot, 每个 Slot 可以执行一个任务。

Operator(算子)、Operator Chain(算子链)、Task(任务/线程)

  • Operator(算子)

详情参见:本文档: Stream 章节。

Source Operator / Transformation Operator / Sink Operator
  • (Operator Chain)算子链的定义

Flink 算子链Operator Chain)是一种优化机制
客户端在提交任务时————生成JobGraph阶段,会对代码中可进行优化合并的 Operator 进行优化操作,它将多个算子Operator串联在一起,优化成一个算子链Operator Chains)以放到一个 task(同一个线程)中执行,从而减少数据交换序列化上下文切换/线程切换缓冲的开销,提高作业的执行效率。

下面以官网中的例子进行说明,如下图所示:

image

图中,source、map、[keyBy|window|apply]、sink算子的并行度分别是2、2、2、2、1,经过Flink优化后,source和map算子组成一个算子链,作为一个task运行在一个线程上,其简图如图中condensed view所示,并行图如parallelized view所示。

  • 算子之间是否可以组成一个Operator Chains,看是否满足以下条件:
  • 上下游算子的并行度一致
  • 下游节点的入度为1
  • 上下游节点都在同一个 slot group 中

上下游算子实例处于同一个 SlotSharingGroup 中。

  • 下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)

下游算子的链接策略(ChainingStrategy)为 ALWAYS。

  • 上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)

上游算子的链接策略为 HEAD 或 ALWAYS。

  • 2个节点间数据分区方式是 forward
    2个算子间的物理分区逻辑是 ForwardPartitioner
  • 2个算子间的 shuffle 方式不是批处理模式
  • 用户没有禁用 chain(代码中是否配置disableChain()

若是对此还有疑惑,可以参看Operator Chains一文

  • 禁用算子链

用户可以通过以下方法禁用算子链:

  • 在算子上调用 disableChaining() 方法。
// 设置算子链
sum.print().disableChaining();
  • 在算子上调用 startNewChain() 方法,强制开始一个新的算子链
  • 调用 StreamExecutionEnvironment.disableOperatorChaining() 方法,在整个运行时环境禁用算子链
// 禁用算子链
env.disableOperatorChaining();

任务槽(Task Slot)、槽共享机制

image

TaskManager 与 JVM 的关系 / 线程 与 子任务的关系

  • 每个 TaskManager 是一个独立的 JVM 进程, 可以在不同的线程中执行一个或多个子任务

任务槽(Task Slot)

  • 为了控制一个 worker 能接收多少个 task————worker 通过 task slot 来进行控制(一个 worker 至少有一个 task slot)。

  • 每个 task slot 表示 TaskManager 拥有资源的一个固定大小的子集。

  • flink 将进程的内存进行了划分到多个 slot 中。

图中有 2 个 TaskManager,每个 TaskManager 有 3 个 slot 的,每个 slot 占有 1/3 的内存。

  • 内存被划分到不同的 slot 之后可以获得如下好处:
  • TaskManager 最多能同时并发执行的任务是可以控制的,那就是 3 个,因为不能超过 slot 的数量。
  • slot独占的内存空间,这样在一个 TaskManager 中可以运行多个不同的作业,作业之间不受影响。

槽共享(同一作业的子任务间)

  • 默认情况下,Flink 允许【子任务】共享插槽,即使它们是不同任务的子任务,只要它们来自同一个作业

结果是一个可以保存作业的整个管道。允许插槽共享有2个主要好处:

  • 只需计算 Job 中最高并行度parallelism)的 task slot,只要这个满足,其他的 job 也都能满足。
  • 资源分配更加公平,如果有比较空闲的 slot 可以将更多的任务分配给它。

图中若没有任务槽共享,负载不高的 Source/Map 等 subtask 将会占据许多资源,而负载较高的窗口 subtask 则会缺乏资源。

  • 有了任务槽共享,可以将基本并行度(base parallelism)从 2 提升到 6,提高了分槽资源的利用率。同时它还可以保障 TaskManagersubtask 的分配的 slot 方案更加公平。

image

  • 后面会有专门讲解,此处简单分享 Flink 关于运维及业务监控的内容:

  • Flink 具备 7 X 24 小时高可用SOA(面向服务架构)

  • 原因:在实现上 Flink 提供了一致性的 Checkpoint
  • Checkpoint 是 Flink 实现容错机制的核心————它周期性的记录计算过程中 Operator 的状态,并生成快照持久化存储
  • 当 Flink 作业发生故障崩溃时,可以有选择的从 Checkpoint 中恢复,保证了计算的一致性。
  • Flink 本身提供监控、运维等功能或接口,并有内置的 Web UI,对运行的作业提供 DAG 图以及各种 Metric 等,协助用户管理作业状态。

深度解读:有状态的流式处理

传统批处理

image

  • 传统批处理方法是持续收取数据,以时间作为划分多个批次的依据,再周期性地执行批次运算

但假设需要计算每小时出现事件转换的次数,如果事件转换跨越了所定义的时间划分,传统批处理会将中介运算结果带到下一个批次进行计算;
除此之外,当出现接收到的事件顺序颠倒情况下,传统批处理仍会将中介状态带到下一批次的运算结果中,这种处理方式也不尽如人意。

理想方法

image

  • 第一,要有理想方法,这个理想方法是引擎必须要有能力可以累积状态维护状态

累积状态代表着过去历史中接收过的所有事件,会影响到输出。

  • 第二,时间。时间意味着引擎对于数据完整性有机制可以操控,当所有数据都完全接受到后,输出计算结果。

  • 第三,理想方法模型需要实时产生结果,但更重要的是采用新的持续性数据处理模型来处理实时数据,这样才最符合 continuous data 的特性。

流式处理

image

  • 流式处理,简单来讲,即:有一个无穷无尽的数据源在持续收取数据,以代码作为数据处理的基础逻辑,数据源的数据经过代码处理后产生出结果,然后输出,这就是流式处理的基本原理

分布式流式处理

image

  • 假设 Input Streams 有很多个使用者,每个使用者都有自己的 ID

如果计算每个使用者出现的次数,我们需要让同一个使用者的出现事件流到同一运算代码,这跟其他批次需要做 group by 是同样的概念。
所以,跟 Stream 一样需要做分区,设定相应的 key,然后让同样的 key 流到同一个 computation instance 做同样的运算。

有状态分布式流式处理

image

如图,上述代码中定义了变数 X,X 在数据处理过程中会进行读和写,在最后输出结果时,可以依据变数 X 决定输出的内容,即状态 X 会影响最终的输出结果。

  • 这个过程中,第一个重点是先进行了状态 co-partitioned key by,同样的 key 都会流到 computation instance,与使用者出现次数的原理相同,次数即所谓的状态,这个状态一定会跟同一个 key 的事件累积在同一个 computation instance。

image

相当于: 根据输入流的 key 重新分区的 状态,当分区进入 stream 之后,这个 stream 会累积起来的状态也变成 copartiton 了。

  • 第二个重点embeded local state backend有状态分散式流式处理的引擎,状态可能会累积到非常大,当 key 非常多时,状态可能就会超出单一节点的 memory 的负荷量

这时候状态必须有状态后端(State Backend)去维护它;
在这个状态后端在正常状况下,用 in-memory 维护即可。

image

image

  • 电商和市场营销
  • 实时报表
  • 广告投放
  • 实时推荐
  • 物联网
  • 实时数据采集
  • 实时告警
  • 物流配送及服务
  • 订单状态跟踪
  • 物流信息推送
  • 金融及银行行业
  • 风险检测
  • 实时结算

1) Data Pipeline

image

  • Data Pipeline 的核心场景

类似于数据搬运并在搬运的过程中进行部分数据清洗或者处理,而整个业务架构图的左边是 Periodic ETL,它提供了流式 ETL 或者实时 ETL,能够订阅消息队列的消息并进行处理,清洗完成后实时写入到下游的 Database 或 File system 中。场景举例:

  • 实时数仓
  • 当下游要构建实时数仓时,上游则可能需要实时的 Stream ETL
  • 这个过程会进行实时清洗或扩展数据,清洗完成后写入到下游的实时数仓的整个链路中,可保证数据查询的时效性,形成实时数据采集、实时数据处理以及下游的实时 Query。
  • 搜索引擎推荐
  • 搜索引擎这块以淘宝为例,当卖家上线新商品时,后台会实时产生消息流,该消息流经过 Flink 系统时会进行数据的处理、扩展。
  • 然后,将处理及扩展后的数据生成实时索引,写入到搜索引擎中。

这样当淘宝卖家上线新商品时,能在秒级或者分钟级实现搜索引擎的搜索。

  • ...

2) Data Analytics

image

  • Data Analytics,如图,左边是 Batch Analytics,右边是 Streaming Analytics。
  • Batch Analysis 就是传统意义上使用类似于 Map Reduce、Hive、Spark Batch 等,对作业进行分析、处理、生成离线报表
  • Streaming Analytics 使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如实时大屏、实时报表。

3) Data Driven

image

  • 从某种程度上来说,所有的实时的数据处理或者是流式数据处理都是属于 Data Driven流计算本质上是 Data Driven 计算

  • 应用较多的如:风控系统,当风控系统需要处理各种各样复杂的规则时,Data Driven 就会把处理的规则和逻辑写入到 Datastream 的 API 或者是 ProcessFunction 的 API 中,然后将逻辑抽象到整个 Flink 引擎中,当外面的数据流或者是事件进入就会触发相应的规则,这就是 Data Driven 的原理。

  • 在触发某些规则后,Data Driven 会进行处理或者是进行预警,这些预警会发到下游产生业务通知,这是 Data Driven 的应用场景,Data Driven 在应用上更多应用于复杂事件的处理。

2 Apache Flink 的核心机制

2.0 Streams/数据流

  • Streams,流,分为有限数据流无限数据流
  • unbounded stream 是有始无终的数据流,即无限数据流
  • bounded stream限定大小的有始有终的数据集合,即有限数据流
  • 二者的区别在于无限数据流的数据会随时间的推演而持续增加,计算持续进行且不存在结束的状态,相对的有限数据流数据大小固定,计算最终会完成并处于结束的状态。
  • Flink 程序在执行的时候,会被映射成一个 Streaming Dataflow
  • 一个 Streaming Dataflow 是由一组 StreamTransformation Operator 组成的。
  • 在启动时,从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator
Source Operator / Transformation Operator / Sink Operator
  • Flink 程序本质上是并行的和分布式的,在执行过程中,一个(stream)包含一个或多个流分区(Stream partition)
  • 而每一个 operator 包含一个或多个 operator 子任务(Sub Task)。
  • 操作子任务间彼此独立,在不同的线程中执行,甚至是在不同的机器不同的容器上。
  • operator 子任务的数量是这一特定 operator并行度
  • 相同程序中的不同 operator 有不同级别的并行度

image

  • 一个 Stream 可以被分成多个 Stream 的分区,也就是 Stream Partition
  • 一个 Operator 也可以被分为多个 Operator Subtask。(如上图中,Source 被分成 Source1 和 Source2,它们分别为 Source 的 Operator Subtask。)
  • 每一个 Operator Subtask 都是在不同的线程当中独立执行的。
  • 一个 Operator并行度,就等于 Operator Subtask 的个数。(上图 Source 的并行度为 2)。
  • 而一个 Stream并行度就等于它生成的 Operator 的并行度
  • 数据在2个 operator 之间传递的时候有2种模式:
  • One to One 模式:2个 operator 用此模式传递的时候,会保持数据的分区数和数据的排序

如上图中的 Source1 到 Map1,它就保留的 Source 的分区特性,以及分区元素处理的有序性。

  • Redistributing重新分配)模式:这种模式会改变数据的分区数;每个一个 operator subtask 会根据选择 transformation 把数据发送到不同的目标 subtasks,

比如, keyBy() 会通过 hashcode 重新分区, broadcast()rebalance() 方法会随机重新分区

2.1 状态容错 : Checkpoint / Savepoint

  • 当我们考虑状态容错时难免会想到精确一次的状态容错,应用在运算时累积的状态,每笔输入的事件都反映到状态更改状态都是精确一次,如果修改超过一次的话也意味着数据引擎产生的结果是不可靠的。
  • 如何确保状态拥有精确一次(Exactly-once guarantee)的容错保证?
  • 如何在分散式场景下替多个拥有本地状态的运算子产生一个全域一致的快照(Global consistent snapshot)?
  • 更重要的是,如何在不中断运算的前提下产生快照?

2.1.1 简单场景的精确一次容错方法

  • 还是以使用者出现次数来看,如果某个使用者出现的次数计算不准确,不是精确一次,那么产生的结果是无法作为参考的。
  • 在考虑精确的容错保证前,我们先考虑最简单的使用场景,如:
  • 无限流的数据进入,后面单一的 Process 进行运算,每处理完一笔计算即会累积一次状态。

这种情况下,如果要确保 Process 产生精确一次的状态容错,每处理完一笔数据,更改完状态后进行一次快照快照包含在队列中并与相应的状态进行对比,完成一致的快照,就能确保精确一次。

2.1.2 分布式状态容错

  • Flink 作为分布式的处理引擎,在分布式的场景下,进行多个本地状态的运算,只产生一个全域一致的快照,如需要在不中断运算值的前提下产生全域一致的快照,就涉及到分散式状态容错

Global consistent snapshot/全局一致性快照

image

关于 Global consistent snapshot,当 Operator分布式的环境中,在各个节点做运算。
首先产生 Global consistent snapshot 的方式就是处理每一笔数据的快照点是连续的,这笔运算流过所有的运算值,更改完所有的运算值后,能够看到每一个运算值的状态与该笔运算的位置,即可称为 consistent snapshot,当然,Global consistent snapshot 也是简易场景的延伸。

容错恢复

image

  • 首先,了解一下 Checkpoint,上面提到连续性快照,每个 Operator 运算值本地的状态后端都要维护状态。

也就是每次将产生检查点时会将它们传入共享的 DFS ** 中。
当任何一个 Process 挂掉后,可以直接从三个完整的 Checkpoint 将所有的运算值的状态恢复,重新设定到相应位置。
Checkpoint 的存在使,整个 Process 能够实现
分散式环境中的 Exactly-once**。

2.1.3 分散式快照(Distributed Snapshots)方法

image

关于 Flink 如何在不中断运算的状况下持续产生 Global consistent snapshot,其方式是基于用 simple lamport 演算法机制下延伸的。
已知的一个点 Checkpoint barrier, Flink 在某个 Datastream 中会一直安插 Checkpoint barrierCheckpoint barrier 也会 N — 1 等等,Checkpoint barrier N 代表着所有在这个范围里面的数据都是 Checkpoint barrier N

image

举例:假设现在需要产生 Checkpoint barrier N,但实际上在 Flink 中是由 job manager 触发 Checkpoint,Checkpoint 被触发后开始从数据源产生 Checkpoint barrier。当 job 开始做 Checkpoint barrier N 的时候,可以理解为 Checkpoint barrier N 需要逐步填充左下角的表格。

image

如图,当部分事件标为红色,Checkpoint barrier N 也是红色时,代表着这些数据或事件都由 Checkpoint barrier N 负责。Checkpoint barrier N 后面白色部分的数据或事件则不属于 Checkpoint barrier N。
在以上的基础上,当数据源收到 Checkpoint barrier N 之后会先将自己的状态保存,以读取 Kafka 资料为例,数据源的状态就是目前它在 Kafka 分区的位置,这个状态也会写入到上面提到的表格中。下游的 Operator 1 会开始运算属于 Checkpoint barrier N 的数据,当 Checkpoint barrier N 跟着这些数据流动到 Operator 1 之后,Operator 1 也将属于 Checkpoint barrier N 的所有数据都反映在状态中,当收到 Checkpoint barrier N 时也会直接对 Checkpoint 去做快照。

image

当快照完成后继续往下游走,Operator 2 也会接收到所有数据,然后搜索 Checkpoint barrier N 的数据并直接反映到状态,当状态收到 Checkpoint barrier N 之后也会直接写入到 Checkpoint N 中。以上过程到此可以看到 Checkpoint barrier N 已经完成了一个完整的表格,这个表格叫做 Distributed Snapshots,即分布式快照。分布式快照可以用来做状态容错,任何一个节点挂掉的时候可以在之前的 Checkpoint 中将其恢复。继续以上 Process,当多个 Checkpoint 同时进行,Checkpoint barrier N 已经流到 job manager 2,Flink job manager 可以触发其他的 Checkpoint,比如 Checkpoint N + 1,Checkpoint N + 2 等等也同步进行,利用这种机制,可以在不阻挡运算的状况下持续地产生 Checkpoint。

2.1.4 状态保存与迁移:保存点(SavePoint)——手动产生1个Checkpoint

  • 流式处理应用无时无刻不在运行,运维上有几个重要考量:
  • 更改应用逻辑 / 修 bug 等,如何将前一执行的状态迁移到新的执行?
  • 如何重新定义运行的平行化程度?
  • 如何升级运算丛集的版本号?
  • Checkpoint 完美符合以上需求,不过 Flink 中还有另外一个名词:保存点Savepoint),当手动产生一个 Checkpoint 的时候,就叫做一个 Savepoint

  • SavepointCheckpoint 的差别在于:检查点是 Flink 对于一个有状态应用在运行中利用分布式快照持续周期性的产生 Checkpoint,而 Savepoint 则是人工手动产生CheckpointSavepoint 记录着流式应用中所有运算元的状态。

image

如图,Savepoint A 和 Savepoint B,无论是变更底层代码逻辑、修 bug 或是升级 Flink 版本,重新定义应用、计算的平行化程度等,最先需要做的事情就是产生 Savepoint

  • Savepoint 产生的原理是在 Checkpoint barrier 流动到所有的 Pipeline手动插入从而产生分布式快照,这些分布式快照点即 Savepoint
  • Savepoint 可以放在任何位置保存,当完成变更时,可以直接从 Savepoint 恢复、执行。
  • Savepoint恢复执行时,需要注意,在变更应用的过程中时间在持续,如 Kafka 在持续收集资料,当从 Savepoint 恢复时,Savepoint 保存着 Checkpoint 产生的时间以及 Kafka 的相应位置,因此它需要恢复到最新的数据。

无论是任何运算,Event - Time 都可以确保产生的结果完全一致。

  • 假设恢复后的重新运算用 Process Event - Time,将 windows 窗口设为 1 小时,重新运算能够在 10 分钟内将所有的运算结果都包含到单一的 windows 中。

而如果使用 Event – Time,则类似于做 Bucketing。
在 Bucketing 的状况下,无论重新运算的数量多大,最终重新运算的时间以及 windows 产生的结果都一定能保证完全一致。

2.2 状态维护 : StateBackend

  • 状态维护,即用一段代码在本地维护状态值,当状态值非常大时需要本地的状态后端来支持。
  • 在 Apache Flink 中,状态后端StateBackend)是管理状态存储、访问和维护的核心组件。
  • 状态后端定义了状态的存储方式,以及在发生故障时如何恢复状态
  • Flink 提供了3种类型的状态后端:MemoryStateBackendFsStateBackendRocksDBStateBackend
  • 维护状态时可以根据状态的数量选择相应的状态后端

如果状态数据量不大,可以使用 FsStateBackend
对于大规模状态数据的处理,建议使用 RocksDBStateBackend

image

如图,在 Flink 程序中,可以采用 getRuntimeContext().getState(desc); 这组 API 去注册状态。
Flink 有多种状态后端,采用 API 注册状态后,读取状态时都是通过状态后端来读取的。Flink 有3种不同的状态值,也有3种不同的状态后端:

JVM Heap 状态后端(MemoryStateBackend)

image

  • MemoryStateBackend
  • 将所有状态数据存储在 JVM 堆内存中,适用于小规模状态数据的场景,如本地测试和调试。
  • JVM Heap 状态后端,适合数量较小的状态,当状态量不大时就可以采用 JVM Heap 的状态后端。
    但由于内存容量的限制,不推荐在【生产环境】中使用。它是 Flink 默认的状态后端
  • JVM Heap 状态后端会在每一次运算值需要读取状态时,用 Java object read / writes 进行读或写,不会产生较大代价,但当 Checkpoint 需要将每一个运算值的本地状态放入 Distributed Snapshots 的时候,就需要进行序列化了。

image

RocksDB 状态后端(RocksDBStateBackend)

  • RocksDBStateBackend :
  • 它是一种 out of core 的状态后端。
  • 使用 RocksDB 作为本地数据库存储状态数据,并支持增量 Checkpoint
  • 它适合处理大规模状态数据的场景,如天级窗口聚合。

RocksDBStateBackend 在性能上优于 FsStateBackend,因为它存储了最新的热数据,并通过异步方式同步到文件系统中。

  • Runtime本地状态后端让使用者去读取状态的时候会经过磁盘

相当于将状态维护在磁盘里,与之对应的代价可能就是每次读取状态时,都需要经过序列化反序列化的过程。
当需要进行快照时只将应用序列化即可,序列化后的数据直接传输到中央的共享 DFS 中。

文件系统状态后端(FsStateBackend)

  • FsStateBackend 基于文件系统,支持本地或分布式文件系统(如 HDFS / OSS)。
  • 它适用于状态数据较大的场景,如长时间窗口的计算或大量 Key/Value 状态数据。
  • FsStateBackend 通过异步方式状态数据持久化到文件系统中,提高了状态数据的安全性

配置状态后端

  • Flink 默认使用 MemoryStateBackend无需显式配置
  • 对于其他状态后端,需要在应用级别集群级别进行配置。
  • 应用级别的配置通过 StreamExecutionEnvironment 设置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

如果使用 RocksDBStateBackend,则需要引入相关依赖,并进行类似的配置。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 'true' 是启用增量检查点
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink-checkpoints", true));

RocksDB 的优化和最佳实践

  • 1 定时器优化:将定时器放到堆内存中,以提高访问效率。
  • 2 磁盘性能:将 state.backend.rocksdb.localdir 目录配置到本地磁盘上,以提高 RocksDB 的性能。
  • 3 后台刷新和压缩的并行性:在多核 CPU 机器上,通过设置 state.backend.rocksdb.thread.num 来增加后台刷新和压缩的并行性。
  • 集群级别的配置通过 flink-conf.yaml 文件进行,可以设置状态后端类型Checkpoint 存储路径。例如:
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: hdfs:///flink-checkpoints
  • 在生产环境中,应根据状态数据的大小和应用场景选择合适的状态后端,以确保性能和数据的安全性。

2.3 Event - Time

2.3.1 不同时间种类: Event Time <== Process Time

  • 在 Flink 及其他进阶的流式处理引擎出现之前,大数据处理引擎一直只支持 Processing-time 的处理。

假设定义一个运算 windows 的窗口,windows 运算设定每小时进行结算。
Processing-time 进行运算时可以发现数据引擎将 3 点至 4 点间收到的数据做结算。
实际上在做报表或者分析结果时是想了解真实世界中 3 点至 4 点之间实际产生数据的输出结果,了解实际数据的输出结果就必须采用 Event – Time 了。

image

如图,Event - Time 相当于事件,它在数据最源头产生时带有时间戳,后面都需要用时间戳来进行运算。
用图来表示,最开始的队列收到数据,每小时对数据划分一个批次,这就是 Event - Time Process 在做的事情。

2.3.2 Event-Time 处理

image

  • Event-Time 是用事件真实产生的时间戳去做 Re-bucketing,把对应时间 3 点到 4 点的数据放在 3 点到 4 点的 Bucket,然后 Bucket 产生结果。

所以, Event - TimeProcessing - time 的概念是这样对比的存在。

image

  • Event - Time 的重要性在于记录引擎输出运算结果的时间。

简单来说,流式引擎连续 24 小时在运行、搜集资料,假设 Pipeline 里有一个 windows Operator 正在做运算,每小时能产生结果,何时输出 windows 的运算值,这个时间点就是 Event - Time 处理的精髓,用来表示该收的数据已经收到。

2.3.3 Watermarks

image

  • Flink 实际上是用 watermarks 来实现 Event - Time 的功能。
  • Watermarks 在 Flink 中也属于特殊事件,其精髓在于当某个运算值收到带有时间戳“ T ”的 watermarks 时,就意味着它不会接收到新的数据了。
  • 使用 watermarks 的好处在于:可以准确预估收到数据的截止时间。
  • 举例,假设预期收到数据时间与输出结果时间的时间差延迟 5 分钟,那么 Flink 中所有的 windows Operator 搜索 3 点至 4 点的数据,但因为存在延迟需要再多等 5 分钟直至收集完 4:05 分的数据,此时方能判定 4 点钟的资料收集完成了,然后才会产出 3 点至 4 点的数据结果。这个时间段的结果对应的就是 watermarks 的部分。

Z 概念补充

作业(Job)

类似于MaxCompute或Hadoop Job,一个实时计算的作业描述了一个完整的流式数据处理业务逻辑,是流式计算的基础业务单元

UDF (User Define Function)

  • 实时计算 Flink支持UDF函数。类似于Hive UDF函数,Flink SQL提供了标准化的流式数据处理能力同时,对于部分业务特殊自定义处理逻辑,建议您使用UDF函数表达。
  • 目前实时计算 Flink仅支持Java的UDF函数扩展。

资源 (Resource)

当前UDF函数仅支持使用Java语言表达,对于您上传的每个JAR,实时计算定义为一个Resource。

数据采集 (Data Collection)

  • 广义的数据采集指,将数据从数据产生方收集并传输进入到大数据处理引擎的过程。
  • 在实时计算 Flink,数据采集原则上遵循上述定义,但更加聚焦为将流式数据数据产生方收集并传输进入数据总线的过程。

数据存储 (Data Storage)

  • 实时计算 Flink 定义为一种轻量级计算引擎,本身不带有任何业务数据存储系统。
  • 实时计算 Flink 均是使用外部数据存储作为数据来源和数据目的端进行使用。
  • 实时计算将数据存储均定义为外部的数据存储

例如,将您RDS作为结果表,那么RDS即是实时计算的一类DataStore。

  • 支持流式输入表类型包括
  • 大数据总线(DataHub)
  • 日志服务(LogService)
  • 消息服务(MQ)
  • 支持静态输入表类型包括
  • 表格存储(TableStore)
  • 云数据库(RDS)
  • 支持输出表类型包括
  • 大数据总线(DataHub)
  • 日志服务(LogService)
  • 表格存储(TableStore)
  • 云数据库(RDS)
  • 消息服务(MQ)

数据加工 (Data Development)

  • 流式计算的开发过程(编写Flink SQL Job、Flink API Job的过程)定义为数据加工。
  • 实时计算 Flink可提供一整套包括开发、调试的在线IDE,服务流式数据加工过程。

数据运维 (Data Operation)

  • 实时计算作业的在线运维定义为数据运维。
  • 实时计算 Flink可提供一整套管控平台,方便您进行流式数据的运维管控。

资源估算类

Compute Unit/CU/实时计算单元 := 1 CPU / 4GB

  • Flink工作空间的基本计量单位Compute Unit(CU)
  • 即: 计算资源,1 CU=1核CPU+4 GiB内存+20
  • 华为云(DLI)、阿里云(实时计算Flink版),对1CU的定义,均是 1Cpu 、4GB
  • 在实时计算 Flink中,作业的实时计算单元为CU。一个CU描述了一个实时计算作业最小运行能力,即在限定的CPU、内存、I/O情况下对于事件流处理的能力。一个实时计算作业可以指定在一个或者多个CU上运行。

当前实时计算定义,1CU的处理能力大概为 1000条数据 / 秒。 (阿里云)

  • GB本地存储(放置日志、系统检查点等信息),CU对应实时计算底层系统的CPU计算能力。

1个实时计算作业(Job)的CU使用量取决于此Job输入数据流的QPS、计算复杂程度,以及具体的输入数据分布情况。
您可以根据业务规模以及实时计算的计算能力,估算所需购买的资源数量。
实时计算1 CU的处理能力如下表所示。

处理场景 处理能力
简单的流式压测处理
例如,过滤、清洗等操作。
1 CU每秒可以处理40000~55000条数据。
复杂的流式压测处理
例如,聚合操作、复杂UDF计算等。
1 CU每秒可以处理5000~10000条数据。
  • 计算能力的特别说明
  • 上述计算能力估值仅限于实时计算内部处理能力,不包括对外数据读取写入部分
  • 外部数据的读写效率会影响您对实时计算能力的评估。

例如:如果实时计算需要从日志服务(LogService)读取数据,但LogService对于请求调用配额(Quota)存在一定限制,则:实时计算整体的计算能力将被限制在LogService允许的范围内。

  • 如果实时计算引用的RDS数据存储存在连接数或者TPS限制,则:实时计算Job的吞吐能力将受限于RDS本身的流控限制。
  • 如果作业中使用窗口函数CU的使用量会比简单作业高,建议至少购买 4 CU

Slot/任务槽

  • taskslot静态概念 , 是指 单 TaskManager 具有的并发执行能力
  • 可以通过参数 taskmanager.numberOfTaskSlots 进行配置

并行度/parallelism

  • 并行度是动态概念,也就是 TaskManager 运行程序时实际使用的并发能力,可以通过参数 parallelism.default 进行配置

  • 如果并行度 <= 集群中可用 slot 的总数,则:Flink Job 可正常执行

因为 slot 不一定要全部占用,有十分力气可以只用八分;

  • 而如果并行度 > 可用 集群中 slot 的总数,则: 导致超出了并行能力上限————心有余力不足,Flink Job 就只好等待资源管理器分配更多的资源了。
  • 【单TM所占CU数】假定值n = 1

华为云DLI给Flink Job实际分配CPU时,会虚拟CPU给 Flink Job 的 Task Manager。
这个虚拟的比例是2n + 1
真实的资源根据DLI Job配置界面中配置的【单TM Slot数】的资源来计算

假定【单TM所占CU数】为1,则:Flink Web UI中 Task Manager 的 CPU 数会显示为 3

  • 【CU数量】 = 作业占用资源总CU数,需配置与实际占用资源一致,作业实际占用资源根据算子并行数按需申请。

CU数量 = 管理单元 + (算子总并行数 / 单TM Slot数) * 单TM所占CU数
即: TaskManager 数量 = 算子总并行数 / 单TM Slot数

TaskManager 数量,是按需的、动态变化的

  • 【实际CU数量】= 弹性资源池当前分配的CU

  • 【管理单元】 = Job Manager

  • 【并行数】

算子默认最大并行数,优先级比代码中低。

并行数为作业每个算子的并行数,适度增加并行数会提高作业整体算力,但也须考虑线程增多带来的切换开销,其上限是计算单元CU数的4倍;

最佳实践为计算单元CU数的1-2倍。

注意:该并行数设置优先级低于代码中并行数设置。
==============	
Slot(任务槽)是Flink中用于执行任务的资源单位,每个TaskManager可以提供多个Slot,每个Slot可以运行一个并行任务。
并行度(parallelism)是指作业执行的并行程度,即:作业中所有并行子任务的总数。
Slot和并行度的关系:
	需 TaskManager 的Slot数 * TaskManager 的数量 >= 作业的并行度;否则,作业无法充分利用所有资源。
	若 作业的并行度 > 集群中所有TaskManager提供的Slot总数,则:作业将无法启动更多的并行任务,即使资源足够。

参考文献

关于 CU 的说明

关于 CU 的说明

Y 推荐文献

  • Apache Flink
  • Flink CDC

X 参考文献

posted @ 2025-09-04 02:07  千千寰宇  阅读(78)  评论(0)    收藏  举报