跟着AI学AI - 诊断结论信息抽取 - 批量处理脚本

批量处理脚本

batch_process.py

# batch_process.py
import json
import os
from tqdm import tqdm
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchNERProcessor:
    """Send reports to an NER HTTP endpoint and collect the JSON responses.

    Args:
        api_url: URL that receives one POST request per report.
        timeout: Timeout in seconds handed to ``requests.post`` so that a
            stalled server cannot block a worker thread indefinitely.
    """

    def __init__(self, api_url='http://localhost:8000/predict', timeout=30):
        self.api_url = api_url
        self.timeout = timeout

    def process_single(self, report):
        """Process a single report.

        Args:
            report: Mapping with a required ``'text'`` key and an optional
                ``'id'`` key.

        Returns:
            The decoded JSON body when the endpoint answers with HTTP 200,
            otherwise ``None``. Every failure is printed together with the
            report id so it can be traced afterwards.
        """
        report_id = None
        try:
            report_id = report.get('id')
            response = requests.post(
                self.api_url,
                json={'text': report['text'], 'report_id': report_id},
                timeout=self.timeout,
            )
            if response.status_code != 200:
                # Surface rejected requests instead of dropping them silently.
                print(f"处理失败 (报告 {report_id}): HTTP {response.status_code}")
                return None
            return response.json()
        except Exception as e:
            # Deliberately broad: this is the per-task boundary of the thread
            # pool, and one bad report must not abort the whole batch.
            print(f"处理失败 (报告 {report_id}): {e}")
            return None

    def process_batch(self, reports, max_workers=4):
        """Process many reports concurrently.

        Args:
            reports: Iterable of report mappings (see ``process_single``).
            max_workers: Size of the thread pool.

        Returns:
            List of truthy results in completion order, which is not
            necessarily the input order. Failed reports are omitted.
        """
        reports = list(reports)
        if not reports:
            # Nothing to do: skip creating the thread pool and progress bar.
            return []

        results = []
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(self.process_single, report)
                for report in reports
            ]

            for future in tqdm(as_completed(futures), total=len(futures), desc="处理报告"):
                result = future.result()
                if result:
                    results.append(result)

        return results

    def process_file(self, input_file, output_file):
        """Process the reports stored in a JSON file and save the results.

        Args:
            input_file: Path of a UTF-8 JSON file holding the reports.
            output_file: Path the results are written to as UTF-8 JSON.

        Returns:
            The list of results that was written to ``output_file``.
        """
        print(f"读取文件: {input_file}")
        with open(input_file, 'r', encoding='utf-8') as f:
            reports = json.load(f)

        print(f"处理 {len(reports)} 个报告...")
        results = self.process_batch(reports)

        print(f"保存结果到: {output_file}")
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)

        return results

# Usage example
if __name__ == "__main__":
    # Sample reports to run through the NER service
    sample_reports = [
        {"id": "001", "text": "平均心率为82次/分,最快心率是158次/分..."},
        {"id": "002", "text": "心率监测:平均68次/分,最高132次/分..."},
    ]

    ner_client = BatchNERProcessor()

    # Print the id, entity count and summary of every returned result
    for item in ner_client.process_batch(sample_reports):
        entity_count = len(item['entities'])
        print(f"\n报告 {item['report_id']}:")
        print(f"实体数: {entity_count}")
        print(f"摘要: {item['summary']}")

持续优化流程

