Netty+Guice-个人开发框架之-IOT框架

📚 目录

  1. 核心概念
  2. Netty基础知识
  3. 异步编程与Future
  4. IOT框架启动流程
  5. 设备连接与管理
  6. TCP通信完整流程
  7. 线程模型详解
  8. 实战案例分析

🎯 核心概念

什么是IOT框架?

IOT框架是一个用于管理和控制各种工业设备的通信框架,支持多种协议(TCP、Modbus、PLC等)。

核心组件

  • Device: 设备抽象,代表一个物理设备
  • Handler: 协议处理器,处理具体的通信逻辑
  • Channel: 网络连接通道,基于Netty
  • Future: 异步结果容器,实现异步转同步

🌐 Netty基础知识

Netty是什么?

Netty是一个高性能的网络通信框架,简化了网络编程的复杂性。

核心概念

1. Channel(通道)

// Channel就像一条电话线
Channel channel = bootstrap.connect("192.168.1.100", 8080).sync().channel();

// 可以发送数据
channel.writeAndFlush("Hello World");

// 可以接收数据(通过Handler)

2. Pipeline(管道)

// Pipeline是数据处理流水线
ChannelPipeline pipeline = channel.pipeline();

// 添加处理器(按顺序执行)
pipeline.addLast("decoder", new StringDecoder());     // 字节→字符串
pipeline.addLast("encoder", new StringEncoder());     // 字符串→字节
pipeline.addLast("handler", new MyBusinessHandler()); // 业务处理

3. Handler(处理器)

public class MyHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 当数据到达时,Netty自动调用这个方法
        String data = (String) msg;
        System.out.println("收到数据: " + data);
    }
}

4. EventLoop(事件循环)

// EventLoop是Netty的线程模型核心
// 一个EventLoop处理多个Channel的IO事件
// 所有IO操作都在EventLoop线程中执行

TCP连接建立过程

// 1. 创建Bootstrap(客户端启动器)
Bootstrap bootstrap = new Bootstrap();

// 2. 配置EventLoopGroup(线程组)
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group);

// 3. 设置Channel类型
bootstrap.channel(NioSocketChannel.class);

// 4. 配置Pipeline
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringDecoder());
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new MyHandler());
    }
});

// 5. 连接服务器
ChannelFuture future = bootstrap.connect("192.168.1.100", 8080);
Channel channel = future.sync().channel();

🔮 异步编程与Future

为什么需要异步编程?

问题:网络通信的延迟

// 用户期望:
String result = device.execute("scan");  // 立即得到结果
System.out.println(result);

// 现实:
// 1. 发送请求到网络 (10ms)
// 2. 网络传输 (50ms)
// 3. 设备处理 (100ms)
// 4. 响应传输 (50ms)
// 总计:210ms延迟

Future的作用

1. 异步结果容器

// Future就像一个"未来的盒子"
CompletableFuture<String> future = new CompletableFuture<>();

// 现在盒子是空的
System.out.println("Future创建了,但还没有结果");

// 将来某个时候,放入结果
future.complete("扫码结果:BARCODE_123456");

// 等待结果
String result = future.get(); // 阻塞等待,直到有结果

2. 线程间通信

CompletableFuture<String> future = new CompletableFuture<>();

