多进程运行代码模板

import os
import json
import random
import glob
import multiprocessing as mp
from collections import defaultdict
from tqdm import tqdm

# 配置变量:修改这些值来适应不同任务
MAX_BARS = 3  # 可见进度条的最大数量

def safe_print(msg: str):
    tqdm.write(msg)  # 与tqdm兼容的打印

def process_item(item, proc_idx, state):
    """
    在这里实现每个任务的工作逻辑。
    `item` 是任务项。
    `state` 可以保存每个进程的缓存、计数器等。
    """
    # ... 执行实际工作 ...
    state["count"] += 1
    return f"done-{item}"

def worker(proc_idx, tasks, counters):
    touched_dirs = set()

    show_bar = proc_idx < MAX_BARS
    iterator = tqdm(
        tasks,
        desc=f"进程 {proc_idx}",
        unit="task",
        position=proc_idx if show_bar else 0, # 仅前 N 个占用独立行
        disable=not show_bar, # 其他进程隐藏进度条
        leave=True, # 进度条在完成后会保留在终端中,而不是被清除掉
        ncols=100, # 设置进度条的总宽度为 100 个字符,确保在终端中显示得更整齐
    )

    for task_id, item in iterator:
        process_item(item, proc_idx, counters)

    return list(touched_dirs)

def distribute_tasks(items, num_procs):
    """
    示例:随机抽取与轮询分配相结合的任务分配。
    根据需要替换为其他分片方式(轮询、块分割、工作窃取等)。
    """
    total_cnt = ...
    buckets = [[] for _ in range(num_procs)]
    for i in range(total_cnt):
        item = random.choices(items, k=1)[0]
        proc = i % num_procs
        buckets[proc].append((i, item))
    return buckets

def merge_outputs(dir_list):
    """
    后处理合并每个进程的分片;根据输出布局调整。
    """
    pass

def load_counters_all_proc():
    counters_all_proc = defaultdict(lambda: defaultdict(int))

    pass

    return counters_all_proc

def main(root_dir, num_procs):
    # 发现工作项
    items = [...]  # 填充你的项列表
    if not items:
        safe_print("未找到项。")
        return

    num_procs = max(1, num_procs)
    tasks_per_proc = distribute_tasks(items, num_procs)
    counters_all_proc = load_counters_all_proc()

    with mp.Pool(processes=num_procs) as pool:
        results = pool.starmap(
            worker,
            [
                (
                    idx,
                    tasks,
                    counters_all_proc[idx],
                )
                for idx, tasks in enumerate(tasks_per_proc)
            ],
        )

    all_dirs = [d for sub in results for d in sub]
    merge_outputs(all_dirs)

if __name__ == "__main__":
    import sys
    if len(sys.argv) != 3:
        safe_print("Usage: python script.py <directory> <num_processes>")
        sys.exit(1)
    root_dir = sys.argv[1]
    num_processes = int(sys.argv[2])
    main(root_dir, num_processes)

注意事项

  • 多进程代码比较重要的事情:
    • 有恢复功能:将load_counter_all_proc函数实现
      • 注意不要依赖状态文件(也就是不要存储状态文件),因为状态文件可能会很大,读写状态文件非常慢,如果此时程序断了,就会导致错误
      • 恢复应该在主进程中直接存储到一个变量里面(就是上面的counters_all_proc),而不是存储到磁盘里面
    • 每个进程之间产生的文件不冲突:上面的代码让每个进程并行运行,产生的文件都附带了进程号(比如file_select_counts_p{proc_idx}.json),最后可以通过一个函数合并
    • 子进程在生成文件的时候,不要生成那种越来越大的文件,不然到后面会导致读写此文件非常慢,如果读写过程中程序断了就会出错
      • 应该另写一个脚本来一次性生成这种大文件
  • with mp.Pool(processes=num_procs) as pool:同时启动了num_procsworker函数
    • 每个worker函数处理一个(idx, tasks)元组
    • 应该保证tasks_per_proc的长度与num_procs相同
    • results是一个列表,长度为num_procs,每个元素是每个worker的返回结果
posted @ 2025-12-08 16:53  最爱丁珰  阅读(6)  评论(0)    收藏  举报