四、核心知识点4:water

需求升级:

实时模拟生成订单数据,格式为: (订单ID,用户ID,时间戳/事件时间,订单金额)

* 要求每隔5s,计算5秒内,每个用户的订单总金额

* 并添加Watermark来解决一定程度上的数据延迟和数据乱序问题。

package com.bigdata.day04;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Random;
import java.util.UUID;

/**
 * @基本功能:
 * @program:FlinkDemo2
 * @author: 闫哥
 * @create:2025-04-18 14:27:41
 **/
public class _07WaterMarkDemo02 {

    @Data  // set get toString
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo{
        private String orderId;
        private int uid;
        private int money;
        private long timeStamp;
    }

    // 自定义source , 每隔1秒钟生成一个订单
    public static class MySource implements SourceFunction<OrderInfo> {

        private boolean flag = true;
        @Override
        public void run(SourceContext<OrderInfo> ctx) throws Exception {
            // 源源不断的产生数据
            Random random = new Random();
            while(flag){
                OrderInfo orderInfo = new OrderInfo();
                orderInfo.setOrderId(UUID.randomUUID().toString());
                orderInfo.setUid(random.nextInt(3));
                orderInfo.setMoney(random.nextInt(101));
                orderInfo.setTimeStamp(System.currentTimeMillis());
                ctx.collect(orderInfo);
                Thread.sleep(1000);// 间隔1s
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }
    }

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        DataStreamSource<OrderInfo> dataStreamSource = env.addSource(new MySource());

        dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(
                new SerializableTimestampAssigner<OrderInfo>() {
                    // long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?
                    @Override
                    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {
                        // 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒
                        return orderInfo.getTimeStamp();
                    }
                }
        )).keyBy(orderInfo -> orderInfo.getUid()).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {
                long start = window.getStart();
                long end = window.getEnd();
                String startStr = DateFormatUtils.format(start,"yyyy-MM-dd HH:mm:ss");
                String endStr = DateFormatUtils.format(end,"yyyy-MM-dd HH:mm:ss");
                int sumMoney = 0;
                for (OrderInfo orderInfo : input) {
                    sumMoney += orderInfo.getMoney();
                }
                out.collect("开始时间:"+startStr+"结束时间:"+endStr+",用户id="+integer+",订单总额:"+sumMoney);
            }
        }).print();
        //4. sink-数据输出


        //5. execute-执行
        env.execute();
    }
}

这是一个Flink流处理程序,模拟生成订单数据,然后按照事件时间(使用Watermark处理乱序)进行5秒的滚动窗口,统计每个用户在5秒内的订单总额。

详细步骤:

  1. 定义了一个订单类OrderInfo,包含订单ID、用户ID、金额和时间戳。

  2. 自定义一个数据源MySource,每隔1秒生成一个随机订单(用户ID为0-2,金额0-100,时间戳为当前系统时间)。

  3. 主程序中,设置Flink执行环境,添加数据源。

  4. 为数据流分配时间戳和水印(Watermark),允许3秒的乱序。

  5. 按照用户ID进行分组,然后开一个5秒的事件时间滚动窗口。

  6. 在窗口上应用一个WindowFunction,计算每个窗口内每个用户的订单总金额,并输出窗口开始结束时间、用户ID和总金额。

  7. 打印结果。

