跟着AI学AI - 诊断结论信息抽取 - 批量处理脚本
批量处理脚本
batch_process.py
# batch_process.py
import json
import os
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchNERProcessor:
    """Send reports to an NER HTTP endpoint and collect the parsed responses.

    Each report is POSTed as JSON to ``api_url``. Reports that fail (network
    error, malformed report, non-200 status) are reported on stdout and left
    out of the results instead of aborting the whole batch.
    """

    def __init__(self, api_url='http://localhost:8000/predict', timeout=30):
        """Store the endpoint configuration.

        Args:
            api_url: URL that receives the POST requests.
            timeout: Seconds to wait for each request before giving up.
                Pass ``None`` to wait indefinitely.
        """
        self.api_url = api_url
        self.timeout = timeout

    def process_single(self, report):
        """Process a single report.

        Args:
            report: Mapping with a required ``'text'`` key and an optional
                ``'id'`` key (sent to the service as ``report_id``).

        Returns:
            The decoded JSON body when the service answers with HTTP 200,
            otherwise ``None``.
        """
        try:
            response = requests.post(
                self.api_url,
                json={'text': report['text'], 'report_id': report.get('id')},
                # Without a timeout a stalled server would block this worker
                # thread forever.
                timeout=self.timeout,
            )
            if response.status_code == 200:
                return response.json()
            # Make rejected requests visible instead of dropping them silently.
            print(f"处理失败: HTTP {response.status_code} (报告 {report.get('id')})")
            return None
        except Exception as e:
            # Per-report boundary: one bad report must not abort the batch.
            print(f"处理失败: {e}")
            return None

    def process_batch(self, reports, max_workers=4):
        """Process reports concurrently.

        Args:
            reports: Sized collection of report mappings (see
                ``process_single``).
            max_workers: Number of worker threads.

        Returns:
            List of the truthy results, in completion order, which is not
            necessarily the input order. Failed reports are omitted.
        """
        if not reports:
            # Nothing to do; skip creating the thread pool and progress bar.
            return []
        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [executor.submit(self.process_single, report) for report in reports]
            for future in tqdm(as_completed(futures), total=len(futures), desc="处理报告"):
                result = future.result()
                if result:
                    results.append(result)
        return results

    def process_file(self, input_file, output_file):
        """Process the reports stored in a JSON file and save the results.

        Args:
            input_file: Path of a UTF-8 JSON file whose content is passed to
                ``process_batch``.
            output_file: Path the results are written to as indented UTF-8
                JSON (non-ASCII characters are kept as-is).

        Returns:
            The list returned by ``process_batch``.
        """
        print(f"读取文件: {input_file}")
        with open(input_file, 'r', encoding='utf-8') as f:
            reports = json.load(f)
        print(f"处理 {len(reports)} 个报告...")
        results = self.process_batch(reports)
        print(f"保存结果到: {output_file}")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        return results
# Usage example: run two sample reports through the batch processor and
# print what the service returned for each of them.
if __name__ == "__main__":
    # New reports to process
    sample_reports = [
        {"id": "001", "text": "平均心率为82次/分,最快心率是158次/分..."},
        {"id": "002", "text": "心率监测:平均68次/分,最高132次/分..."},
    ]
    ner_processor = BatchNERProcessor()
    for item in ner_processor.process_batch(sample_reports):
        print(f"\n报告 {item['report_id']}:")
        print(f"实体数: {len(item['entities'])}")
        print(f"摘要: {item['summary']}")
