import threading
import time
import serial # 导入pyserial,用于串口初始化/读取
# 全局变量:存储IO状态(供其他模块调用)
global_io_state = {"serial_data": None, "gpio_status": 0}
# 全局停止标志:控制后台任务启停
task_stop_flag = False
def IO_init():
"""实际的IO/串口初始化逻辑(适配STM32/硬件串口)"""
global serial_port
try:
# 初始化串口(示例:COM3/38400波特率/8N1)
serial_port = serial.Serial(
port="COM3", # 替换为你的串口名(Linux/Mac为/dev/ttyUSB0)
baudrate=38400,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=0.001 # 超时节流,适配5ms周期
)
print(f"串口初始化成功:{serial_port.port}")
# 配置全局IO变量(示例:初始GPIO状态为0)
global_io_state["gpio_status"] = 0
return 0
except Exception as e:
print(f"串口初始化失败:{str(e)}")
return -1
def Update_IO_status():
"""200Hz(5ms周期)IO状态刷新函数(带实际串口读取逻辑)"""
global global_io_state
try:
# 1. 高精度计时(保留毫秒)
current_time = time.strftime("%H:%M:%S.%f")[:-3]
# 2. 读取串口IO状态(非阻塞读取,适配5ms周期)
if serial_port.in_waiting > 0:
# 读取1字节串口数据(示例)
serial_data = serial_port.read(1)
global_io_state["serial_data"] = serial_data.hex() # 转为16进制存储
else:
global_io_state["serial_data"] = "00" # 无数据时默认00
# 3. 模拟读取GPIO状态(可替换为实际硬件读取逻辑)
global_io_state["gpio_status"] = 1 - global_io_state["gpio_status"] # 翻转状态
# 4. 打印刷新日志(可选,高频场景建议注释以减少耗时)
print(f"后台IO状态刷新:{current_time} | 串口数据:{global_io_state['serial_data']} | GPIO:{global_io_state['gpio_status']}")
except Exception as e:
print(f"IO状态刷新异常:{str(e)}")
def start_task_ms(task_func, interval_ms=5):
"""
启动毫秒级定时任务(带耗时补偿+停止机制)
:param task_func: 要执行的任务函数
:param interval_ms: 执行周期(毫秒),默认5ms(200Hz)
"""
def timer_loop():
while not task_stop_flag:
# 记录任务开始时间(高精度计时)
start_time = time.perf_counter()
# 执行任务
task_func()
# 计算任务执行耗时,动态补偿休眠时间(保证精准周期)
exec_duration = (time.perf_counter() - start_time) * 1000 # 转为毫秒
sleep_ms = max(0, interval_ms - exec_duration) # 避免休眠时间为负
# 精准休眠(毫秒转秒)
time.sleep(sleep_ms / 1000)
# 创建后台守护线程
timer_thread = threading.Thread(target=timer_loop, daemon=True)
timer_thread.start()
print(f"后台任务启动成功,周期:{interval_ms}ms({1000/interval_ms}Hz)")
def stop_task():
"""安全停止后台任务"""
global task_stop_flag
task_stop_flag = True
# 关闭串口(释放硬件资源)
if 'serial_port' in globals() and serial_port.is_open:
serial_port.close()
print("串口已关闭,后台任务停止")
# ------------------- 测试运行 -------------------
if __name__ == "__main__":
# 第一步:初始化IO/串口(必须先执行)
init_result = IO_init()
if init_result != 0:
print("IO初始化失败,程序退出")
exit(1)
# 第二步:启动200Hz IO刷新任务(5ms周期)
start_task_ms(Update_IO_status, interval_ms=5) # 200Hz=5ms,修正原100ms错误
# 主程序持续运行(按 Ctrl+C 安全退出)
try:
print("主程序启动成功,按 Ctrl+C 停止...")
while True:
time.sleep(1) # 主程序业务逻辑(可替换为实际功能)
except KeyboardInterrupt:
stop_task()
print("\n主程序正常退出")