七、核心知识点:实时卡口拥堵分析--典型案例

卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速和通过的车辆的数量,为了统计实时的平均车速,我设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量

滑动窗口: 窗口长度是为5分钟,滑动步长为1分钟

平均车速 = 当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量

测试数据:

{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12346","speed":60,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12347","speed":40,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12348","speed":90,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0002","camera_id":"1","car":"豫A12346","speed":30,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0002","camera_id":"1","car":"豫A12347","speed":40,"road_id":"01","area_id":"20"}

实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:

DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `start_time` bigint(20) DEFAULT NULL,   -- 窗口的开始时间
  `end_time` bigint(20) DEFAULT NULL,     -- 窗口的结束时间
  `monitor_id` varchar(255) DEFAULT NULL, -- 卡口编号 
  `avg_speed` double DEFAULT NULL,        -- 平均车速
  `car_count` int(11) DEFAULT NULL,       -- 车的数量
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

使用processingTime语义编写的需求:

package com.bigdata.smart;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class AverageSpeed {

    private Long startTime;
    private Long endTime;
    private String monitorId;
    private Double avgSpeed;
    private Integer carCount;

}

使用eventTime时间语义编写的需求:

package com.bigdata.smart;

import com.alibaba.fastjson.JSON;
import com.bigdata.day04._07WaterMarkDemo03;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;

/**
 * @基本功能:
 * @program:FlinkDemo2
 * @author: 闫哥
 * @create:2025-04-18 15:55:28
 **/
public class _02_智慧交通中卡口拥堵情况统计2 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据
        // 从kafka的topic-car 中获取数据
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("node01:9092,node02:9092,node03:9092")
                .setTopics("topic-car")
                .setGroupId("smart_jiaotong")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");



        SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource.map(new RichMapFunction<String, CarInfo>() {


            @Override
            public CarInfo map(String jsonStr) throws Exception {

                CarInfo carInfo = JSON.parseObject(jsonStr, CarInfo.class);

                return carInfo;
            }
        });

        // 添加水印
        SingleOutputStreamOperator<CarInfo> outputStreamOperator = mapStream.assignTimestampsAndWatermarks(WatermarkStrategy.<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(
                new SerializableTimestampAssigner<CarInfo>() {
                    // long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢?
                    @Override
                    public long extractTimestamp(CarInfo carInfo, long recordTimestamp) {
                        // 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒
                        return carInfo.getActionTime()*1000;
                    }
                }
        ));

        SingleOutputStreamOperator<AverageSpeed> resultStream = outputStreamOperator.keyBy(new KeySelector<CarInfo, String>() {

            @Override
            public String getKey(CarInfo carInfo) throws Exception {
                return carInfo.getMonitorId();
            }
        }).window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))).apply(new WindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
            @Override
            public void apply(String monitorId, TimeWindow window, Iterable<CarInfo> input, Collector<AverageSpeed> out) throws Exception {

                AverageSpeed averageSpeed = new AverageSpeed();
                averageSpeed.setMonitorId(monitorId);
                averageSpeed.setStartTime(window.getStart());
                averageSpeed.setEndTime(window.getEnd());

                double sumSpeed = 0;
                int carCount = 0;
                for (CarInfo carInfo : input) {
                    sumSpeed += carInfo.getSpeed();
                    carCount++;
                }
                averageSpeed.setCarCount(carCount);
                averageSpeed.setAvgSpeed(sumSpeed / carCount);
                out.collect(averageSpeed);
            }
        });

        resultStream.print();

        //3. transformation-数据处理转换
        //4. sink-数据输出
        // 将过滤后的数据写入数据库
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUrl("jdbc:mysql://node01:3306/smart_transportation")
                .withUsername("root").withPassword("123456").build();
        // 将数据写入到mysql中
        resultStream.addSink(JdbcSink.sink(
                "insert into t_average_speed values(null,?,?,?,?,?)", new JdbcStatementBuilder<AverageSpeed>() {
                    @Override
                    public void accept(PreparedStatement stat, AverageSpeed averageSpeed) throws SQLException {
                        stat.setLong(1,averageSpeed.getStartTime());
                        stat.setLong(2,averageSpeed.getEndTime());
                        stat.setString(3,averageSpeed.getMonitorId());
                        stat.setDouble(4,averageSpeed.getAvgSpeed());
                        stat.setInt(5,averageSpeed.getCarCount());
                    }
                }, JdbcExecutionOptions.builder().withBatchSize(1).build(), jdbcConnectionOptions

        ));


        //5. execute-执行
        env.execute();
    }
}

