多进程运行代码模板
import os
import json
import random
import glob
import multiprocessing as mp
from collections import defaultdict
from tqdm import tqdm
# 配置变量:修改这些值来适应不同任务
MAX_BARS = 3 # 可见进度条的最大数量
def safe_print(msg: str):
tqdm.write(msg) # 与tqdm兼容的打印
def process_item(item, proc_idx, state):
"""
在这里实现每个任务的工作逻辑。
`item` 是任务项。
`state` 可以保存每个进程的缓存、计数器等。
"""
# ... 执行实际工作 ...
state["count"] += 1
return f"done-{item}"
def worker(proc_idx, tasks, counters):
touched_dirs = set()
show_bar = proc_idx < MAX_BARS
iterator = tqdm(
tasks,
desc=f"进程 {proc_idx}",
unit="task",
position=proc_idx if show_bar else 0, # 仅前 N 个占用独立行
disable=not show_bar, # 其他进程隐藏进度条
leave=True, # 进度条在完成后会保留在终端中,而不是被清除掉
ncols=100, # 设置进度条的总宽度为 100 个字符,确保在终端中显示得更整齐
)
for task_id, item in iterator:
process_item(item, proc_idx, counters)
return list(touched_dirs)
def distribute_tasks(items, num_procs):
"""
示例:随机抽取与轮询分配相结合的任务分配。
根据需要替换为其他分片方式(轮询、块分割、工作窃取等)。
"""
total_cnt = ...
buckets = [[] for _ in range(num_procs)]
for i in range(total_cnt):
item = random.choices(items, k=1)[0]
proc = i % num_procs
buckets[proc].append((i, item))
return buckets
def merge_outputs(dir_list):
"""
后处理合并每个进程的分片;根据输出布局调整。
"""
pass
def load_counters_all_proc():
counters_all_proc = defaultdict(lambda: defaultdict(int))
pass
return counters_all_proc
def main(root_dir, num_procs):
# 发现工作项
items = [...] # 填充你的项列表
if not items:
safe_print("未找到项。")
return
num_procs = max(1, num_procs)
tasks_per_proc = distribute_tasks(items, num_procs)
counters_all_proc = load_counters_all_proc()
with mp.Pool(processes=num_procs) as pool:
results = pool.starmap(
worker,
[
(
idx,
tasks,
counters_all_proc[idx],
)
for idx, tasks in enumerate(tasks_per_proc)
],
)
all_dirs = [d for sub in results for d in sub]
merge_outputs(all_dirs)
if __name__ == "__main__":
import sys
if len(sys.argv) != 3:
safe_print("Usage: python script.py <directory> <num_processes>")
sys.exit(1)
root_dir = sys.argv[1]
num_processes = int(sys.argv[2])
main(root_dir, num_processes)
注意事项
- 多进程代码比较重要的事情:
- 有恢复功能:将
load_counter_all_proc函数实现- 注意不要依赖状态文件(也就是不要存储状态文件),因为状态文件可能会很大,读写状态文件非常慢,如果此时程序断了,就会导致错误
- 恢复应该在主进程中直接存储到一个变量里面(就是上面的
counters_all_proc),而不是存储到磁盘里面
- 每个进程之间产生的文件不冲突:上面的代码让每个进程并行运行,产生的文件都附带了进程号(比如
file_select_counts_p{proc_idx}.json),最后可以通过一个函数合并 - 子进程在生成文件的时候,不要生成那种越来越大的文件,不然到后面会导致读写此文件非常慢,如果读写过程中程序断了就会出错
- 应该另写一个脚本来一次性生成这种大文件
- 有恢复功能:将
with mp.Pool(processes=num_procs) as pool:同时启动了num_procs个worker函数- 每个
worker函数处理一个(idx, tasks)元组 - 应该保证
tasks_per_proc的长度与num_procs相同 results是一个列表,长度为num_procs,每个元素是每个worker的返回结果
- 每个

浙公网安备 33010602011771号