持续优化流程
# continuous_improvement.py
class ActiveLearning:
    """Active-learning loop: pick the unlabeled samples worth annotating next."""

    def __init__(self, model_path, unlabeled_data_path):
        """Load the unlabeled pool and create the NER service.

        Args:
            model_path: Passed to ``NERService`` and kept on the instance.
            unlabeled_data_path: Path of the UTF-8 JSON file holding the
                unlabeled reports.
        """
        self.model_path = model_path
        self.unlabeled_data = self.load_unlabeled_data(unlabeled_data_path)
        # NOTE(review): NERService is neither defined nor imported in the
        # visible source; presumably it lives in the project's ner_service
        # module. Confirm and import it before using this class.
        self.ner_service = NERService(model_path)

    def load_unlabeled_data(self, path):
        """Return the parsed content of the UTF-8 JSON file at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def calculate_uncertainty(self, text):
        """Return the model's uncertainty score for ``text``.

        The intended measure is the entropy of the model's predicted
        probabilities. The computation has not been written yet, so this
        method fails explicitly instead of referencing an undefined variable.

        Raises:
            NotImplementedError: Always, until a scoring implementation is
                supplied (for example in a subclass).
        """
        raise NotImplementedError(
            "calculate_uncertainty must be implemented before samples can be ranked"
        )

    def select_samples_to_label(self, n_samples=10):
        """Select the most valuable samples for manual annotation.

        Args:
            n_samples: Maximum number of reports to return.

        Returns:
            The reports with the highest uncertainty, most uncertain first.
            Reports with equal scores keep their order from the pool.
        """
        scored = [
            (report, self.calculate_uncertainty(report['text']))
            for report in self.unlabeled_data
        ]
        # Highest uncertainty first; the sort is stable, so ties keep pool order.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [report for report, _ in scored[:n_samples]]

    def retrain_model(self, new_labeled_data):
        """Retrain the model with newly labeled data (placeholder, does nothing).

        Args:
            new_labeled_data: Newly annotated samples; currently unused.
        """
        # TODO: merge the new labeled data with the existing labeled data
        # TODO: retrain the model
        pass
# Scheduled retraining script
def scheduled_retraining():
    """Register a weekly retraining job and run the scheduler loop.

    The job is registered for every Monday at 03:00. This function never
    returns: it checks for pending jobs once a minute in an endless loop.
    """
    import schedule
    import time
    # The job prints a timestamp; without this import it would raise
    # NameError the first time the scheduler ran it.
    from datetime import datetime

    def retrain_job():
        """Announce the start of retraining; the actual steps are still TODO."""
        print(f"{datetime.now()}: 开始重新训练模型")
        # 1. Collect newly labeled data
        # 2. Retrain the model
        # 3. Evaluate the result
        # 4. Deploy the new model

    # Retrain every Monday at 03:00
    schedule.every().monday.at("03:00").do(retrain_job)
    while True:
        schedule.run_pending()
        time.sleep(60)
ecg_ner_project/
├── data/
│ ├── raw/ # 原始数据
│ ├── labeled/ # 标注数据
│ └── out/ # 转换后的数据
├── models/
│ └── ecg_ner_model/ # 训练好的模型
├── src/
│ ├── data_preparation.py # 数据准备
│ ├── train_model.py # 模型训练
│ ├── evaluate_model.py # 模型评估
│ ├── ner_service.py # API服务
│ ├── batch_process.py # 批量处理
│ └── continuous_improvement.py # 持续优化
├── scripts/
│ ├── run_training.sh # 训练脚本
│ ├── run_api.sh # 启动API
│ └── batch_predict.py # 批量预测
├── tests/
│ ├── test_model.py # 模型测试
│ └── test_api.py # API测试
├── requirements.txt # 依赖包
├── config.yaml # 配置文件
└── README.md # 项目说明
快速启动命令
# 1. 训练模型
python src/train_model.py
# 2. 启动API服务
uvicorn src.ner_service:app --reload --port 8000
# 3. 测试API
curl -X POST "http://localhost:8000/predict" \
-H "Content-Type: application/json" \
-d '{"text":"平均心率为76次/分"}'
# 4. 批量处理
python scripts/batch_predict.py --input data/new_reports.json --output results.json
本文来自博客园,作者:VipSoft,转载请注明原文链接:https://chuna2.787528.xyz/vipsoft/p/20016599
浙公网安备 33010602011771号