假如使用了EventTime作为判断依据的话,那么指定我们的实体中哪一列是eventTime时,一定要注意,返回值是秒值还是毫秒值 ,我们要的是毫秒值,假如不是 ,容易出现触发不了窗口计算的问题。

测试数据,最后一条是触发数据,不参与运算的。

每隔30秒,统计前1分钟的数据,触发条件就是30秒,30秒一次计算,所以在造最后一条触发数据的时候,一定要比第一条大30秒以上。

1715840360 ==> 2024-5-16 14:19:20 [14:19:00,14:20:00)

{"action_time":1715840360,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":55,"road_id":"01","area_id":"20"}
{"action_time":1715840362,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":60,"road_id":"01","area_id":"20"}
{"action_time":1715840370,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}

假如你的水印时间是3秒,此时上面4条数据不会触发,需要再来一条
{"action_time":1715840373,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}

假如你的程序已经运行过2025年的数据了,这个窗口已经计算到了2025年,2024年的窗口早就关闭了,你运行上面的数据是没有效果的,需要重新启动一下flink程序。

 

详细代码分析

一、整体架构与功能

这是一个基于 Flink 的实时交通流分析系统,主要功能是:

  1. 从 Kafka 消费车辆实时数据

  2. 按监控卡口分组,统计每个卡口的平均车速

  3. 使用事件时间窗口(滑动窗口)计算

  4. 将结果存储到 MySQL 数据库


二、代码分层详细分析

1. 环境与执行模式设置

// 1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
  • 执行环境:获取 Flink 执行环境

  • 运行模式AUTOMATIC自动选择运行模式(批处理/流处理),根据数据源自动判断

  • 并行度:设置为 1(测试/开发环境常见,生产环境需要调整)


2. 数据源(Source)配置

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("node01:9092,node02:9092,node03:9092")
    .setTopics("topic-car")
    .setGroupId("smart_jiaotong")
    .setStartingOffsets(OffsetsInitializer.latest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

DataStreamSource<String> dataStreamSource = env.fromSource(
    source, 
    WatermarkStrategy.noWatermarks(),  // 问题1:这里不设置水印
    "Kafka Source"
);

分析

  • Kafka 连接:连接3个节点的Kafka集群

  • 消费组smart_jiaotong

  • 起始位置:从最新位置开始消费(latest()),这意味着不会消费历史数据

  • 反序列化:使用简单字符串反序列化器

  • ⚠️ 问题1:这里使用了 WatermarkStrategy.noWatermarks(),但后续重新分配了水印,这会导致事件时间不一致


3. 数据转换与解析

SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource.map(new RichMapFunction<String, CarInfo>() {
    @Override
    public CarInfo map(String jsonStr) throws Exception {
        CarInfo carInfo = JSON.parseObject(jsonStr, CarInfo.class);
        return carInfo;
    }
});

分析

  • 将 JSON 字符串解析为 CarInfo对象

  • 使用 FastJSON 进行解析

  • 这是一个简单的 map转换

  • 建议:在生产环境中应该添加异常处理,防止解析失败导致任务失败


4. 水印策略与时间戳分配

SingleOutputStreamOperator<CarInfo> outputStreamOperator = mapStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
    .withTimestampAssigner(new SerializableTimestampAssigner<CarInfo>() {
        @Override
        public long extractTimestamp(CarInfo carInfo, long recordTimestamp) {
            // 将秒转换为毫秒
            return carInfo.getActionTime() * 1000;
        }
    })
);

关键点分析

4.1 水印策略

  • 策略forBoundedOutOfOrderness(Duration.ofSeconds(3))

  • 含义:允许最多3秒的乱序数据

  • 水印计算Watermark = 最大事件时间 - 3秒

4.2 时间戳提取

  • 事件时间字段CarInfo.getActionTime()

  • 单位转换:假设原始数据是秒级时间戳,转换为毫秒

  • 重要假设:代码假设 actionTime是秒值,但需确认数据实际格式

4.3 ⚠️ 潜在问题

  • 时间戳精度:如果原始数据已经是毫秒,这里就重复乘了1000

  • 时区处理:代码没有处理时区问题

  • 水印位置:应该在map之后立即分配水印,但这里的位置是正确的


5. 核心计算逻辑

SingleOutputStreamOperator<AverageSpeed> resultStream = outputStreamOperator
    .keyBy(new KeySelector<CarInfo, String>() {
        @Override
        public String getKey(CarInfo carInfo) throws Exception {
            return carInfo.getMonitorId();
        }
    })
    .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
    .apply(new WindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
        @Override
        public void apply(String monitorId, TimeWindow window, Iterable<CarInfo> input, 
                         Collector<AverageSpeed> out) throws Exception {
            
            AverageSpeed averageSpeed = new AverageSpeed();
            averageSpeed.setMonitorId(monitorId);
            averageSpeed.setStartTime(window.getStart());
            averageSpeed.setEndTime(window.getEnd());
            
            double sumSpeed = 0;
            int carCount = 0;
            for (CarInfo carInfo : input) {
                sumSpeed += carInfo.getSpeed();
                carCount++;
            }
            averageSpeed.setCarCount(carCount);
            averageSpeed.setAvgSpeed(sumSpeed / carCount);
            out.collect(averageSpeed);
        }
    });

详细分析

5.1 分组(KeyBy)

  • monitorId(监控点ID)

  • 含义:按监控卡口分组统计

5.2 窗口定义

  • 窗口类型:滑动事件时间窗口

  • 参数SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))

    • 窗口长度:1分钟

    • 滑动间隔:30秒

  • 窗口计算频率:每30秒计算一次过去1分钟的数据

