PDFtoEXCEL批量处理高保真同步格式

app.py

"""Desktop GUI for PDF to Excel conversion."""

from __future__ import annotations

import threading
from datetime import datetime
from pathlib import Path
from tkinter import filedialog, messagebox

import customtkinter as ctk
import pdfplumber
from PIL import Image

from network_time import fetch_reference_utc, is_past_expiry
from pdf_to_excel import ConversionOptions, convert_folder_to_excel, convert_pdf_to_excel


class App(ctk.CTk):
    """Main GUI application window."""

    def __init__(self) -> None:
        """Create the window, the Tk variables behind the form, the widgets, and start the time check."""
        super().__init__()
        self.title("PDF 转 Excel(版式保留)SINGSONG强哥出品")
        self.geometry("1080x920")
        self.minsize(960, 840)

        ctk.set_appearance_mode("dark")
        ctk.set_default_color_theme("blue")

        # Tk variables bound to entries, switches and sliders created in the _build_* methods.
        self.pdf_file_var = ctk.StringVar()
        self.output_file_var = ctk.StringVar()
        self.input_dir_var = ctk.StringVar()
        self.output_dir_var = ctk.StringVar()
        self.status_var = ctk.StringVar(value="就绪")
        self.ocr_fallback_var = ctk.BooleanVar(value=True)
        self.x_tolerance_var = ctk.DoubleVar(value=8.0)
        self.y_tolerance_var = ctk.DoubleVar(value=4.0)
        self.strict_visual_var = ctk.BooleanVar(value=True)
        self.active_tab_name = ctk.StringVar(value="单文件")
        # Reference to the image shown in the preview window (set by preview_pdf_first_page).
        self.preview_image: ctk.CTkImage | None = None
        # None = check in progress; True = network time obtained and not past expiry;
        # False = expired, or the network time could not be obtained.
        self._activation_ok: bool | None = None
        # Disabling the Tabview triggers its command callback; calling get() or refreshing the
        # panel in that half-disabled state tends to raise Tcl errors or garbled dialogs, so the
        # mode-change callback returns early while this flag is set.
        self._suppress_tab_change_cb = False

        self._build_ui()
        self._apply_waiting_for_network_time()
        # The time lookup runs on a daemon thread so the window can draw while it is pending.
        threading.Thread(target=self._network_time_check_worker, daemon=True).start()

    def _build_ui(self) -> None:
        """Build all GUI widgets.

        The window is a single-column grid: header (row 0), mode selector (row 1),
        source/target panel (row 2), options panel (row 3) and footer (row 4).
        """
        self.grid_columnconfigure(0, weight=1)
        self.grid_rowconfigure(1, weight=0)
        # Only the footer row stretches vertically.
        self.grid_rowconfigure(4, weight=1)

        # Row 0: title and subtitle.
        header = ctk.CTkFrame(self, corner_radius=16)
        header.grid(row=0, column=0, sticky="ew", padx=20, pady=(20, 10))
        header.grid_columnconfigure(0, weight=1)
        ctk.CTkLabel(
            header,
            text="PDF 转 Excel(SINGSONG格式保留增强版)",
            font=ctk.CTkFont(size=28, weight="bold"),
        ).grid(row=0, column=0, sticky="w", padx=20, pady=(18, 6))
        ctk.CTkLabel(
            header,
            text="支持单文件与批量目录处理,尽量还原原 PDF 布局与视觉结构",
            font=ctk.CTkFont(size=14),
            text_color="#b3b3b3",
        ).grid(row=1, column=0, sticky="w", padx=20, pady=(0, 18))

        # Row 1: segmented button choosing between single-file and folder-batch mode.
        mode_row = ctk.CTkFrame(self, fg_color="transparent")
        mode_row.grid(row=1, column=0, sticky="ew", padx=20, pady=(4, 8))
        mode_row.grid_columnconfigure(0, weight=1)
        ctk.CTkLabel(
            mode_row,
            text="处理模式(单文件 / 整文件夹批量)",
            font=ctk.CTkFont(size=14, weight="bold"),
        ).grid(row=0, column=0, sticky="w", pady=(0, 6))
        self.mode_segmented = ctk.CTkSegmentedButton(
            mode_row,
            values=["单文件", "批量文件夹"],
            font=ctk.CTkFont(size=15, weight="bold"),
            height=38,
            command=self._on_mode_segment_selected,
            selected_color="#1f6aa5",
            selected_hover_color="#1f6aa5",
            unselected_color="#3a3a3a",
            unselected_hover_color="#4a4a4a",
        )
        self.mode_segmented.grid(row=1, column=0, sticky="w")
        self.mode_segmented.set("单文件")
        # Rows 2 and 3: source/target panel and conversion options.
        self._build_source_panel()
        self._build_options_panel(row=3)

        # Row 4: progress bar, action buttons, status line and log box.
        footer = ctk.CTkFrame(self, corner_radius=16)
        footer.grid(row=4, column=0, sticky="nsew", padx=20, pady=(10, 20))
        footer.grid_columnconfigure(0, weight=1)
        # The log box (footer row 3) takes the remaining vertical space.
        footer.grid_rowconfigure(3, weight=1)

        self.progress_bar = ctk.CTkProgressBar(footer)
        self.progress_bar.grid(row=0, column=0, sticky="ew", padx=16, pady=(16, 8))
        self.progress_bar.set(0)

        # Three equally weighted buttons: preview, run, clear log.
        action_bar = ctk.CTkFrame(footer, corner_radius=10, fg_color="transparent")
        action_bar.grid(row=1, column=0, sticky="ew", padx=16, pady=(2, 8))
        action_bar.grid_columnconfigure((0, 1, 2), weight=1)
        self.preview_button = ctk.CTkButton(action_bar, text="预览第一页", height=38, command=self.preview_pdf_first_page)
        self.preview_button.grid(
            row=0, column=0, padx=(0, 8), sticky="ew"
        )
        self.run_button = ctk.CTkButton(action_bar, text="开始转换(当前模式)", height=38, command=self.run_current_tab_task)
        self.run_button.grid(
            row=0, column=1, padx=8, sticky="ew"
        )
        self.clear_button = ctk.CTkButton(action_bar, text="清空日志", height=38, command=self.clear_logs)
        self.clear_button.grid(
            row=0, column=2, padx=(8, 0), sticky="ew"
        )

        # Status line mirrors self.status_var.
        ctk.CTkLabel(
            footer,
            textvariable=self.status_var,
            font=ctk.CTkFont(size=13),
            anchor="w",
        ).grid(row=2, column=0, sticky="ew", padx=16, pady=(0, 10))

        # The log box is kept in the "disabled" state so the user cannot type into it;
        # writers switch it to "normal" temporarily.
        self.log_box = ctk.CTkTextbox(footer, height=170, corner_radius=10)
        self.log_box.grid(row=3, column=0, sticky="nsew", padx=16, pady=(0, 16))
        self.log_box.insert("1.0", "日志输出区域\n")
        self.log_box.configure(state="disabled")

    def _build_source_panel(self) -> None:
        """Build a clear, always-visible source/target import panel.

        The panel sits in window row 2 and holds one source row and one target row, each
        made of a label, an entry and a chooser button. The widgets are created in their
        single-file configuration; _refresh_source_panel rebinds them per mode.
        """
        panel = ctk.CTkFrame(self, corner_radius=12)
        panel.grid(row=2, column=0, sticky="ew", padx=20, pady=(0, 8))
        # The entry column absorbs horizontal resizing.
        panel.grid_columnconfigure(1, weight=1)

        self.source_title_label = ctk.CTkLabel(panel, text="导入与导出(单文件)", font=ctk.CTkFont(size=14, weight="bold"))
        self.source_title_label.grid(row=0, column=0, columnspan=3, padx=10, pady=(10, 4), sticky="w")

        # Source row: where the PDF comes from.
        self.source_label = ctk.CTkLabel(panel, text="PDF 文件")
        self.source_label.grid(row=1, column=0, padx=14, pady=10, sticky="w")
        self.source_entry = ctk.CTkEntry(panel, textvariable=self.pdf_file_var)
        self.source_entry.grid(row=1, column=1, padx=8, pady=10, sticky="ew")
        self.source_button = ctk.CTkButton(panel, text="选择", width=90, command=self.choose_pdf_file)
        self.source_button.grid(row=1, column=2, padx=(0, 14), pady=10)

        # Target row: where the Excel output goes.
        self.target_label = ctk.CTkLabel(panel, text="输出 Excel")
        self.target_label.grid(row=2, column=0, padx=14, pady=(0, 12), sticky="w")
        self.target_entry = ctk.CTkEntry(panel, textvariable=self.output_file_var)
        self.target_entry.grid(row=2, column=1, padx=8, pady=(0, 12), sticky="ew")
        self.target_button = ctk.CTkButton(panel, text="选择", width=90, command=self.choose_output_file)
        self.target_button.grid(row=2, column=2, padx=(0, 14), pady=(0, 12))
        # Apply the bindings for whichever mode active_tab_name currently holds.
        self._refresh_source_panel()

    def _features_unlocked(self) -> bool:
        """Return True only once the network time check has passed (licence not expired)."""
        # The flag is tri-state (None / True / False); only a literal True unlocks the UI.
        activation = self._activation_ok
        return activation is True

    def _network_time_check_worker(self) -> None:
        """Fetch the public reference time on a background thread so the UI is not blocked."""
        reference = None
        try:
            reference = fetch_reference_utc()
        except Exception:
            # Any failure is reported to the UI as "no reference time".
            reference = None
        # Hand the result to the Tk event loop; the handler runs as an `after` callback.
        self.after(0, self._apply_network_time_result, reference)

    def _apply_waiting_for_network_time(self) -> None:
        """Disable the interactive controls until the time check has completed."""
        self._suppress_tab_change_cb = True
        self.status_var.set("正在联网校验时间...")
        # Widgets are looked up one by one so they are disabled in a fixed order.
        for attr in (
            "mode_segmented",
            "preview_button",
            "run_button",
            "clear_button",
            "source_button",
            "target_button",
        ):
            getattr(self, attr).configure(state="disabled")
        # Replace the log content with a "please wait" notice, then lock the box again.
        box = self.log_box
        box.configure(state="normal")
        box.delete("1.0", "end")
        box.insert("1.0", "正在从互联网获取标准时间,请稍候...\n")
        box.configure(state="disabled")

    def _restore_functional_ui(self) -> None:
        """Re-enable every control after the time check passed and the licence is still valid."""
        for attr in (
            "mode_segmented",
            "preview_button",
            "run_button",
            "clear_button",
            "source_button",
            "target_button",
        ):
            getattr(self, attr).configure(state="normal")
        self.status_var.set("就绪")
        # Reset the log box to its initial placeholder text.
        box = self.log_box
        box.configure(state="normal")
        box.delete("1.0", "end")
        box.insert("1.0", "日志输出区域\n")
        box.configure(state="disabled")
        # Mode changes are honoured again from here on.
        self._suppress_tab_change_cb = False

    def _apply_time_verify_failed_state(self) -> None:
        """Lock the UI when no reference time could be fetched from the network.

        Usage is refused in this case so that going offline and changing the local
        date cannot bypass the expiry check. No dialog is shown.
        """
        self._suppress_tab_change_cb = True
        self._activation_ok = False
        try:
            self.status_var.set("时间校验失败:请连接互联网后重试")
            for attr in (
                "mode_segmented",
                "preview_button",
                "run_button",
                "clear_button",
                "source_button",
                "target_button",
            ):
                getattr(self, attr).configure(state="disabled")
            box = self.log_box
            box.configure(state="normal")
            box.delete("1.0", "end")
            box.insert(
                "1.0",
                "无法从互联网获取标准时间。请检查网络、代理或防火墙后重新启动本程序。\n",
            )
            box.configure(state="disabled")
        except Exception:
            # Deliberately best-effort: a widget error here must not surface as a dialog.
            pass

    def _apply_network_time_result(self, ref: datetime | None) -> None:
        """Update the activation flag and the UI from the fetched reference time.

        :param ref: reference time from the network, or None when it could not be fetched
        """
        try:
            if ref is None:
                # No trustworthy time: lock the UI and tell the user to go online.
                self._apply_time_verify_failed_state()
            elif is_past_expiry(ref):
                self._activation_ok = False
                self._apply_expired_state()
            else:
                self._activation_ok = True
                self._restore_functional_ui()
        except Exception:
            # Keep Tcl exceptions from bubbling up as a garbled system-level error dialog;
            # fall back to the locked (expired) state instead.
            self._activation_ok = False
            self._apply_expired_state()

    def _apply_expired_state(self) -> None:
        """Silently disable every feature once the licence has expired; no dialog is shown."""
        self._suppress_tab_change_cb = True
        try:
            # The status line and log look idle; only the controls are locked.
            self.status_var.set("就绪")
            for attr in (
                "mode_segmented",
                "preview_button",
                "run_button",
                "clear_button",
                "source_button",
                "target_button",
            ):
                getattr(self, attr).configure(state="disabled")
            box = self.log_box
            box.configure(state="normal")
            box.delete("1.0", "end")
            box.insert("1.0", "日志输出区域\n")
            box.configure(state="disabled")
        except Exception:
            # Deliberately best-effort: a widget error here must not surface as a dialog.
            pass

    def _build_options_panel(self, row: int) -> None:
        """Build shared conversion options panel.

        :param row: grid row of the main window in which the panel is placed
        """
        panel = ctk.CTkFrame(self, corner_radius=12)
        panel.grid(row=row, column=0, sticky="ew", padx=20, pady=(0, 8))
        panel.grid_columnconfigure((0, 1, 2, 3), weight=1)

        # Panel row 0: title and the OCR fallback switch.
        ctk.CTkLabel(panel, text="转换参数", font=ctk.CTkFont(size=14, weight="bold")).grid(
            row=0, column=0, padx=10, pady=(10, 4), sticky="w"
        )
        ctk.CTkSwitch(panel, text="启用 OCR 兜底(扫描件)", variable=self.ocr_fallback_var).grid(
            row=0, column=1, padx=8, pady=(10, 4), sticky="w"
        )

        # Panel row 1: X tolerance slider (range 4..20) with a numeric read-out label.
        ctk.CTkLabel(panel, text="X坐标聚类容差").grid(row=1, column=0, padx=10, pady=8, sticky="w")
        ctk.CTkSlider(panel, from_=4, to=20, variable=self.x_tolerance_var).grid(
            row=1, column=1, padx=8, pady=8, sticky="ew"
        )
        self.x_value_label = ctk.CTkLabel(panel, text="8.0")
        self.x_value_label.grid(row=1, column=2, padx=8, pady=8, sticky="w")

        # Panel row 2: Y tolerance slider (range 2..12) with a numeric read-out label.
        ctk.CTkLabel(panel, text="Y坐标聚类容差").grid(row=2, column=0, padx=10, pady=(0, 10), sticky="w")
        ctk.CTkSlider(panel, from_=2, to=12, variable=self.y_tolerance_var).grid(
            row=2, column=1, padx=8, pady=(0, 10), sticky="ew"
        )
        self.y_value_label = ctk.CTkLabel(panel, text="4.0")
        self.y_value_label.grid(row=2, column=2, padx=8, pady=(0, 10), sticky="w")

        # Panel row 3: the strict-fidelity switch is shown selected and disabled, so the
        # user can see it but not turn it off.
        strict_switch = ctk.CTkSwitch(panel, text="严格保真模式(已强制开启)", variable=self.strict_visual_var)
        strict_switch.grid(row=3, column=0, columnspan=2, padx=10, pady=(0, 10), sticky="w")
        strict_switch.select()
        strict_switch.configure(state="disabled")

        # Keep the read-out labels in sync whenever a slider variable is written.
        self.x_tolerance_var.trace_add("write", self._refresh_option_labels)
        self.y_tolerance_var.trace_add("write", self._refresh_option_labels)
        self._refresh_option_labels()

    def _refresh_option_labels(self, *_args: object) -> None:
        """Show the current slider values, with one decimal, in their read-out labels.

        Extra positional arguments (passed by Tk variable traces) are ignored.
        """
        x_text = f"{self.x_tolerance_var.get():.1f}"
        self.x_value_label.configure(text=x_text)
        y_text = f"{self.y_tolerance_var.get():.1f}"
        self.y_value_label.configure(text=y_text)

    def _on_mode_segment_selected(self, value: str) -> None:
        """Record the newly selected mode and rebind the source/target panel to it.

        :param value: label of the selected segment
        """
        # Ignored while the UI is locked (see the flag's comment in __init__).
        if not self._suppress_tab_change_cb:
            self.active_tab_name.set(value)
            self._refresh_source_panel()

    def _refresh_source_panel(self) -> None:
        """Rebind labels, entry variables and chooser commands to the active mode."""
        # Pick the per-mode texts, variables and callbacks first ...
        if self.active_tab_name.get() == "批量文件夹":
            title = "导入与导出(批量文件夹)"
            source_text, source_var, source_cmd = "PDF 文件夹", self.input_dir_var, self.choose_input_dir
            target_text, target_var, target_cmd = "输出文件夹", self.output_dir_var, self.choose_output_dir
        else:
            title = "导入与导出(单文件)"
            source_text, source_var, source_cmd = "PDF 文件", self.pdf_file_var, self.choose_pdf_file
            target_text, target_var, target_cmd = "输出 Excel", self.output_file_var, self.choose_output_file

        # ... then apply them to the shared widgets.
        self.source_title_label.configure(text=title)
        self.source_label.configure(text=source_text)
        self.source_entry.configure(textvariable=source_var)
        self.source_button.configure(command=source_cmd)
        self.target_label.configure(text=target_text)
        self.target_entry.configure(textvariable=target_var)
        self.target_button.configure(command=target_cmd)

    def _append_log(self, message: str) -> None:
        """Append a timestamped line to the log box and scroll to it.

        :param message: text to log (a newline is added)
        """
        stamp = datetime.now().strftime("%H:%M:%S")
        line = f"[{stamp}] {message}\n"
        box = self.log_box
        # The box is normally locked; unlock it only for the duration of the write.
        box.configure(state="normal")
        box.insert("end", line)
        box.see("end")
        box.configure(state="disabled")

    def clear_logs(self) -> None:
        """Reset the log box to its placeholder text (no-op while features are locked)."""
        if self._features_unlocked():
            box = self.log_box
            box.configure(state="normal")
            box.delete("1.0", "end")
            box.insert("1.0", "日志输出区域\n")
            box.configure(state="disabled")

    def choose_pdf_file(self) -> None:
        """Ask for a PDF file and pre-fill the output path next to it with an .xlsx suffix."""
        if not self._features_unlocked():
            return
        selected = filedialog.askopenfilename(filetypes=[("PDF 文件", "*.pdf")])
        if not selected:
            # Dialog cancelled: keep the current values.
            return
        self.pdf_file_var.set(selected)
        suggested_output = Path(selected).with_suffix(".xlsx")
        self.output_file_var.set(str(suggested_output))

    def choose_output_file(self) -> None:
        """Ask where to save the Excel file and store the chosen path."""
        if not self._features_unlocked():
            return
        excel_types = [("Excel 文件", "*.xlsx")]
        selected = filedialog.asksaveasfilename(defaultextension=".xlsx", filetypes=excel_types)
        if not selected:
            # Dialog cancelled: keep the current value.
            return
        self.output_file_var.set(selected)

    def choose_input_dir(self) -> None:
        """Ask for the folder holding the PDF files and store the chosen path."""
        # The dialog is only opened when features are unlocked; a cancelled dialog changes nothing.
        if self._features_unlocked() and (folder := filedialog.askdirectory()):
            self.input_dir_var.set(folder)

    def choose_output_dir(self) -> None:
        """Ask for the folder that receives the generated files and store the chosen path."""
        # The dialog is only opened when features are unlocked; a cancelled dialog changes nothing.
        if self._features_unlocked() and (folder := filedialog.askdirectory()):
            self.output_dir_var.set(folder)

    def _set_running_state(self, running: bool) -> None:
        """Update the progress bar and status line for a started or finished task.

        :param running: True when a task starts, False when it has ended
        """
        if not self._features_unlocked():
            return
        # (progress fraction, status text) for each of the two states.
        progress, status = (0.1, "处理中,请稍候...") if running else (1.0, "完成")
        self.progress_bar.set(progress)
        self.status_var.set(status)

    def run_current_tab_task(self) -> None:
        """Start the conversion that matches the currently selected mode."""
        if not self._features_unlocked():
            return
        # Anything other than the batch label falls back to single-file conversion.
        if self.active_tab_name.get() == "批量文件夹":
            task = self.run_batch_conversion
        else:
            task = self.run_single_conversion
        task()

    def preview_pdf_first_page(self) -> None:
        """Render page 1 of the selected PDF and show it in a separate window.

        Uses the single-file PDF path; any failure is reported in an error dialog.
        """
        if not self._features_unlocked():
            return
        source = self.pdf_file_var.get().strip()
        if not source:
            messagebox.showwarning("提示", "请先在上方选择「单文件」模式,并选择一个 PDF 文件。")
            return

        try:
            with pdfplumber.open(source) as document:
                if len(document.pages) == 0:
                    messagebox.showwarning("提示", "PDF 没有可预览页面。")
                    return
                first_page = document.pages[0]
                page_image = first_page.to_image(resolution=170).original.convert("RGB")
                # Shrink in place so the picture fits within 980x850.
                page_image.thumbnail((980, 850), Image.Resampling.LANCZOS)

            # Window size = image size plus room for padding and the file-name caption.
            window = ctk.CTkToplevel(self)
            window.title("PDF 第1页预览")
            window.geometry(f"{page_image.width + 40}x{page_image.height + 80}")
            window.transient(self)
            window.focus()

            # Stored on the instance as well as handed to the label.
            self.preview_image = ctk.CTkImage(
                light_image=page_image,
                dark_image=page_image,
                size=(page_image.width, page_image.height),
            )
            picture = ctk.CTkLabel(window, image=self.preview_image, text="")
            picture.pack(padx=20, pady=(16, 8))
            caption = ctk.CTkLabel(window, text=Path(source).name, text_color="#b3b3b3")
            caption.pack(pady=(0, 12))
            self._append_log("已打开 PDF 第1页预览窗口。")
        except Exception as exc:
            messagebox.showerror("预览失败", f"无法预览该 PDF:{exc}")


    def _build_options(self) -> ConversionOptions:
        """Collect the current GUI settings into a ConversionOptions value."""
        x_tol = float(self.x_tolerance_var.get())
        y_tol = float(self.y_tolerance_var.get())
        use_ocr = bool(self.ocr_fallback_var.get())
        # "The format must not change" is a hard user requirement, so strict visual mode
        # is always forced on here regardless of the switch state.
        return ConversionOptions(
            x_tolerance=x_tol,
            y_tolerance=y_tol,
            enable_ocr_fallback=use_ocr,
            ocr_min_confidence=0.45,
            strict_visual_mode=True,
        )

    def run_single_conversion(self) -> None:
        """Convert the selected PDF to Excel on a background thread.

        Every Tk interaction (reading the option variables, status/progress updates,
        log writes, message boxes) runs on the UI thread: the setup happens here before
        the thread starts, and results are passed back with ``self.after(0, ...)``, the
        same hand-off `_network_time_check_worker` uses. The worker thread only runs
        the conversion itself.
        """
        if not self._features_unlocked():
            return
        pdf_path = self.pdf_file_var.get().strip()
        output_path = self.output_file_var.get().strip()
        if not pdf_path or not output_path:
            messagebox.showwarning("提示", "请先选择 PDF 与输出路径。")
            return

        # Still on the UI thread: safe to touch widgets and Tk variables.
        self._set_running_state(True)
        self._append_log("开始单文件任务")
        options = self._build_options()

        def log_from_worker(message: str) -> None:
            # Called by the converter on the worker thread; queue the write for the UI thread.
            self.after(0, self._append_log, message)

        def finish(ok: bool, msg: str) -> None:
            # Runs on the UI thread once the conversion has ended.
            self._append_log(msg)
            self._set_running_state(False)
            if ok:
                messagebox.showinfo("成功", msg)
            else:
                messagebox.showerror("失败", msg)

        def worker() -> None:
            try:
                ok, msg = convert_pdf_to_excel(pdf_path, output_path, log=log_from_worker, options=options)
            except Exception as exc:
                # Without this the thread would die silently and the status line
                # would stay on "processing" forever.
                ok, msg = False, f"转换异常:{exc}"
            self.after(0, finish, ok, msg)

        threading.Thread(target=worker, daemon=True).start()

    def run_batch_conversion(self) -> None:
        """Convert every PDF of the input folder on a background thread.

        Every Tk interaction (reading the option variables, status/progress updates,
        log writes, message boxes) runs on the UI thread: the setup happens here before
        the thread starts, and results are passed back with ``self.after(0, ...)``, the
        same hand-off `_network_time_check_worker` uses. The worker thread only runs
        the conversion itself.
        """
        if not self._features_unlocked():
            return
        input_dir = self.input_dir_var.get().strip()
        output_dir = self.output_dir_var.get().strip()
        if not input_dir or not output_dir:
            messagebox.showwarning("提示", "请先选择输入与输出文件夹。")
            return

        # Still on the UI thread: safe to touch widgets and Tk variables.
        self._set_running_state(True)
        self._append_log("开始批量任务")
        options = self._build_options()

        def log_from_worker(message: str) -> None:
            # Called by the converter on the worker thread; queue the write for the UI thread.
            self.after(0, self._append_log, message)

        def finish(result: dict) -> None:
            # Runs on the UI thread once the batch has ended normally.
            self._set_running_state(False)

            total = int(result["total"])
            success = int(result["success"])
            failed = int(result["failed"])
            self._append_log(f"批量完成:总计 {total},成功 {success},失败 {failed}")

            if failed == 0:
                messagebox.showinfo("完成", f"批量转换完成,共 {success}/{total} 成功。")
            else:
                messagebox.showwarning("部分失败", f"共 {total} 个文件,成功 {success},失败 {failed}。")

        def fail(reason: str) -> None:
            # Runs on the UI thread when the batch call itself raised.
            self._append_log(reason)
            self._set_running_state(False)
            messagebox.showerror("失败", reason)

        def worker() -> None:
            try:
                result = convert_folder_to_excel(input_dir, output_dir, log=log_from_worker, options=options)
            except Exception as exc:
                # Without this the thread would die silently and the status line
                # would stay on "processing" forever.
                self.after(0, fail, f"批量转换异常:{exc}")
                return
            self.after(0, finish, result)

        threading.Thread(target=worker, daemon=True).start()


