七、核心知识点:实时卡口拥堵分析--典型案例
卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速和通过的车辆的数量,为了统计实时的平均车速,我设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量
滑动窗口: 窗口长度是为5分钟,滑动步长为1分钟
平均车速 = 当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量
测试数据:
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12346","speed":60,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12347","speed":40,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12348","speed":90,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0002","camera_id":"1","car":"豫A12346","speed":30,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0002","camera_id":"1","car":"豫A12347","speed":40,"road_id":"01","area_id":"20"}
实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:
DROP TABLE IF EXISTS `t_average_speed`; CREATE TABLE `t_average_speed` ( `id` int(11) NOT NULL AUTO_INCREMENT, `start_time` bigint(20) DEFAULT NULL, -- 窗口的开始时间 `end_time` bigint(20) DEFAULT NULL, -- 窗口的结束时间 `monitor_id` varchar(255) DEFAULT NULL, -- 卡口编号 `avg_speed` double DEFAULT NULL, -- 平均车速 `car_count` int(11) DEFAULT NULL, -- 车的数量 PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
使用processingTime语义编写的需求:
package com.bigdata.smart; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor public class AverageSpeed { private Long startTime; private Long endTime; private String monitorId; private Double avgSpeed; private Integer carCount; }
使用eventTime时间语义编写的需求:
package com.bigdata.smart; import com.alibaba.fastjson.JSON; import com.bigdata.day04._07WaterMarkDemo03; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.connector.jdbc.JdbcConnectionOptions; import org.apache.flink.connector.jdbc.JdbcExecutionOptions; import org.apache.flink.connector.jdbc.JdbcSink; import org.apache.flink.connector.jdbc.JdbcStatementBuilder; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.Duration; /** * @基本功能: * @program:FlinkDemo2 * @author: 闫哥 * @create:2025-04-18 15:55:28 **/ public class _02_智慧交通中卡口拥堵情况统计2 { public static void main(String[] args) throws Exception { //1. env-准备环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC); env.setParallelism(1); //2. source-加载数据 // 从kafka的topic-car 中获取数据 KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers("node01:9092,node02:9092,node03:9092") .setTopics("topic-car") .setGroupId("smart_jiaotong") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); DataStreamSource<String> dataStreamSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource.map(new RichMapFunction<String, CarInfo>() { @Override public CarInfo map(String jsonStr) throws Exception { CarInfo carInfo = JSON.parseObject(jsonStr, CarInfo.class); return carInfo; } }); // 添加水印 SingleOutputStreamOperator<CarInfo> outputStreamOperator = mapStream.assignTimestampsAndWatermarks(WatermarkStrategy.<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner( new SerializableTimestampAssigner<CarInfo>() { // long 是时间戳吗?是秒值还是毫秒呢?年月日时分秒的的字段怎么办呢? @Override public long extractTimestamp(CarInfo carInfo, long recordTimestamp) { // 这个方法的返回值是毫秒,所有的数据只要不是这个毫秒值,都需要转换为毫秒 return carInfo.getActionTime()*1000; } } )); SingleOutputStreamOperator<AverageSpeed> resultStream = outputStreamOperator.keyBy(new KeySelector<CarInfo, String>() { @Override public String getKey(CarInfo carInfo) throws Exception { return carInfo.getMonitorId(); } }).window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))).apply(new WindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() { @Override public void apply(String monitorId, TimeWindow window, Iterable<CarInfo> input, Collector<AverageSpeed> out) throws Exception { AverageSpeed averageSpeed = new AverageSpeed(); averageSpeed.setMonitorId(monitorId); averageSpeed.setStartTime(window.getStart()); averageSpeed.setEndTime(window.getEnd()); double sumSpeed = 0; int carCount = 0; for (CarInfo carInfo : input) { sumSpeed += carInfo.getSpeed(); carCount++; } averageSpeed.setCarCount(carCount); averageSpeed.setAvgSpeed(sumSpeed / carCount); out.collect(averageSpeed); } }); resultStream.print(); //3. transformation-数据处理转换 //4. sink-数据输出 // 将过滤后的数据写入数据库 JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("com.mysql.cj.jdbc.Driver") .withUrl("jdbc:mysql://node01:3306/smart_transportation") .withUsername("root").withPassword("123456").build(); // 将数据写入到mysql中 resultStream.addSink(JdbcSink.sink( "insert into t_average_speed values(null,?,?,?,?,?)", new JdbcStatementBuilder<AverageSpeed>() { @Override public void accept(PreparedStatement stat, AverageSpeed averageSpeed) throws SQLException { stat.setLong(1,averageSpeed.getStartTime()); stat.setLong(2,averageSpeed.getEndTime()); stat.setString(3,averageSpeed.getMonitorId()); stat.setDouble(4,averageSpeed.getAvgSpeed()); stat.setInt(5,averageSpeed.getCarCount()); } }, JdbcExecutionOptions.builder().withBatchSize(1).build(), jdbcConnectionOptions )); //5. execute-执行 env.execute(); } }
假如使用了EventTime作为判断依据的话,那么指定我们的实体中哪一列是eventTime时,一定要注意,返回值是秒值还是毫秒值 ,我们要的是毫秒值,假如不是 ,容易出现触发不了窗口计算的问题。
测试数据,最后一条是触发数据,不参与运算的。
每隔30秒,统计前1分钟的数据,触发条件就是30秒,30秒一次计算,所以在造最后一条触发数据的时候,一定要比第一条大30秒以上。
1715840360 ==> 2024-5-16 14:19:20 [14:19:00,14:20:00)
{"action_time":1715840360,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
{"action_time":1715840361,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":55,"road_id":"01","area_id":"20"}
{"action_time":1715840362,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":60,"road_id":"01","area_id":"20"}
{"action_time":1715840370,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
假如你的水印时间是3秒,此时上面4条数据不会触发,需要再来一条
{"action_time":1715840373,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":50,"road_id":"01","area_id":"20"}
假如你的程序已经运行过2025年的数据了,这个窗口已经计算到了2025年,2024年的窗口早就关闭了,你运行上面的数据是没有效果的,需要重新启动一下flink程序。
详细代码分析
一、整体架构与功能
这是一个基于 Flink 的实时交通流分析系统,主要功能是:
-
从 Kafka 消费车辆实时数据
-
按监控卡口分组,统计每个卡口的平均车速
-
使用事件时间窗口(滑动窗口)计算
-
将结果存储到 MySQL 数据库
二、代码分层详细分析
1. 环境与执行模式设置
// 1. env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
-
执行环境:获取 Flink 执行环境
-
运行模式:
AUTOMATIC自动选择运行模式(批处理/流处理),根据数据源自动判断 -
并行度:设置为 1(测试/开发环境常见,生产环境需要调整)
2. 数据源(Source)配置
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("node01:9092,node02:9092,node03:9092")
.setTopics("topic-car")
.setGroupId("smart_jiaotong")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> dataStreamSource = env.fromSource(
source,
WatermarkStrategy.noWatermarks(), // 问题1:这里不设置水印
"Kafka Source"
);
分析:
-
Kafka 连接:连接3个节点的Kafka集群
-
消费组:
smart_jiaotong -
起始位置:从最新位置开始消费(
latest()),这意味着不会消费历史数据 -
反序列化:使用简单字符串反序列化器
-
⚠️ 问题1:这里使用了
WatermarkStrategy.noWatermarks(),但后续重新分配了水印,这会导致事件时间不一致
3. 数据转换与解析
SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource.map(new RichMapFunction<String, CarInfo>() {
@Override
public CarInfo map(String jsonStr) throws Exception {
CarInfo carInfo = JSON.parseObject(jsonStr, CarInfo.class);
return carInfo;
}
});
分析:
-
将 JSON 字符串解析为
CarInfo对象 -
使用 FastJSON 进行解析
-
这是一个简单的
map转换 -
建议:在生产环境中应该添加异常处理,防止解析失败导致任务失败
4. 水印策略与时间戳分配
SingleOutputStreamOperator<CarInfo> outputStreamOperator = mapStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new SerializableTimestampAssigner<CarInfo>() {
@Override
public long extractTimestamp(CarInfo carInfo, long recordTimestamp) {
// 将秒转换为毫秒
return carInfo.getActionTime() * 1000;
}
})
);
关键点分析:
4.1 水印策略
-
策略:
forBoundedOutOfOrderness(Duration.ofSeconds(3)) -
含义:允许最多3秒的乱序数据
-
水印计算:
Watermark = 最大事件时间 - 3秒
4.2 时间戳提取
-
事件时间字段:
CarInfo.getActionTime() -
单位转换:假设原始数据是秒级时间戳,转换为毫秒
-
重要假设:代码假设
actionTime是秒值,但需确认数据实际格式
4.3 ⚠️ 潜在问题
-
时间戳精度:如果原始数据已经是毫秒,这里就重复乘了1000
-
时区处理:代码没有处理时区问题
-
水印位置:应该在
map之后立即分配水印,但这里的位置是正确的
5. 核心计算逻辑
SingleOutputStreamOperator<AverageSpeed> resultStream = outputStreamOperator
.keyBy(new KeySelector<CarInfo, String>() {
@Override
public String getKey(CarInfo carInfo) throws Exception {
return carInfo.getMonitorId();
}
})
.window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30)))
.apply(new WindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
@Override
public void apply(String monitorId, TimeWindow window, Iterable<CarInfo> input,
Collector<AverageSpeed> out) throws Exception {
AverageSpeed averageSpeed = new AverageSpeed();
averageSpeed.setMonitorId(monitorId);
averageSpeed.setStartTime(window.getStart());
averageSpeed.setEndTime(window.getEnd());
double sumSpeed = 0;
int carCount = 0;
for (CarInfo carInfo : input) {
sumSpeed += carInfo.getSpeed();
carCount++;
}
averageSpeed.setCarCount(carCount);
averageSpeed.setAvgSpeed(sumSpeed / carCount);
out.collect(averageSpeed);
}
});
详细分析:
5.1 分组(KeyBy)
-
键:
monitorId(监控点ID) -
含义:按监控卡口分组统计
5.2 窗口定义
-
窗口类型:滑动事件时间窗口
-
参数:
SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(30))-
窗口长度:1分钟
-
滑动间隔:30秒
-
-
窗口计算频率:每30秒计算一次过去1分钟的数据
窗口示例:
时间轴:0s 30s 60s 90s 120s
窗口1:[0s, 60s) ← 第一次触发
窗口2:[30s, 90s) ← 30秒后第二次触发
窗口3:[60s, 120s) ← 60秒后第三次触发
5.3 窗口函数
-
函数类型:
WindowFunction(全量窗口函数) -
计算逻辑:
-
遍历窗口内所有数据
-
计算总速度和车辆数
-
计算平均速度
-
-
输出:
AverageSpeed对象,包含监控点ID、窗口起止时间、车辆数、平均速度
5.4 ⚠️ 性能优化建议
// 建议使用增量聚合 + 全量窗口的方式,提高性能
.window(...)
.reduce(
new ReduceFunction<CarInfo>() {
@Override
public CarInfo reduce(CarInfo value1, CarInfo value2) {
// 增量聚合:累加速度和计数
}
},
new ProcessWindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
@Override
public void process(String key, Context context,
Iterable<CarInfo> elements, Collector<AverageSpeed> out) {
// 窗口关闭时计算最终结果
}
}
)
6. 输出与存储
6.1 控制台输出
resultStream.print();
-
用于调试和监控
-
生产环境建议使用日志系统替代
6.2 MySQL 存储
JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withDriverName("com.mysql.cj.jdbc.Driver")
.withUrl("jdbc:mysql://node01:3306/smart_transportation")
.withUsername("root")
.withPassword("123456")
.build();
resultStream.addSink(JdbcSink.sink(
"insert into t_average_speed values(null,?,?,?,?,?)",
new JdbcStatementBuilder<AverageSpeed>() {
@Override
public void accept(PreparedStatement stat, AverageSpeed averageSpeed) throws SQLException {
stat.setLong(1, averageSpeed.getStartTime());
stat.setLong(2, averageSpeed.getEndTime());
stat.setString(3, averageSpeed.getMonitorId());
stat.setDouble(4, averageSpeed.getAvgSpeed());
stat.setInt(5, averageSpeed.getCarCount());
}
},
JdbcExecutionOptions.builder().withBatchSize(1).build(), // 问题2:批量大小为1
jdbcConnectionOptions
));
分析:
-
表结构:包含自增ID、窗口开始时间、窗口结束时间、监控点ID、平均速度、车辆数
-
连接配置:使用MySQL 8.x驱动(
com.mysql.cj.jdbc.Driver) -
⚠️ 问题2:
withBatchSize(1)表示每条数据都立即写入数据库,性能极差-
建议:设置为合适的批处理大小(如1000)
-
三、数据流转完整流程
Kafka (topic-car)
↓ (JSON字符串)
Flink Source (String)
↓ (map转换,JSON→CarInfo)
CarInfo Stream
↓ (分配水印,3秒乱序容忍)
CarInfo Stream with Watermark
↓ (按monitorId分组)
KeyedStream
↓ (滑动窗口:1分钟窗口,30秒滑动)
WindowedStream
↓ (窗口函数:计算平均速度)
AverageSpeed Stream
↓
控制台输出 + MySQL存储
四、核心业务逻辑梳理
1. 时间语义
-
使用事件时间:基于车辆通过时间(
actionTime) -
乱序处理:允许3秒乱序
-
窗口触发:基于事件时间窗口
2. 窗口策略
-
滑动窗口:每30秒计算一次过去1分钟的数据
-
重叠窗口:相邻窗口有30秒重叠
-
应用场景:适合需要观察数据变化趋势的场景
3. 统计指标
-
按监控点:每个卡口独立统计
-
平均速度:窗口内所有车辆速度的平均值
-
车辆计数:窗口内通过的车辆数
五、代码优化建议
1. 水印策略优化
// 当前:在source后不设置水印,在map后设置
// 建议:统一在source处设置
DataStreamSource<CarInfo> source = env.fromSource(
kafkaSource,
WatermarkStrategy
.<CarInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) ->
element.getActionTime() * 1000L)
.withIdleness(Duration.ofMinutes(5)), // 添加空闲检测
"Kafka Source"
);
2. 批处理优化
// 当前:每条数据都写数据库
JdbcExecutionOptions.builder().withBatchSize(1).build()
// 建议:使用批量写入
JdbcExecutionOptions.builder()
.withBatchSize(1000) // 每批1000条
.withBatchIntervalMs(5000) // 或每5秒
.withMaxRetries(3) // 重试3次
.build();
3. 异常处理增强
// 在map转换中添加异常处理
SingleOutputStreamOperator<CarInfo> mapStream = dataStreamSource
.map(new RichMapFunction<String, CarInfo>() {
@Override
public CarInfo map(String jsonStr) throws Exception {
try {
return JSON.parseObject(jsonStr, CarInfo.class);
} catch (Exception e) {
// 记录到侧输出流
context.output(errorTag, jsonStr);
return null;
}
}
})
.filter(Objects::nonNull); // 过滤掉解析失败的数据
4. 性能优化
// 使用增量聚合优化窗口计算
.window(...)
.aggregate(new AvgSpeedAggregate(), new AvgSpeedWindowFunction())
// 或者使用ProcessWindowFunction优化内存
.window(...)
.process(new ProcessWindowFunction<CarInfo, AverageSpeed, String, TimeWindow>() {
// 可以使用状态管理
});
5. 配置参数化
// 将硬编码参数提取为配置
public static final Duration MAX_OUT_OF_ORDER = Duration.ofSeconds(3);
public static final Time WINDOW_SIZE = Time.minutes(1);
public static final Time WINDOW_SLIDE = Time.seconds(30);
public static final int JDBC_BATCH_SIZE = 1000;
六、潜在问题与解决
1. 时间戳单位问题
-
假设:
actionTime是秒值 -
风险:如果数据源提供毫秒值,会错误地乘1000
-
验证:检查数据源实际格式
2. 水印生成位置
-
当前代码在source和map后分别处理水印,可能不一致
-
建议统一处理
3. 数据库写入性能
-
批量大小为1严重影响性能
-
无重试机制,网络波动可能导致数据丢失
4. 无迟到数据处理
-
只有3秒乱序容忍,超过3秒的数据会被丢弃
-
可考虑添加侧输出流捕获迟到数据
5. 无状态清理
-
长时间运行可能导致状态无限增长
-
可考虑配置状态TTL
七、总结
这是一个典型的实时交通流分析系统,具有以下特点:
优点:
-
架构清晰,数据流转明确
-
使用事件时间处理乱序数据
-
结果同时输出到控制台和数据库
-
使用滑动窗口适合趋势分析
改进空间:
-
性能优化(批处理、增量聚合)
-
异常处理增强
-
配置参数化
-
监控和告警集成
适用场景:
-
城市交通实时监控
-
卡口拥堵预警
-
交通流量分析
-
实时路况统计
posted on 2026-03-23 15:28 luzhouxiaoshuai 阅读(0) 评论(0) 收藏 举报
浙公网安备 33010602011771号