窗口示例

时间轴:0s   30s   60s   90s   120s
窗口1:[0s, 60s)    ← 第一次触发
窗口2:[30s, 90s)   ← 30秒后第二次触发
窗口3:[60s, 120s)  ← 60秒后第三次触发

5.3 窗口函数

  • 函数类型WindowFunction(全量窗口函数)

  • 计算逻辑

    1. 遍历窗口内所有数据

    2. 计算总速度和车辆数

    3. 计算平均速度

  • 输出AverageSpeed对象,包含监控点ID、窗口起止时间、车辆数、平均速度

5.4 ⚠️ 性能优化建议

// 建议使用增量聚合 + 全量窗口的方式,提高性能
.window(...)
.reduce(
    new ReduceFunction<CarInfo>() {
        @Override
        public CarInfo reduce(CarInfo value1, CarInfo value2) {
            // 增量聚合:累加速度和计数
        }
    },
    new ProcessWindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
        @Override
        public void process(String key, Context context, 
                          Iterable<CarInfo> elements, Collector<AverageSpeed> out) {
            // 窗口关闭时计算最终结果
        }
    }
)

6. 输出与存储

6.1 控制台输出

resultStream.print();
  • 用于调试和监控

  • 生产环境建议使用日志系统替代

6.2 MySQL 存储

JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withDriverName("com.mysql.cj.jdbc.Driver")
    .withUrl("jdbc:mysql://node01:3306/smart_transportation")
    .withUsername("root")
    .withPassword("123456")
    .build();

resultStream.addSink(JdbcSink.sink(
    "insert into t_average_speed values(null,?,?,?,?,?)", 
    new JdbcStatementBuilder<AverageSpeed>() {
        @Override
        public void accept(PreparedStatement stat, AverageSpeed averageSpeed) throws SQLException {
            stat.setLong(1, averageSpeed.getStartTime());
            stat.setLong(2, averageSpeed.getEndTime());
            stat.setString(3, averageSpeed.getMonitorId());
            stat.setDouble(4, averageSpeed.getAvgSpeed());
            stat.setInt(5, averageSpeed.getCarCount());
        }
    }, 
    JdbcExecutionOptions.builder().withBatchSize(1).build(),  // 问题2:批量大小为1
    jdbcConnectionOptions
));

