smart_IO

import threading
import time
import serial  # 导入pyserial,用于串口初始化/读取

# 全局变量:存储IO状态(供其他模块调用)
global_io_state = {"serial_data": None, "gpio_status": 0}
# 全局停止标志:控制后台任务启停
task_stop_flag = False

def IO_init():
    """实际的IO/串口初始化逻辑(适配STM32/硬件串口)"""
    global serial_port
    try:
        # 初始化串口(示例:COM3/38400波特率/8N1)
        serial_port = serial.Serial(
            port="COM3",  # 替换为你的串口名(Linux/Mac为/dev/ttyUSB0)
            baudrate=38400,
            bytesize=serial.EIGHTBITS,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            timeout=0.001  # 超时节流,适配5ms周期
        )
        print(f"串口初始化成功:{serial_port.port}")
        # 配置全局IO变量(示例:初始GPIO状态为0)
        global_io_state["gpio_status"] = 0
        return 0
    except Exception as e:
        print(f"串口初始化失败:{str(e)}")
        return -1

def Update_IO_status():
    """200Hz(5ms周期)IO状态刷新函数(带实际串口读取逻辑)"""
    global global_io_state
    try:
        # 1. 高精度计时(保留毫秒)
        current_time = time.strftime("%H:%M:%S.%f")[:-3]
        
        # 2. 读取串口IO状态(非阻塞读取,适配5ms周期)
        if serial_port.in_waiting > 0:
            # 读取1字节串口数据(示例)
            serial_data = serial_port.read(1)
            global_io_state["serial_data"] = serial_data.hex()  # 转为16进制存储
        else:
            global_io_state["serial_data"] = "00"  # 无数据时默认00
        
        # 3. 模拟读取GPIO状态(可替换为实际硬件读取逻辑)
        global_io_state["gpio_status"] = 1 - global_io_state["gpio_status"]  # 翻转状态
        
        # 4. 打印刷新日志(可选,高频场景建议注释以减少耗时)
        print(f"后台IO状态刷新:{current_time} | 串口数据:{global_io_state['serial_data']} | GPIO:{global_io_state['gpio_status']}")
    
    except Exception as e:
        print(f"IO状态刷新异常:{str(e)}")

def start_task_ms(task_func, interval_ms=5):
    """
    启动毫秒级定时任务(带耗时补偿+停止机制)
    :param task_func: 要执行的任务函数
    :param interval_ms: 执行周期(毫秒),默认5ms(200Hz)
    """
    def timer_loop():
        while not task_stop_flag:
            # 记录任务开始时间(高精度计时)
            start_time = time.perf_counter()
            
            # 执行任务
            task_func()
            
            # 计算任务执行耗时,动态补偿休眠时间(保证精准周期)
            exec_duration = (time.perf_counter() - start_time) * 1000  # 转为毫秒
            sleep_ms = max(0, interval_ms - exec_duration)  # 避免休眠时间为负
            
            # 精准休眠(毫秒转秒)
            time.sleep(sleep_ms / 1000)
    
    # 创建后台守护线程
    timer_thread = threading.Thread(target=timer_loop, daemon=True)
    timer_thread.start()
    print(f"后台任务启动成功,周期:{interval_ms}ms({1000/interval_ms}Hz)")

def stop_task():
    """安全停止后台任务"""
    global task_stop_flag
    task_stop_flag = True
    # 关闭串口(释放硬件资源)
    if 'serial_port' in globals() and serial_port.is_open:
        serial_port.close()
        print("串口已关闭,后台任务停止")




# ------------------- 测试运行 -------------------
if __name__ == "__main__":
    # 第一步:初始化IO/串口(必须先执行)
    init_result = IO_init()
    if init_result != 0:
        print("IO初始化失败,程序退出")
        exit(1)
    
    # 第二步:启动200Hz IO刷新任务(5ms周期)
    start_task_ms(Update_IO_status, interval_ms=5)  # 200Hz=5ms,修正原100ms错误
    
    # 主程序持续运行(按 Ctrl+C 安全退出)
    try:
        print("主程序启动成功,按 Ctrl+C 停止...")
        while True:
            time.sleep(1)  # 主程序业务逻辑(可替换为实际功能)
    except KeyboardInterrupt:
        stop_task()
        print("\n主程序正常退出")

 

posted @ 2025-12-04 15:45  为鲸  阅读(6)  评论(0)    收藏  举报