# continuous_improvement.py
class ActiveLearning:
    """Active-learning loop.

    Ranks unlabeled reports by model uncertainty so that the most
    informative ones can be sent for manual annotation.

    Args:
        model_path: Path passed to ``NERService``.
        unlabeled_data_path: Path of a UTF-8 JSON file with unlabeled reports.
    """

    def __init__(self, model_path, unlabeled_data_path):
        self.model_path = model_path
        self.unlabeled_data = self.load_unlabeled_data(unlabeled_data_path)
        # NOTE(review): NERService is neither defined nor imported in the
        # visible source; presumably it lives in the ner_service module --
        # confirm and import it at the top of this file.
        self.ner_service = NERService(model_path)

    def load_unlabeled_data(self, path):
        """Return the parsed content of the UTF-8 JSON file at ``path``."""
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def calculate_uncertainty(self, text):
        """Compute the model's uncertainty for ``text``.

        The intended measure is the entropy of the model's predicted
        probabilities. This is still a placeholder.

        Raises:
            NotImplementedError: Always, until the scoring is implemented.
        """
        # Fail with an explicit message rather than a NameError on an
        # undefined local variable.
        raise NotImplementedError(
            "calculate_uncertainty is a placeholder; implement an "
            "entropy-based score over the model's predicted probabilities"
        )

    def select_samples_to_label(self, n_samples=10):
        """Select the most valuable samples for manual annotation.

        Args:
            n_samples: Maximum number of reports to return.

        Returns:
            Up to ``n_samples`` reports from ``self.unlabeled_data``, ordered
            from highest to lowest uncertainty. Empty when ``n_samples`` is
            zero or negative.
        """
        if n_samples <= 0:
            # A negative slice bound would return "all but the last |n|".
            return []

        scored = [
            (report, self.calculate_uncertainty(report['text']))
            for report in self.unlabeled_data
        ]

        # Keep the n samples with the highest uncertainty.
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return [report for report, _ in scored[:n_samples]]

    def retrain_model(self, new_labeled_data):
        """Retrain the model with newly labeled data.

        Placeholder: the intended steps are to merge the new and existing
        labeled data and retrain. Currently does nothing.
        """
        pass

# Scheduled retraining script
def scheduled_retraining():
    """Retrain the model on a weekly schedule.

    Registers a job for every Monday at 03:00 and then polls the scheduler
    once a minute. This function never returns.
    """
    import schedule
    import time
    # Imported here because retrain_job calls datetime.now(); without it the
    # first scheduled run raises NameError and ends the polling loop.
    from datetime import datetime

    def retrain_job():
        print(f"{datetime.now()}: 开始重新训练模型")
        # Placeholder for the remaining steps:
        # 1. Collect newly labeled data
        # 2. Retrain
        # 3. Evaluate the result
        # 4. Deploy the new model

    # Retrain every Monday at 03:00
    schedule.every().monday.at("03:00").do(retrain_job)

    while True:
        schedule.run_pending()
        time.sleep(60)

项目结构

ecg_ner_project/
├── data/
│   ├── raw/                    # 原始数据
│   ├── labeled/                # 标注数据
│   └── out/                    # 转换后的数据
├── models/
│   └── ecg_ner_model/          # 训练好的模型
├── src/
│   ├── data_preparation.py     # 数据准备
│   ├── train_model.py          # 模型训练
│   ├── evaluate_model.py       # 模型评估
│   ├── ner_service.py          # API服务
│   ├── batch_process.py        # 批量处理
│   └── continuous_improvement.py # 持续优化
├── scripts/
│   ├── run_training.sh         # 训练脚本
│   ├── run_api.sh              # 启动API
│   └── batch_predict.py        # 批量预测
├── tests/
│   ├── test_model.py           # 模型测试
│   └── test_api.py             # API测试
├── requirements.txt            # 依赖包
├── config.yaml                 # 配置文件
└── README.md                   # 项目说明

快速启动命令

# 1. 训练模型
python src/train_model.py

# 2. 启动API服务
uvicorn src.ner_service:app --reload --port 8000

# 3. 测试API
curl -X POST "http://localhost:8000/predict" \
     -H "Content-Type: application/json" \
     -d '{"text":"平均心率为76次/分"}'

# 4. 批量处理
python scripts/batch_predict.py --input data/new_reports.json --output results.json
posted @ 2026-05-22 10:29  VipSoft  阅读(10)  评论(0)    收藏  举报