// 线程A:等待结果
new Thread(() -> {
    try {
        System.out.println("开始等待...");
        String result = future.get(); // 阻塞等待
        System.out.println("收到结果: " + result);
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();

// 线程B:设置结果
new Thread(() -> {
    try {
        Thread.sleep(2000); // 模拟处理时间
        future.complete("处理完成"); // 唤醒线程A
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}).start();

3. 超时控制

try {
    String result = future.get(5, TimeUnit.SECONDS); // 最多等5秒
    System.out.println("结果: " + result);
} catch (TimeoutException e) {
    System.out.println("超时了!");
} catch (Exception e) {
    System.out.println("出错了: " + e.getMessage());
}

CompletableFuture高级用法

// 1. 链式处理
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "原始数据")
    .thenApply(data -> data.toUpperCase())
    .thenApply(data -> "处理后: " + data);

// 2. 异常处理
future.exceptionally(throwable -> {
    System.out.println("出错了: " + throwable.getMessage());
    return "默认值";
});

// 3. 组合多个Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "数据1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "数据2");

CompletableFuture<String> combined = future1.thenCombine(future2, 
    (data1, data2) -> data1 + " + " + data2);

🏗️ IOT框架启动流程

1. Spring Boot启动

@SpringBootApplication
public class Application {

    @PostConstruct
    public void initIOT() {
        // Spring Boot启动后,初始化IOT框架
        Iot.get().start();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2. IOT框架启动

public class Iot {
    public void start() {
        // 创建并启动IOT引导程序
        IotBootstrap bootstrap = new IotBootstrap();
        bootstrap.start();
    }
}

3. 设备配置加载

public class IotBootstrap {
    public void start() {
        // 1. 从数据库加载设备配置
        List<Device> devices = loadDevicesFromDatabase();

        // 2. 为每个设备建立连接
        for (Device device : devices) {
            connectToDevice(device);
        }

        // 3. 依赖注入
        inject();
    }

    private List<Device> loadDevicesFromDatabase() {
        // 从hcms_device表加载设备配置
        // 包括:id, name, protocol, host, port等
        return deviceRepository.findAll();
    }
}

4. 依赖注入配置

private void inject() {
    try {
        // 1. 创建Guice注入器
        this.injector = Guice.createInjector(iotModule);

        // 2. 绑定成员变量
        iotModule.bindMembers(this.injector);

        // 3. 获取绑定信息
        this.bindings = this.injector.getBindings();

        // 4. 绑定业务逻辑
        bindBusinessLogic();

    } catch (Exception e) {
        log.error("IOT框架依赖注入失败", e);
        this.bindings = null; // 标记失败
    }
}

🔌 设备连接与管理

设备连接过程

1. 创建Netty客户端

private void connectToDevice(Device device) {
    // 1. 创建Bootstrap
    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(eventLoopGroup)
             .channel(NioSocketChannel.class);

    // 2. 配置连接处理器
    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) {
            setupPipeline(ch, device);
        }
    });

    // 3. 连接设备
    ChannelFuture future = bootstrap.connect(device.getHost(), device.getPort());
    Channel channel = future.sync().channel();

    // 4. 绑定Channel到设备
    device.setChannel(channel);
}

2. 协议插件选择

private void setupPipeline(SocketChannel ch, Device device) {
    // 1. 根据协议类型选择插件
    String protocolType = device.getProtocolType(); // "tcp", "modbus", etc.
    ProtocolPlugin plugin = getProtocolPlugin(protocolType);

    // 2. 插件配置Pipeline
    DeviceHandler handler = plugin.initChannel(ch);

    // 3. 绑定Handler到设备
    device.setHandler(handler);
}

3. TCP协议插件配置

public class HIKV_ID6_Plugin implements ProtocolPlugin {
    @Override
    public DeviceHandler initChannel(Channel ch) {
        ChannelPipeline pipeline = ch.pipeline();

        // 1. 帧解码器(处理TCP粘包)
        DelimiterBasedFrameDecoder frameDecoder = new DelimiterBasedFrameDecoder(
            1024, Delimiters.lineDelimiter() // 以\r\n分割
        );
        pipeline.addLast("frameDecoder", frameDecoder);

        // 2. 字符串解码器
        pipeline.addLast("stringDecoder", new StringDecoder(StandardCharsets.UTF_8));

        // 3. 字符串编码器
        pipeline.addLast("stringEncoder", new StringEncoder(StandardCharsets.UTF_8));

        // 4. 业务处理器
        HIKV_ID6_Handler handler = new HIKV_ID6_Handler();
        handler.bindChannel(ch);
        pipeline.addLast("handler", handler);

        return handler;
    }
}

设备管理

Device对象结构

public class Device {
    private String id;              // 设备ID,如"tcp-scanner-001"
    private String name;            // 设备名称
    private String protocolType;    // 协议类型:tcp, modbus等
    private String host;            // IP地址
    private int port;               // 端口号
    private Channel channel;        // Netty连接通道
    private DeviceHandler handler;  // 协议处理器

    // 执行命令的统一接口
    public String execute(String command) {
        return handler.execute(command);
    }
}

设备注册到Guice

// 在IOT模块中注册设备
public class IotModule extends AbstractModule {
    @Override
    protected void configure() {
        // 为每个设备创建绑定
        for (Device device : devices) {
            bind(Device.class)
                .annotatedWith(Names.named(device.getId()))
                .toInstance(device);
        }
    }
}

业务服务注入设备

@DeviceControl
public class BcssTcpService {

    @Inject
    @Named("tcp-scanner-001")  // 注入指定ID的设备
    private Device scannerDevice;

    public String performScan() {
        // 直接调用设备的execute方法
        return scannerDevice.execute("scan");
    }
}

📡 TCP通信完整流程

用户调用流程

// 用户调用
String result = scannerDevice.execute("scan");

详细执行步骤

1. Device.execute()

public String execute(String command) {
    // 委托给Handler处理
    return this.handler.execute(command);
}

2. Handler.execute()

public String execute(String cmd) {
    // 委托给TcpClient处理
    return super.send(cmd);
}

3. TcpClient.send()

protected String send(String cmd) {
    // 1. 生成唯一命令ID
    int cmdId = cmdIdGenerator.getAndIncrement(); // 1001
    String cmdKey = String.valueOf(cmdId);

    // 2. 使用虚拟线程异步转同步
    return vthreadPool.submit(() -> {
        try {
            return sendAsync(cmdKey, cmd).get(readTimeout, TimeUnit.SECONDS);
        } catch (Exception e) {
            pendingTasks.remove(cmdKey); // 清理失败的任务
            throw new RuntimeException(e);
        }
    }).get();
}

4. TcpClient.sendAsync()

private CompletableFuture<String> sendAsync(String cmdKey, String cmd) {
    // 1. 创建Future
    CompletableFuture<String> future = new CompletableFuture<>();

    // 2. 检查连接
    Channel ch = getChannel();
    if (ch == null) {
        future.completeExceptionally(new Exception("TCP连接未建立"));
        return future;
    }

    // 3. 转换命令格式
    String tcpCommand = mapCommandToTcp(cmd); // "scan" → "SCAN\r\n"

    // 4. 创建数据包
    ByteBuf buf = Unpooled.buffer();
    buf.writeBytes(tcpCommand.getBytes(StandardCharsets.UTF_8));

    // 5. 注册任务(关键步骤!)
    pendingTasks.put(cmdKey, future);

    // 6. 发送数据
    ch.pipeline().writeAndFlush(buf);

    log.debug("发送TCP命令: cmdId={}, 命令={}", cmdKey, tcpCommand.trim());

    return future;
}

响应处理流程

1. 网络数据到达

// 扫码枪发送: "BARCODE_123456\r\n"
// Netty接收字节数据: [66, 65, 82, 67, 79, 68, 69, ...]

2. Pipeline处理

// 数据流经Pipeline各个处理器:
原始字节 → DelimiterBasedFrameDecoder → 按\r\n分割
分割数据 → StringDecoder → 转换为String "BARCODE_123456"
String → HIKV_ID6_Handler.channelRead() → 业务处理

3. Handler处理响应

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
        String response = (String) msg; // "BARCODE_123456"

        // 1. 找到最早的待处理任务
        String oldestTaskKey = findOldestPendingTaskKey(); // "1001"

        if (oldestTaskKey != null) {
            // 2. 完成对应的Future
            boolean completed = completeTask(oldestTaskKey, response.trim());

            if (completed) {
                log.info("TCP响应处理完成: cmdId={}, 响应={}", 
                        oldestTaskKey, response.trim());
            }
        } else {
            log.warn("没有待处理的任务,忽略响应: {}", response);
        }
    } catch (Exception e) {
        log.error("处理TCP响应时发生错误", e);
    }
}

