# Files
# problem-bank/backend/services/parser.py
# 2026-03-06 15:52:34 +08:00
#
# 233 lines
# 8.5 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import re
import shutil
import subprocess
import tempfile
from pathlib import Path
import requests
from backend.config import settings
class OpenAICompatibleParserService:
def __init__(self) -> None:
self.api_key = settings.api_key
self.model_name = settings.model_name
self.api_url = settings.openai_api_url
def parse_file(self, file_path: str) -> list[dict]:
path = Path(file_path)
if path.suffix.lower() != ".pdf":
pdf_path = self._convert_to_pdf(path)
else:
pdf_path = path
try:
file_url = self._upload_to_temp_host(pdf_path)
questions = self._parse_with_file_url(file_url, path.name)
finally:
if pdf_path != path and pdf_path.exists():
pdf_path.unlink(missing_ok=True)
return questions
def _upload_to_temp_host(self, path: Path) -> str:
try:
with path.open("rb") as f:
response = requests.post("https://file.io", files={"file": f}, timeout=60)
if response.status_code == 200 and response.json().get("success"):
return response.json()["link"]
except Exception:
pass
with path.open("rb") as f:
response = requests.post("https://tmpfiles.org/api/v1/upload", files={"file": f}, timeout=60)
if response.status_code != 200:
raise ValueError(f"上传失败: {response.text}")
data = response.json()
if data.get("status") != "success":
raise ValueError(f"上传失败: {data}")
return data["data"]["url"].replace("tmpfiles.org/", "tmpfiles.org/dl/")
def _convert_to_pdf(self, path: Path) -> Path:
pdf_path = path.with_suffix(".pdf")
pdf_path.unlink(missing_ok=True)
source_path = path
temp_dir: str | None = None
if path.suffix.lower() == ".doc":
source_path, temp_dir = self._convert_doc_to_docx(path)
try:
cmd = [
"pandoc",
str(source_path),
"-o",
str(pdf_path),
"--pdf-engine=xelatex",
"-V",
"CJKmainfont=PingFang SC",
]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
if result.returncode != 0:
fallback = [
"pandoc",
str(source_path),
"-o",
str(pdf_path),
"--pdf-engine=weasyprint",
]
result = subprocess.run(fallback, capture_output=True, text=True, timeout=90)
if result.returncode != 0:
raise ValueError(f"文件转 PDF 失败: {result.stderr}")
return pdf_path
finally:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
def _convert_doc_to_docx(self, path: Path) -> tuple[Path, str]:
temp_dir = tempfile.mkdtemp(prefix="pb_doc_convert_")
converted_path = Path(temp_dir) / f"{path.stem}.docx"
convert_errors: list[str] = []
if shutil.which("soffice"):
result = subprocess.run(
[
"soffice",
"--headless",
"--convert-to",
"docx",
"--outdir",
temp_dir,
str(path),
],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0 and converted_path.exists():
return converted_path, temp_dir
convert_errors.append(f"soffice: {(result.stderr or result.stdout).strip()}")
else:
convert_errors.append("soffice: 未安装")
if shutil.which("textutil"):
result = subprocess.run(
[
"textutil",
"-convert",
"docx",
"-output",
str(converted_path),
str(path),
],
capture_output=True,
text=True,
timeout=120,
)
if result.returncode == 0 and converted_path.exists():
return converted_path, temp_dir
convert_errors.append(f"textutil: {(result.stderr or result.stdout).strip()}")
else:
convert_errors.append("textutil: 未安装")
raise ValueError(
"检测到 .doc 文件pandoc 不支持直接转换。"
"已尝试自动转换为 .docx 但失败。请先把 .doc 另存为 .docx 后重试。"
f" 详细信息: {' | '.join(convert_errors)}"
)
def _parse_with_file_url(self, file_url: str, original_filename: str) -> list[dict]:
if not self.api_key:
raise ValueError("未配置 API_KEY无法调用 OpenAI 兼容接口")
payload = {
"model": self.model_name,
"input": [
{
"role": "user",
"content": [
{"type": "input_file", "file_url": file_url},
{"type": "input_text", "text": self._build_instruction(original_filename)},
],
}
],
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
response = requests.post(
self.api_url, headers=headers, data=json.dumps(payload), timeout=180
)
if response.status_code != 200:
raise ValueError(f"OpenAI 兼容接口请求失败: {response.status_code} {response.text}")
return self._extract_questions(response.json())
def _build_instruction(self, filename: str) -> str:
return f"""请从文件\"{filename}\"中提取所有题目信息并以JSON数组格式返回。
提取字段题干、选项A、选项B、选项C、选项D、正确答案、解析、备注。
数学公式统一转为 LaTeX内联公式使用 $...$,独立公式使用 $$...$$。
答案统一大写字母,清理“答案:”等前缀。
仅返回 JSON 数组,不要返回额外说明。"""
def _extract_questions(self, response: dict) -> list[dict]:
if response.get("status") != "completed":
raise ValueError(f"响应状态异常: {response.get('status')}")
text = None
for item in response.get("output", []):
if item.get("type") == "message":
for content in item.get("content", []):
if content.get("type") == "output_text":
text = content.get("text")
break
if not text:
raise ValueError("未在响应中找到文本内容")
questions = self._parse_json(text)
for q in questions:
for field in ["选项A", "选项B", "选项C", "选项D", "解析", "备注"]:
q.setdefault(field, "")
return questions
def _parse_json(self, text: str) -> list[dict]:
start_idx = text.find("[")
end_idx = text.rfind("]")
if start_idx < 0 or end_idx < 0:
raise ValueError(f"未找到 JSON 数组: {text[:200]}")
json_str = text[start_idx : end_idx + 1]
data = json.loads(json_str)
if not isinstance(data, list):
raise ValueError("解析结果不是数组")
for index, item in enumerate(data):
if "题干" not in item or "正确答案" not in item:
raise ValueError(f"{index + 1} 题缺少题干或正确答案")
item["正确答案"] = re.sub(r"[^A-D0-9]", "", str(item.get("正确答案", "")).upper())
return data
def extract_metadata(filename: str, chapter_delimiters: str = "：，（－") -> dict:
    """Derive question metadata from a file name.

    The file stem is split on the first of ``+``, space, ``-`` or ``_`` that
    occurs in it (checked in that order). The parts are read positionally as
    secondary knowledge point, question type and difficulty; missing parts
    become ``""``. The question type is translated through
    ``settings.type_map`` and kept unchanged when it has no mapping.

    NOTE(review): the delimiter that separated the chapter from the rest of
    the secondary knowledge point was lost from this file (the split argument
    was an empty string, which makes ``str.split`` raise ``ValueError``). The
    default ``chapter_delimiters`` is a best guess at likely fullwidth
    delimiters; confirm the intended character against the original source.

    Args:
        filename: File name or path; only its stem is inspected.
        chapter_delimiters: Characters that may end the chapter name. The
            chapter is the text before the earliest occurrence of any of them.

    Returns:
        A dict with the keys ``chapter``, ``secondary_knowledge``,
        ``question_type``, ``difficulty`` and ``source_file``.
    """
    basename = Path(filename).stem

    # Only the first matching separator (in priority order) is used.
    separator = next((sep for sep in ("+", " ", "-", "_") if sep in basename), None)
    raw_parts = basename.split(separator) if separator is not None else [basename]
    parts = [part.strip() for part in raw_parts]

    secondary = parts[0]
    raw_type = parts[1] if len(parts) > 1 else ""
    difficulty = parts[2] if len(parts) > 2 else ""
    mapped_type = settings.type_map.get(raw_type, raw_type)

    # Cut at the earliest delimiter present; keep the whole string otherwise.
    cut_points = [pos for pos in map(secondary.find, chapter_delimiters) if pos >= 0]
    chapter = secondary[: min(cut_points)] if cut_points else secondary
    chapter = chapter.strip() or "未分类"

    return {
        "chapter": chapter,
        "secondary_knowledge": secondary,
        "question_type": mapped_type,
        "difficulty": difficulty,
        "source_file": filename,
    }