下面我们逐行解释关键代码:

  1. 主类定义和OrderInfo类:

    • OrderInfo是一个简单的POJO,使用Lombok注解生成getter、setter、全参构造和无参构造。

  2. 自定义数据源MySource:

    • 实现SourceFunction接口,重写run和cancel方法。

    • run方法中,每秒钟生成一个随机订单,使用ctx.collect()发送数据。

    • cancel方法通过设置flag为false来终止数据生成。

  3. 主方法main:

    a. 设置执行环境:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); // 设置运行时模式为自动(根据输入源决定是流还是批)

    env.setParallelism(1); // 设置并行度为1,方便调试

    b. 加载数据源:

    DataStreamSource<OrderInfo> dataStreamSource = env.addSource(new MySource());

    c. 分配时间戳和水印:

    dataStreamSource.assignTimestampsAndWatermarks(

    WatermarkStrategy.<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))

    .withTimestampAssigner(new SerializableTimestampAssigner<OrderInfo>() {

    @Override

    public long extractTimestamp(OrderInfo orderInfo, long recordTimestamp) {

    return orderInfo.getTimeStamp();

    }

    })

    )

    这里使用WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),表示允许3秒的乱序。

    然后指定时间戳分配器,从OrderInfo的timeStamp字段提取事件时间(毫秒)。

    d. 分组、开窗、聚合:

    .keyBy(orderInfo -> orderInfo.getUid()) // 按照用户ID分组

    .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5秒的事件时间滚动窗口

    .apply(new WindowFunction<OrderInfo, String, Integer, TimeWindow>() {

    @Override

    public void apply(Integer integer, TimeWindow window, Iterable<OrderInfo> input, Collector<String> out) throws Exception {

    // 获取窗口的开始和结束时间

    long start = window.getStart();

    long end = window.getEnd();

    String startStr = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss");

    String endStr = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss");

    int sumMoney = 0;

    for (OrderInfo orderInfo : input) {

    sumMoney += orderInfo.getMoney();

    }

    out.collect("开始时间:"+startStr+"结束时间:"+endStr+",用户id="+integer+",订单总额:"+sumMoney);

    }

    })

    .print();

    这里使用apply方法,传入一个WindowFunction,对窗口内的数据进行处理。注意,WindowFunction的输入类型是OrderInfo,输出类型是String,分组键的类型是Integer(即用户ID),窗口类型是TimeWindow。

    在apply方法中,我们遍历窗口中的所有订单,累加金额,然后输出一个字符串,包含窗口开始结束时间、用户ID和总金额。

    e. 执行程序:

    env.execute();

几个关键的知识点分析:

  1. assignTimestampsAndWatermarks:这个方法用于为数据流分配时间戳和水印。水印是一种衡量事件时间进展的机制,用于处理乱序事件。水印会告诉系统,在某个时间点之前的所有数据应该都已经到达了,之后可以触发窗口计算。

  2. forBoundedOutOfOrderness:这是水印策略的一种,允许设置一个固定的乱序时间。例如,设置允许3秒的乱序,那么水印的时间就是当前最大事件时间减去3秒。这意味着,水印时间之后3秒内到达的数据(事件时间小于等于水印时间+3秒)仍然可以被分配到正确的窗口中。

  3. withTimestampAssigner:这个方法用于从数据中提取事件时间戳。我们需要告诉Flink数据中的哪个字段是事件时间,并将其转换为毫秒时间戳。

  4. TumblingEventTimeWindows:这是基于事件时间的滚动窗口。窗口的大小是固定的,并且不会重叠。窗口的触发是由水印决定的,当水印时间超过窗口的结束时间时,窗口就会触发计算。

关键时间节点:

  • 事件时间(Event Time):数据实际发生的时间,由数据本身携带,通过withTimestampAssigner提取。

  • 处理时间(Processing Time):数据被处理的时间,即系统时间。

  • 水印时间(Watermark):水印是一个时间戳,表示所有事件时间小于这个时间戳的数据都已经到达。水印时间 = 当前最大事件时间 - 允许的乱序时间。

窗口触发条件:当水印时间大于等于窗口的结束时间时,窗口触发计算。

在本例中,我们设置了5秒的滚动窗口,允许3秒的乱序。这意味着,每个窗口会在其结束时间之后3秒(即水印时间达到窗口结束时间)触发。例如,窗口[0, 5000)会在水印时间达到5000时触发,而水印时间达到5000意味着当前最大事件时间至少是8000(因为允许3秒乱序,水印时间=最大事件时间-3000)。所以,窗口[0,5000)会在事件时间8000的数据到达时触发。

但是,本例中的数据源使用的是当前系统时间,并且每秒产生一条数据。由于数据是实时产生的,事件时间接近处理时间,所以窗口触发可能会比较准时(但注意,由于允许3秒乱序,窗口触发会有3秒的延迟)。然而,如果数据有延迟,那么窗口触发会等待延迟的数据,直到水印时间推进。

 

posted on 2026-03-23 10:57  luzhouxiaoshuai  阅读(1)  评论(0)    收藏  举报

导航