分析

  • 表结构:包含自增ID、窗口开始时间、窗口结束时间、监控点ID、平均速度、车辆数

  • 连接配置:使用MySQL 8.x驱动(com.mysql.cj.jdbc.Driver

  • ⚠️ 问题2withBatchSize(1)表示每条数据都立即写入数据库,性能极差

    • 建议:设置为合适的批处理大小(如1000)


三、数据流转完整流程

Kafka (topic-car)
    ↓ (JSON字符串)
Flink Source (String)
    ↓ (map转换,JSON→CarInfo)
CarInfo Stream
    ↓ (分配水印,3秒乱序容忍)
CarInfo Stream with Watermark
    ↓ (按monitorId分组)
KeyedStream
    ↓ (滑动窗口:1分钟窗口,30秒滑动)
WindowedStream
    ↓ (窗口函数:计算平均速度)
AverageSpeed Stream
    ↓
控制台输出 + MySQL存储

四、核心业务逻辑梳理

1. 时间语义

  • 使用事件时间:基于车辆通过时间(actionTime

  • 乱序处理:允许3秒乱序

  • 窗口触发:基于事件时间窗口

2. 窗口策略

  • 滑动窗口:每30秒计算一次过去1分钟的数据

  • 重叠窗口:相邻窗口有30秒重叠

  • 应用场景:适合需要观察数据变化趋势的场景

3. 统计指标

  • 按监控点:每个卡口独立统计

  • 平均速度:窗口内所有车辆速度的平均值

  • 车辆计数:窗口内通过的车辆数


五、代码优化建议

1. 水印策略优化

// 当前:在source后不设置水印,在map后设置
// 建议:统一在source处设置
DataStreamSource<CarInfo> source = env.fromSource(
    kafkaSource,
    WatermarkStrategy
        .<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((element, recordTimestamp) -> 
            element.getActionTime() * 1000L)
        .withIdleness(Duration.ofMinutes(5)),  // 添加空闲检测
    "Kafka Source"
);

2. 批处理优化

// 当前:每条数据都写数据库
JdbcExecutionOptions.builder().withBatchSize(1).build()

// 建议:使用批量写入
JdbcExecutionOptions.builder()
    .withBatchSize(1000)           // 每批1000条
    .withBatchIntervalMs(5000)     // 或每5秒
    .withMaxRetries(3)             // 重试3次
    .build();

3. 异常处理增强

// 在map转换中添加异常处理
SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource
    .map(new RichMapFunction<String, CarInfo>() {
        @Override
        public CarInfo map(String jsonStr) throws Exception {
            try {
                return JSON.parseObject(jsonStr, CarInfo.class);
            } catch (Exception e) {
                // 记录到侧输出流
                context.output(errorTag, jsonStr);
                return null;
            }
        }
    })
    .filter(Objects::nonNull);  // 过滤掉解析失败的数据

4. 性能优化

// 使用增量聚合优化窗口计算
.window(...)
.aggregate(new AvgSpeedAggregate(), new AvgSpeedWindowFunction())

// 或者使用ProcessWindowFunction优化内存
.window(...)
.process(new ProcessWindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
    // 可以使用状态管理
});

5. 配置参数化

// 将硬编码参数提取为配置
public static final Duration MAX_OUT_OF_ORDER = Duration.ofSeconds(3);
public static final Time WINDOW_SIZE = Time.minutes(1);
public static final Time WINDOW_SLIDE = Time.seconds(30);
public static final int JDBC_BATCH_SIZE = 1000;

六、潜在问题与解决

1. 时间戳单位问题

  • 假设actionTime是秒值

  • 风险:如果数据源提供毫秒值,会错误地乘1000

  • 验证:检查数据源实际格式

2. 水印生成位置

  • 当前代码在source和map后分别处理水印,可能不一致

  • 建议统一处理

3. 数据库写入性能

  • 批量大小为1严重影响性能

  • 无重试机制,网络波动可能导致数据丢失

4. 无迟到数据处理

  • 只有3秒乱序容忍,超过3秒的数据会被丢弃

  • 可考虑添加侧输出流捕获迟到数据

5. 无状态清理

  • 长时间运行可能导致状态无限增长

  • 可考虑配置状态TTL


七、总结

这是一个典型的实时交通流分析系统,具有以下特点:

优点

  1. 架构清晰,数据流转明确

  2. 使用事件时间处理乱序数据

  3. 结果同时输出到控制台和数据库

  4. 使用滑动窗口适合趋势分析

改进空间

  1. 性能优化(批处理、增量聚合)

  2. 异常处理增强

  3. 配置参数化

  4. 监控和告警集成

适用场景

  • 城市交通实时监控

  • 卡口拥堵预警

  • 交通流量分析

  • 实时路况统计

posted on 2026-03-23 15:28  luzhouxiaoshuai  阅读(0)  评论(0)    收藏  举报

导航