def main() -> None:
    """Create the application window and run its event loop until it is closed."""
    App().mainloop()


# Start the GUI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

更具体、详细的代码请参考 GitHub。

network_time.py

"""从公网获取参考时间,用于与本地时钟解耦的授权截止判断。"""

from __future__ import annotations

import ssl
import urllib.error
import urllib.request
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from zoneinfo import ZoneInfo

# Aligned with the Beijing-time calendar: reaching 00:00 (Asia/Shanghai) on this date counts as expired.
_SHANGHAI = ZoneInfo("Asia/Shanghai")
EXPIRY_CUTOFF_SHANGHAI = datetime(2027, 5, 30, 0, 0, 0, tzinfo=_SHANGHAI)

# Several sites are tried in order to reduce single-point failures; only the response's
# Date header is used, never the page body.
_TIME_SOURCE_URLS: tuple[str, ...] = (
    "https://www.baidu.com/",
    "https://www.qq.com/",
    "https://www.microsoft.com/",
    "https://www.cloudflare.com/",
)


def fetch_reference_utc(timeout_per_url: float = 2.5) -> datetime | None:
    """
    通过 HTTPS 响应头 `Date` 获取当前 UTC 时间。

    :param timeout_per_url: 每个 URL 的请求超时(秒)
    :return: 带时区信息的 UTC 时间;全部失败时返回 None
    """
    ctx = ssl.create_default_context()
    headers = {"User-Agent": "PDFtoEXCEL-TimeCheck/1.0"}
    for url in _TIME_SOURCE_URLS:
        for method in ("HEAD", "GET"):
            try:
                req = urllib.request.Request(url, headers=headers, method=method)
                with urllib.request.urlopen(req, timeout=timeout_per_url, context=ctx) as resp:
                    raw = resp.headers.get("Date")
                    if not raw:
                        continue
                    dt = parsedate_to_datetime(raw)
                    if dt.tzinfo is None:
                        dt = dt.replace(tzinfo=timezone.utc)
                    return dt.astimezone(timezone.utc)
            except (urllib.error.URLError, urllib.error.HTTPError, OSError, ValueError, TypeError):
                continue
    return None


def is_past_expiry(reference_utc: datetime) -> bool:
    """
    Tell whether the reference time has reached or passed the licence cutoff.

    The comparison is made on the Asia/Shanghai calendar against
    `EXPIRY_CUTOFF_SHANGHAI`; the cutoff instant itself counts as expired.

    :param reference_utc: normally the return value of `fetch_reference_utc()`;
        a naive datetime is interpreted as UTC
    :return: True when the licence has expired
    """
    if reference_utc.tzinfo is None:
        # `astimezone` would treat a naive value as *local* time, which would tie the
        # result to the machine's timezone setting. Pin it to UTC, as
        # `fetch_reference_utc` does for naive header dates.
        reference_utc = reference_utc.replace(tzinfo=timezone.utc)
    return reference_utc.astimezone(_SHANGHAI) >= EXPIRY_CUTOFF_SHANGHAI
View Code

 

posted @ 2026-05-07 15:26  *感悟人生*  阅读(7)  评论(0)    收藏  举报