4. 完成Future

protected boolean completeTask(String cmdKey, String response) {
    // 1. 从待处理任务中移除
    CompletableFuture<String> future = pendingTasks.remove(cmdKey);

    if (future != null) {
        // 2. 完成Future,唤醒等待的线程
        future.complete(response);
        return true;
    }

    return false;
}

任务匹配机制

FIFO顺序处理

private String findOldestPendingTaskKey() {
    int currentCmdId = getCurrentTcpCmdId(); // 当前最新的cmdId

    // 从最新的cmdId往前查找,找到最早的待处理任务
    for (int i = 0; i < 1000; i++) {
        String possibleKey = String.valueOf(currentCmdId - i);
        if (getPendingTask(possibleKey) != null) {
            return possibleKey; // 找到最早的任务
        }
    }

    return null; // 没有待处理任务
}

🧵 线程模型详解

虚拟线程(Java 21)

什么是虚拟线程?

// 传统线程:
Thread thread = new Thread(() -> {
    // 每个线程占用1-2MB内存
    // 创建成本高,数量有限
});

// 虚拟线程:
Thread vthread = Thread.ofVirtual().start(() -> {
    // 每个线程只占用几KB内存
    // 创建成本极低,可以创建百万级
});

虚拟线程的优势

// 1. 轻量级
ExecutorService vthreadPool = Executors.newVirtualThreadPerTaskExecutor();

