Netty+Guice-个人开发框架之-IOT框架
📚 目录
🎯 核心概念
什么是IOT框架?
IOT框架是一个用于管理和控制各种工业设备的通信框架,支持多种协议(TCP、Modbus、PLC等)。
核心组件
- Device: 设备抽象,代表一个物理设备
- Handler: 协议处理器,处理具体的通信逻辑
- Channel: 网络连接通道,基于Netty
- Future: 异步结果容器,实现异步转同步
🌐 Netty基础知识
Netty是什么?
Netty是一个高性能的网络通信框架,简化了网络编程的复杂性。
核心概念
1. Channel(通道)
// Channel就像一条电话线
Channel channel = bootstrap.connect("192.168.1.100", 8080).sync().channel();
// 可以发送数据
channel.writeAndFlush("Hello World");
// 可以接收数据(通过Handler)
2. Pipeline(管道)
// Pipeline是数据处理流水线
ChannelPipeline pipeline = channel.pipeline();
// 添加处理器(按顺序执行)
pipeline.addLast("decoder", new StringDecoder()); // 字节→字符串
pipeline.addLast("encoder", new StringEncoder()); // 字符串→字节
pipeline.addLast("handler", new MyBusinessHandler()); // 业务处理
3. Handler(处理器)
public class MyHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 当数据到达时,Netty自动调用这个方法
String data = (String) msg;
System.out.println("收到数据: " + data);
}
}
4. EventLoop(事件循环)
// EventLoop是Netty的线程模型核心
// 一个EventLoop处理多个Channel的IO事件
// 所有IO操作都在EventLoop线程中执行
TCP连接建立过程
// 1. 创建Bootstrap(客户端启动器)
Bootstrap bootstrap = new Bootstrap();
// 2. 配置EventLoopGroup(线程组)
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);
// 3. 设置Channel类型
bootstrap.channel(NioSocketChannel.class);
// 4. 配置Pipeline
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new MyHandler());
}
});
// 5. 连接服务器
ChannelFuture future = bootstrap.connect("192.168.1.100", 8080);
Channel channel = future.sync().channel();
🔮 异步编程与Future
为什么需要异步编程?
问题:网络通信的延迟
// 用户期望:
String result = device.execute("scan"); // 立即得到结果
System.out.println(result);
// 现实:
// 1. 发送请求到网络 (10ms)
// 2. 网络传输 (50ms)
// 3. 设备处理 (100ms)
// 4. 响应传输 (50ms)
// 总计:210ms延迟
Future的作用
1. 异步结果容器
// Future就像一个"未来的盒子"
CompletableFuture<String> future = new CompletableFuture<>();
// 现在盒子是空的
System.out.println("Future创建了,但还没有结果");
// 将来某个时候,放入结果
future.complete("扫码结果:BARCODE_123456");
// 等待结果
String result = future.get(); // 阻塞等待,直到有结果
2. 线程间通信
CompletableFuture<String> future = new CompletableFuture<>();
// 线程A:等待结果
new Thread(() -> {
try {
System.out.println("开始等待...");
String result = future.get(); // 阻塞等待
System.out.println("收到结果: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 线程B:设置结果
new Thread(() -> {
try {
Thread.sleep(2000); // 模拟处理时间
future.complete("处理完成"); // 唤醒线程A
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
3. 超时控制
try {
String result = future.get(5, TimeUnit.SECONDS); // 最多等5秒
System.out.println("结果: " + result);
} catch (TimeoutException e) {
System.out.println("超时了!");
} catch (Exception e) {
System.out.println("出错了: " + e.getMessage());
}
CompletableFuture高级用法
// 1. 链式处理
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "原始数据")
.thenApply(data -> data.toUpperCase())
.thenApply(data -> "处理后: " + data);
// 2. 异常处理
future.exceptionally(throwable -> {
System.out.println("出错了: " + throwable.getMessage());
return "默认值";
});
// 3. 组合多个Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "数据1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "数据2");
CompletableFuture<String> combined = future1.thenCombine(future2,
(data1, data2) -> data1 + " + " + data2);
🏗️ IOT框架启动流程
1. Spring Boot启动
@SpringBootApplication
public class Application {
@PostConstruct
public void initIOT() {
// Spring Boot启动后,初始化IOT框架
Iot.get().start();
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2. IOT框架启动
public class Iot {
public void start() {
// 创建并启动IOT引导程序
IotBootstrap bootstrap = new IotBootstrap();
bootstrap.start();
}
}
3. 设备配置加载
public class IotBootstrap {
public void start() {
// 1. 从数据库加载设备配置
List<Device> devices = loadDevicesFromDatabase();
// 2. 为每个设备建立连接
for (Device device : devices) {
connectToDevice(device);
}
// 3. 依赖注入
inject();
}
private List<Device> loadDevicesFromDatabase() {
// 从hcms_device表加载设备配置
// 包括:id, name, protocol, host, port等
return deviceRepository.findAll();
}
}
4. 依赖注入配置
private void inject() {
try {
// 1. 创建Guice注入器
this.injector = Guice.createInjector(iotModule);
// 2. 绑定成员变量
iotModule.bindMembers(this.injector);
// 3. 获取绑定信息
this.bindings = this.injector.getBindings();
// 4. 绑定业务逻辑
bindBusinessLogic();
} catch (Exception e) {
log.error("IOT框架依赖注入失败", e);
this.bindings = null; // 标记失败
}
}
🔌 设备连接与管理
设备连接过程
1. 创建Netty客户端
private void connectToDevice(Device device) {
// 1. 创建Bootstrap
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class);
// 2. 配置连接处理器
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
setupPipeline(ch, device);
}
});
// 3. 连接设备
ChannelFuture future = bootstrap.connect(device.getHost(), device.getPort());
Channel channel = future.sync().channel();
// 4. 绑定Channel到设备
device.setChannel(channel);
}
2. 协议插件选择
private void setupPipeline(SocketChannel ch, Device device) {
// 1. 根据协议类型选择插件
String protocolType = device.getProtocolType(); // "tcp", "modbus", etc.
ProtocolPlugin plugin = getProtocolPlugin(protocolType);
// 2. 插件配置Pipeline
DeviceHandler handler = plugin.initChannel(ch);
// 3. 绑定Handler到设备
device.setHandler(handler);
}
3. TCP协议插件配置
public class HIKV_ID6_Plugin implements ProtocolPlugin {
@Override
public DeviceHandler initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 1. 帧解码器(处理TCP粘包)
DelimiterBasedFrameDecoder frameDecoder = new DelimiterBasedFrameDecoder(
1024, Delimiters.lineDelimiter() // 以\r\n分割
);
pipeline.addLast("frameDecoder", frameDecoder);
// 2. 字符串解码器
pipeline.addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));
// 3. 字符串编码器
pipeline.addLast("stringEncoder", new StringEncoder(StandardCharsets.UTF_8));
// 4. 业务处理器
HIKV_ID6_Handler handler = new HIKV_ID6_Handler();
handler.bindChannel(ch);
pipeline.addLast("handler", handler);
return handler;
}
}
设备管理
Device对象结构
public class Device {
private String id; // 设备ID,如"tcp-scanner-001"
private String name; // 设备名称
private String protocolType; // 协议类型:tcp, modbus等
private String host; // IP地址
private int port; // 端口号
private Channel channel; // Netty连接通道
private DeviceHandler handler; // 协议处理器
// 执行命令的统一接口
public String execute(String command) {
return handler.execute(command);
}
}
设备注册到Guice
// 在IOT模块中注册设备
public class IotModule extends AbstractModule {
@Override
protected void configure() {
// 为每个设备创建绑定
for (Device device : devices) {
bind(Device.class)
.annotatedWith(Names.named(device.getId()))
.toInstance(device);
}
}
}
业务服务注入设备
@DeviceControl
public class BcssTcpService {
@Inject
@Named("tcp-scanner-001") // 注入指定ID的设备
private Device scannerDevice;
public String performScan() {
// 直接调用设备的execute方法
return scannerDevice.execute("scan");
}
}
📡 TCP通信完整流程
用户调用流程
// 用户调用
String result = scannerDevice.execute("scan");
详细执行步骤
1. Device.execute()
public String execute(String command) {
// 委托给Handler处理
return this.handler.execute(command);
}
2. Handler.execute()
public String execute(String cmd) {
// 委托给TcpClient处理
return super.send(cmd);
}
3. TcpClient.send()
protected String send(String cmd) {
// 1. 生成唯一命令ID
int cmdId = cmdIdGenerator.getAndIncrement(); // 1001
String cmdKey = String.valueOf(cmdId);
// 2. 使用虚拟线程异步转同步
return vthreadPool.submit(() -> {
try {
return sendAsync(cmdKey, cmd).get(readTimeout, TimeUnit.SECONDS);
} catch (Exception e) {
pendingTasks.remove(cmdKey); // 清理失败的任务
throw new RuntimeException(e);
}
}).get();
}
4. TcpClient.sendAsync()
private CompletableFuture<String> sendAsync(String cmdKey, String cmd) {
// 1. 创建Future
CompletableFuture<String> future = new CompletableFuture<>();
// 2. 检查连接
Channel ch = getChannel();
if (ch == null) {
future.completeExceptionally(new Exception("TCP连接未建立"));
return future;
}
// 3. 转换命令格式
String tcpCommand = mapCommandToTcp(cmd); // "scan" → "SCAN\r\n"
// 4. 创建数据包
ByteBuf buf = Unpooled.buffer();
buf.writeBytes(tcpCommand.getBytes(StandardCharsets.UTF_8));
// 5. 注册任务(关键步骤!)
pendingTasks.put(cmdKey, future);
// 6. 发送数据
ch.pipeline().writeAndFlush(buf);
log.debug("发送TCP命令: cmdId={}, 命令={}", cmdKey, tcpCommand.trim());
return future;
}
响应处理流程
1. 网络数据到达
// 扫码枪发送: "BARCODE_123456\r\n"
// Netty接收字节数据: [66, 65, 82, 67, 79, 68, 69, ...]
2. Pipeline处理
// 数据流经Pipeline各个处理器:
原始字节 → DelimiterBasedFrameDecoder → 按\r\n分割
分割数据 → StringDecoder → 转换为String "BARCODE_123456"
String → HIKV_ID6_Handler.channelRead() → 业务处理
3. Handler处理响应
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
String response = (String) msg; // "BARCODE_123456"
// 1. 找到最早的待处理任务
String oldestTaskKey = findOldestPendingTaskKey(); // "1001"
if (oldestTaskKey != null) {
// 2. 完成对应的Future
boolean completed = completeTask(oldestTaskKey, response.trim());
if (completed) {
log.info("TCP响应处理完成: cmdId={}, 响应={}",
oldestTaskKey, response.trim());
}
} else {
log.warn("没有待处理的任务,忽略响应: {}", response);
}
} catch (Exception e) {
log.error("处理TCP响应时发生错误", e);
}
}
4. 完成Future
protected boolean completeTask(String cmdKey, String response) {
// 1. 从待处理任务中移除
CompletableFuture<String> future = pendingTasks.remove(cmdKey);
if (future != null) {
// 2. 完成Future,唤醒等待的线程
future.complete(response);
return true;
}
return false;
}
任务匹配机制
FIFO顺序处理
private String findOldestPendingTaskKey() {
int currentCmdId = getCurrentTcpCmdId(); // 当前最新的cmdId
// 从最新的cmdId往前查找,找到最早的待处理任务
for (int i = 0; i < 1000; i++) {
String possibleKey = String.valueOf(currentCmdId - i);
if (getPendingTask(possibleKey) != null) {
return possibleKey; // 找到最早的任务
}
}
return null; // 没有待处理任务
}
🧵 线程模型详解
虚拟线程(Java 21)
什么是虚拟线程?
// 传统线程:
Thread thread = new Thread(() -> {
// 每个线程占用1-2MB内存
// 创建成本高,数量有限
});
// 虚拟线程:
Thread vthread = Thread.ofVirtual().start(() -> {
// 每个线程只占用几KB内存
// 创建成本极低,可以创建百万级
});
虚拟线程的优势
// 1. 轻量级
ExecutorService vthreadPool = Executors.newVirtualThreadPerTaskExecutor();
// 2. 自动调度
// 当虚拟线程阻塞时(如调用future.get()),
// 底层的平台线程会被释放去处理其他虚拟线程
// 3. 简化异步编程
String result = vthreadPool.submit(() -> {
return someAsyncOperation().get(); // 看起来像同步,实际是异步
}).get();
线程协作模型
多线程协作流程
// 线程1: 用户线程(Spring MVC线程)
@GetMapping("/scan")
public Result<String> scan() {
String result = tcpService.performScan(); // 调用业务方法
return Result.OK(result);
}
// 线程2: 虚拟线程(IOT框架创建)
String performScan() {
return vthreadPool.submit(() -> {
return sendAsync(cmdKey, cmd).get(timeout, TimeUnit.SECONDS);
}).get(); // 在虚拟线程中等待
}
// 线程3: Netty IO线程(EventLoop线程)
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 处理网络响应
future.complete(response); // 唤醒虚拟线程
}
线程安全保证
// 1. ConcurrentHashMap保证线程安全
private final ConcurrentMap<String, CompletableFuture<String>> pendingTasks
= new ConcurrentHashMap<>();
// 2. AtomicInteger保证ID唯一性
private static final AtomicInteger cmdIdGenerator = new AtomicInteger(1000);
// 3. CompletableFuture内部线程安全
CompletableFuture<String> future = new CompletableFuture<>();
// 多个线程可以安全地调用complete()和get()
🎯 实战案例分析
案例:扫码枪扫码流程
1. 用户发起HTTP请求
GET /hcms/bcss/tcp/scan
2. Controller处理
@GetMapping("/scan")
public Result<String> performScan() {
try {
// 获取TCP服务实例
BcssTcpService tcpService = Iot.get().getDeviceControl(BcssTcpService.class);
// 执行扫码
String result = tcpService.performScan();
return Result.OK(result);
} catch (Exception e) {
return Result.error("扫码失败: " + e.getMessage());
}
}
3. 业务服务处理
@DeviceControl
public class BcssTcpService {
@Inject
@Named("tcp-scanner-001")
private Device scannerDevice;
public String performScan() {
// 调用设备执行扫码命令
return scannerDevice.execute("scan");
}
}
4. 设备执行命令
// Device.execute("scan")
public String execute(String command) {
return handler.execute(command); // 委托给Handler
}
// HIKV_ID6_Handler.execute("scan")
public String execute(String cmd) {
return super.send(cmd); // 委托给TcpClient
}
5. TCP客户端发送
// TcpClient.send("scan")
protected String send(String cmd) {
int cmdId = 1001; // 生成ID
String cmdKey = "1001";
// 虚拟线程异步转同步
return vthreadPool.submit(() -> {
return sendAsync(cmdKey, cmd).get(10, TimeUnit.SECONDS);
}).get();
}
6. 异步发送
// TcpClient.sendAsync("1001", "scan")
private CompletableFuture<String> sendAsync(String cmdKey, String cmd) {
CompletableFuture<String> future = new CompletableFuture<>();
// 存储任务
pendingTasks.put("1001", future);
// 发送 "SCAN\r\n" 到扫码枪
channel.writeAndFlush(createBuffer("SCAN\r\n"));
return future; // 返回Future,等待响应
}
7. 等待响应
// 虚拟线程在这里阻塞等待
future.get(10, TimeUnit.SECONDS); // 最多等10秒
8. 扫码枪响应
扫码枪发送: "BARCODE_123456\r\n"
9. Netty接收处理
// HIKV_ID6_Handler.channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String response = "BARCODE_123456"; // Pipeline已解码
// 找到最早的任务
String oldestTaskKey = "1001";
// 完成Future
CompletableFuture<String> future = pendingTasks.remove("1001");
future.complete("BARCODE_123456"); // 唤醒等待的虚拟线程
}
10. 返回结果
// 虚拟线程被唤醒,future.get()返回结果
String result = "BARCODE_123456";
// 层层返回到用户
return Result.OK("BARCODE_123456");
时序图总结
用户 → Controller → Service → Device → Handler → TcpClient
↓
生成cmdId=1001
创建Future
存储到pendingTasks
发送"SCAN\r\n"
future.get()等待...
↓
扫码枪响应"BARCODE_123456\r\n" → Netty → Pipeline → Handler
↓
找到cmdId=1001
取出Future
future.complete()
↓
TcpClient ← Handler ← Device ← Service ← Controller ← 用户
↑ ↓
future.get()返回结果 返回JSON响应
🎓 学习建议
1. 基础知识
- Java并发编程(Future、CompletableFuture)
- Netty网络编程基础
- 依赖注入原理(Guice)
2. 实践练习
- 创建简单的Netty客户端/服务器
- 使用CompletableFuture实现异步转同步
- 理解Pipeline的数据流转
3. 深入理解
- 虚拟线程的调度机制
- TCP协议和粘包处理
- 异步编程模式
4. 调试技巧
- 使用日志跟踪数据流
- 理解线程模型和切换
- 监控Future的状态变化
📝 总结
IOT框架通过以下核心技术实现了高效的设备通信:
- Netty - 提供高性能网络通信能力
- Future - 实现异步转同步的编程模型
- 虚拟线程 - 支持大量并发连接
- 依赖注入 - 实现松耦合的组件管理
- 协议插件 - 支持多种通信协议
这个设计既保证了性能,又提供了简洁的API,是现代异步编程的优秀实践。

浙公网安备 33010602011771号