四、核心知识点4:water
需求升级:
实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)
* 要求每隔5s,计算5秒内,每个用户的订单总金额
* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。
package com.bigdata.day04; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.time.Duration; import java.util.Random; import java.util.UUID; /** * @基本功能: * @program:FlinkDemo2 * @author: 闫哥 * @create:2025-04-18 14:27:41 **/ public class _07WaterMarkDemo02 { @Data // set get toString @AllArgsConstructor @NoArgsConstructor public static class OrderInfo{ private String orderId; private int uid; private int money; private long timeStamp; } // 自定义source , 每隔1秒钟生成一个订单 public static class MySource implements SourceFunction<OrderInfo> { private boolean flag = true; @Override public void run(SourceContext<OrderInfo> ctx) throws Exception { // 源源不断的产生数据 Random random = new Random(); while(flag){ OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(UUID.randomUUID().toString()); orderInfo.setUid(random.nextInt(3)); orderInfo.setMoney(random.nextInt(101)); orderInfo.setTimeStamp(System.currentTimeMillis()); ctx.collect(orderInfo); Thread.sleep(1000);// 间隔1s } } @Override public void cancel() { flag = false; } } public static void main(String[] args) throws Exception { //1. env-准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //2. source-加载数据 DataStreamSource<OrderInfo> dataStreamSource = env.addSource(new MySource()); dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner( new SerializableTimestampAssigner<OrderInfo>() { // long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢? @Override public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) { // 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒 return orderInfo.getTimeStamp(); } } )).keyBy(orderInfo -> orderInfo.getUid()).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() { @Override public void apply(Integer integer, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception { long start = window.getStart(); long end = window.getEnd(); String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss"); String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss"); int sumMoney = 0; for (OrderInfo orderInfo : input) { sumMoney += orderInfo.getMoney(); } out.collect("开始时间:"+startStr+"结束时间:"+endStr+",用户id="+integer+",订单总额:"+sumMoney); } }).print(); //4. sink-数据输出 //5. execute-执行 env.execute(); } }
这是一个Flink流处理程序,模拟生成订单数据,然后按照事件时间(使用Watermark处理乱序)进行5秒的滚动窗口,统计每个用户在5秒内的订单总额。
详细步骤:
-
定义了一个订单类OrderInfo,包含订单ID、用户ID、金额和时间戳。
-
自定义一个数据源MySource,每隔1秒生成一个随机订单(用户ID为0-2,金额0-100,时间戳为当前系统时间)。
-
主程序中,设置Flink执行环境,添加数据源。
-
为数据流分配时间戳和水印(Watermark),允许3秒的乱序。
-
按照用户ID进行分组,然后开一个5秒的事件时间滚动窗口。
-
在窗口上应用一个WindowFunction,计算每个窗口内每个用户的订单总金额,并输出窗口开始结束时间、用户ID和总金额。
-
打印结果。
下面我们逐行解释关键代码:
-
主类定义和OrderInfo类:
-
OrderInfo是一个简单的POJO,使用Lombok注解生成getter、setter、全参构造和无参构造。
-
-
自定义数据源MySource:
-
实现SourceFunction接口,重写run和cancel方法。
-
run方法中,每秒钟生成一个随机订单,使用ctx.collect()发送数据。
-
cancel方法通过设置flag为false来终止数据生成。
-
-
主方法main:
a. 设置执行环境:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 设置运行时模式为自动(根据输入源决定是流还是批)
env.setParallelism(1); // 设置并行度为1,方便调试
b. 加载数据源:
DataStreamSource<OrderInfo> dataStreamSource = env.addSource(new MySource());
c. 分配时间戳和水印:
dataStreamSource.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {
@Override
public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
return orderInfo.getTimeStamp();
}
})
)
这里使用WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),表示允许3秒的乱序。
然后指定时间戳分配器,从OrderInfo的timeStamp字段提取事件时间(毫秒)。
d. 分组、开窗、聚合:
.keyBy(orderInfo -> orderInfo.getUid()) // 按照用户ID分组
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口
.apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
// 获取窗口的开始和结束时间
long start = window.getStart();
long end = window.getEnd();
String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");
String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");
int sumMoney = 0;
for (OrderInfo orderInfo : input) {
sumMoney += orderInfo.getMoney();
}
out.collect("开始时间:"+startStr+"结束时间:"+endStr+",用户id="+integer+",订单总额:"+sumMoney);
}
})
.print();
这里使用apply方法,传入一个WindowFunction,对窗口内的数据进行处理。注意,WindowFunction的输入类型是OrderInfo,输出类型是String,分组键的类型是Integer(即用户ID),窗口类型是TimeWindow。
在apply方法中,我们遍历窗口中的所有订单,累加金额,然后输出一个字符串,包含窗口开始结束时间、用户ID和总金额。
e. 执行程序:
env.execute();
几个关键的知识点分析:
-
assignTimestampsAndWatermarks:这个方法用于为数据流分配时间戳和水印。水印是一种衡量事件时间进展的机制,用于处理乱序事件。水印会告诉系统,在某个时间点之前的所有数据应该都已经到达了,之后可以触发窗口计算。
-
forBoundedOutOfOrderness:这是水印策略的一种,允许设置一个固定的乱序时间。例如,设置允许3秒的乱序,那么水印的时间就是当前最大事件时间减去3秒。这意味着,水印时间之后3秒内到达的数据(事件时间小于等于水印时间+3秒)仍然可以被分配到正确的窗口中。
-
withTimestampAssigner:这个方法用于从数据中提取事件时间戳。我们需要告诉Flink数据中的哪个字段是事件时间,并将其转换为毫秒时间戳。
-
TumblingEventTimeWindows:这是基于事件时间的滚动窗口。窗口的大小是固定的,并且不会重叠。窗口的触发是由水印决定的,当水印时间超过窗口的结束时间时,窗口就会触发计算。
关键时间节点:
-
事件时间(Event Time):数据实际发生的时间,由数据本身携带,通过
withTimestampAssigner提取。 -
处理时间(Processing Time):数据被处理的时间,即系统时间。
-
水印时间(Watermark):水印是一个时间戳,表示所有事件时间小于这个时间戳的数据都已经到达。水印时间 = 当前最大事件时间 - 允许的乱序时间。
窗口触发条件:当水印时间大于等于窗口的结束时间时,窗口触发计算。
在本例中,我们设置了5秒的滚动窗口,允许3秒的乱序。这意味着,每个窗口会在其结束时间之后3秒(即水印时间达到窗口结束时间)触发。例如,窗口[0, 5000)会在水印时间达到5000时触发,而水印时间达到5000意味着当前最大事件时间至少是8000(因为允许3秒乱序,水印时间=最大事件时间-3000)。所以,窗口[0,5000)会在事件时间8000的数据到达时触发。
但是,本例中的数据源使用的是当前系统时间,并且每秒产生一条数据。由于数据是实时产生的,事件时间接近处理时间,所以窗口触发可能会比较准时(但注意,由于允许3秒乱序,窗口触发会有3秒的延迟)。然而,如果数据有延迟,那么窗口触发会等待延迟的数据,直到水印时间推进。
posted on 2026-03-23 10:57 luzhouxiaoshuai 阅读(1) 评论(0) 收藏 举报
浙公网安备 33010602011771号