// 2. 自动调度
// 当虚拟线程阻塞时(如调用future.get()),
// 底层的平台线程会被释放去处理其他虚拟线程

// 3. 简化异步编程
String result = vthreadPool.submit(() -> {
    return someAsyncOperation().get(); // 看起来像同步,实际是异步
}).get();

线程协作模型

多线程协作流程

// 线程1: 用户线程(Spring MVC线程)
@GetMapping("/scan")
public Result<String> scan() {
    String result = tcpService.performScan(); // 调用业务方法
    return Result.OK(result);
}

// 线程2: 虚拟线程(IOT框架创建)
String performScan() {
    return vthreadPool.submit(() -> {
        return sendAsync(cmdKey, cmd).get(timeout, TimeUnit.SECONDS);
    }).get(); // 在虚拟线程中等待
}

// 线程3: Netty IO线程(EventLoop线程)
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // 处理网络响应
    future.complete(response); // 唤醒虚拟线程
}

线程安全保证

// 1. ConcurrentHashMap保证线程安全
private final ConcurrentMap<String, CompletableFuture<String>> pendingTasks 
    = new ConcurrentHashMap<>();

// 2. AtomicInteger保证ID唯一性
private static final AtomicInteger cmdIdGenerator = new AtomicInteger(1000);

// 3. CompletableFuture内部线程安全
CompletableFuture<String> future = new CompletableFuture<>();
// 多个线程可以安全地调用complete()和get()

🎯 实战案例分析

案例:扫码枪扫码流程

1. 用户发起HTTP请求

GET /hcms/bcss/tcp/scan

2. Controller处理

@GetMapping("/scan")
public Result<String> performScan() {
    try {
        // 获取TCP服务实例
        BcssTcpService tcpService = Iot.get().getDeviceControl(BcssTcpService.class);

        // 执行扫码
        String result = tcpService.performScan();

        return Result.OK(result);
    } catch (Exception e) {
        return Result.error("扫码失败: " + e.getMessage());
    }
}

3. 业务服务处理

@DeviceControl
public class BcssTcpService {

    @Inject
    @Named("tcp-scanner-001")
    private Device scannerDevice;

    public String performScan() {
        // 调用设备执行扫码命令
        return scannerDevice.execute("scan");
    }
}

4. 设备执行命令

// Device.execute("scan")
public String execute(String command) {
    return handler.execute(command); // 委托给Handler
}

// HIKV_ID6_Handler.execute("scan")  
public String execute(String cmd) {
    return super.send(cmd); // 委托给TcpClient
}

5. TCP客户端发送

// TcpClient.send("scan")
protected String send(String cmd) {
    int cmdId = 1001; // 生成ID
    String cmdKey = "1001";

    // 虚拟线程异步转同步
    return vthreadPool.submit(() -> {
        return sendAsync(cmdKey, cmd).get(10, TimeUnit.SECONDS);
    }).get();
}

6. 异步发送

// TcpClient.sendAsync("1001", "scan")
private CompletableFuture<String> sendAsync(String cmdKey, String cmd) {
    CompletableFuture<String> future = new CompletableFuture<>();

    // 存储任务
    pendingTasks.put("1001", future);

    // 发送 "SCAN\r\n" 到扫码枪
    channel.writeAndFlush(createBuffer("SCAN\r\n"));

    return future; // 返回Future,等待响应
}

7. 等待响应

// 虚拟线程在这里阻塞等待
future.get(10, TimeUnit.SECONDS); // 最多等10秒

8. 扫码枪响应

扫码枪发送: "BARCODE_123456\r\n"

9. Netty接收处理

// HIKV_ID6_Handler.channelRead()
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    String response = "BARCODE_123456"; // Pipeline已解码

    // 找到最早的任务
    String oldestTaskKey = "1001";

    // 完成Future
    CompletableFuture<String> future = pendingTasks.remove("1001");
    future.complete("BARCODE_123456"); // 唤醒等待的虚拟线程
}

10. 返回结果

// 虚拟线程被唤醒,future.get()返回结果
String result = "BARCODE_123456";

// 层层返回到用户
return Result.OK("BARCODE_123456");

时序图总结

用户 → Controller → Service → Device → Handler → TcpClient
                                                      ↓
                                              生成cmdId=1001
                                              创建Future
                                              存储到pendingTasks
                                              发送"SCAN\r\n"
                                              future.get()等待...
                                                      ↓
扫码枪响应"BARCODE_123456\r\n" → Netty → Pipeline → Handler
                                                      ↓
                                              找到cmdId=1001
                                              取出Future
                                              future.complete()
                                                      ↓
TcpClient ← Handler ← Device ← Service ← Controller ← 用户
    ↑                                                  ↓
future.get()返回结果                            返回JSON响应

🎓 学习建议

1. 基础知识

  • Java并发编程(Future、CompletableFuture)
  • Netty网络编程基础
  • 依赖注入原理(Guice)

2. 实践练习

  • 创建简单的Netty客户端/服务器
  • 使用CompletableFuture实现异步转同步
  • 理解Pipeline的数据流转

3. 深入理解

  • 虚拟线程的调度机制
  • TCP协议和粘包处理
  • 异步编程模式

4. 调试技巧

  • 使用日志跟踪数据流
  • 理解线程模型和切换
  • 监控Future的状态变化

📝 总结

IOT框架通过以下核心技术实现了高效的设备通信:

  1. Netty - 提供高性能网络通信能力
  2. Future - 实现异步转同步的编程模型
  3. 虚拟线程 - 支持大量并发连接
  4. 依赖注入 - 实现松耦合的组件管理
  5. 协议插件 - 支持多种通信协议

这个设计既保证了性能,又提供了简洁的API,是现代异步编程的优秀实践。

posted @ 2026-02-26 13:27  烈酒清茶  阅读(7)  评论(0)    收藏  举报