Files
problem-bank/analysis.py
2026-03-05 11:50:15 +08:00

551 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
DMXAPI直接解析 - 无需本地文本提取
将文件上传到临时托管服务然后使用DMXAPI的responses接口分析
"""
import sys
import json
import requests
from pathlib import Path
# 添加项目路径
sys.path.insert(0, str(Path(__file__).parent))
from config import settings
from config.database import get_db_manager
class DMXAPIParser:
"""DMXAPI解析器"""
def __init__(self):
self.api_key = settings.API_KEY
self.model_name = settings.MODEL_NAME
self.api_url = "https://www.dmxapi.cn/v1/responses"
def upload_to_temp_host(self, file_path):
"""
上传文件到临时托管服务获取公网URL
Args:
file_path: 文件路径
Returns:
文件的公网访问URL
"""
path = Path(file_path)
print(f' 上传PDF到临时托管服务...')
print(f' 文件: {path.name}')
print(f' 大小: {path.stat().st_size / 1024:.2f} KB')
# 使用 file.io 临时文件托管24小时有效一次下载后删除
try:
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(
'https://file.io',
files=files,
timeout=60
)
if response.status_code == 200:
result = response.json()
if result.get('success'):
file_url = result['link']
print(f' ✓ 上传成功')
print(f' URL: {file_url}')
return file_url
else:
raise Exception(f'上传失败: {result}')
else:
raise Exception(f'HTTP错误: {response.status_code} - {response.text}')
except Exception as e:
print(f' ✗ file.io上传失败: {e}')
print(' 尝试备用服务...')
# 备用方案:使用 tmpfiles.org (7天有效)
try:
with open(file_path, 'rb') as f:
files = {'file': f}
response = requests.post(
'https://tmpfiles.org/api/v1/upload',
files=files,
timeout=60
)
if response.status_code == 200:
result = response.json()
if result.get('status') == 'success':
# tmpfiles.org返回的URL需要调整
url = result['data']['url']
# 转换为直接下载链接
file_url = url.replace('tmpfiles.org/', 'tmpfiles.org/dl/')
print(f' ✓ 上传成功(备用服务)')
print(f' URL: {file_url}')
return file_url
raise Exception(f'备用服务也失败: {response.text}')
except Exception as e2:
raise Exception(f'所有上传服务均失败: {e2}')
def parse_file(self, file_path):
"""
使用DMXAPI分析文件
Args:
file_path: 文件路径
Returns:
解析出的题目列表
"""
print(f'\n[2/4] 准备文件并调用DMXAPI...')
path = Path(file_path)
# 判断文件类型如果不是PDF则转换
if path.suffix.lower() != '.pdf':
print(f' 检测到 {path.suffix} 文件使用pandoc转换为PDF')
pdf_path = self._convert_to_pdf(file_path)
else:
print(f' 检测到PDF文件直接使用')
pdf_path = file_path
# 上传PDF到临时服务
file_url = self.upload_to_temp_host(pdf_path)
# 使用DMXAPI分析
questions = self._parse_with_file_url(file_url, path.name)
# 清理临时PDF如果是转换生成的
if pdf_path != file_path:
try:
Path(pdf_path).unlink()
print(f' ✓ 已清理临时PDF文件')
except:
pass
return questions
def _convert_to_pdf(self, file_path):
"""
使用pandoc将文件转换为PDF
Args:
file_path: 原始文件路径
Returns:
转换后的PDF文件路径
"""
import subprocess
path = Path(file_path)
pdf_path = path.with_suffix('.pdf')
# 如果PDF已存在先删除
if pdf_path.exists():
pdf_path.unlink()
print(f' 运行: pandoc (使用xelatex引擎)')
try:
# 使用xelatex引擎支持中文
result = subprocess.run(
[
'pandoc',
str(path),
'-o', str(pdf_path),
'--pdf-engine=xelatex',
'-V', 'CJKmainfont=PingFang SC' # macOS中文字体
],
capture_output=True,
text=True,
timeout=60
)
if result.returncode != 0:
# xelatex失败尝试使用weasyprint
print(f' xelatex失败尝试weasyprint引擎...')
result = subprocess.run(
[
'pandoc',
str(path),
'-o', str(pdf_path),
'--pdf-engine=weasyprint'
],
capture_output=True,
text=True,
timeout=60
)
if result.returncode != 0:
raise Exception(f'所有PDF引擎均失败\nxelatex错误: {result.stderr}')
if not pdf_path.exists():
raise Exception('PDF文件未生成')
print(f' ✓ 转换成功: {pdf_path.name}')
return str(pdf_path)
except FileNotFoundError:
raise Exception('未找到pandoc命令请先安装pandoc\n macOS: brew install pandoc\n Ubuntu: sudo apt install pandoc')
except subprocess.TimeoutExpired:
raise Exception('pandoc转换超时')
def _parse_with_file_url(self, file_url, original_filename):
"""使用file_url方式解析PDF"""
print(f' 文件URL: {file_url}')
print(f' 模型: {self.model_name}')
print(f' 正在分析...')
# 构建请求
payload = {
"model": self.model_name,
"input": [{
"role": "user",
"content": [
{
"type": "input_file",
"file_url": file_url
},
{
"type": "input_text",
"text": self._build_instruction(original_filename)
}
]
}]
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 发送请求
print(f' 模型: {self.model_name}')
print(f' 正在分析...')
response = requests.post(
self.api_url,
headers=headers,
data=json.dumps(payload),
timeout=180 # 3分钟超时
)
if response.status_code != 200:
raise Exception(f'API请求失败: {response.status_code} - {response.text}')
result = response.json()
# 提取题目
questions = self._extract_questions(result)
print(f' ✓ 解析完成,共 {len(questions)} 个题目')
return questions
def _build_instruction(self, filename):
"""构建分析指令"""
return f"""请从文件"{filename}"中提取所有题目信息并以JSON数组格式返回。
**提取要求:**
1. **题目识别**
- 如果文档中有"修改试题"作为分隔符,请以此分割题目
- 否则根据题目序号、题干、选项、答案、解析的结构识别每个题目
2. **字段提取**
- 题干:题目的问题部分
- 选项A、选项B、选项C、选项D选择题的选项填空题或解答题为空字符串
- 正确答案:答案内容(单选如"A",多选如"ABD"
- 解析:题目的解答过程
- 备注:提取"难度 难""属性:共享 难度:易 采用:是"等元数据
3. **数学公式格式要求(重要):**
- **所有数学公式必须转换为LaTeX格式**
- 内联公式使用 $...$ 包围
- 独立公式使用 $$...$$ 包围
- 示例:
* "𝑥²""$x^2$"
* "sin x""$\\sin x$"
* "∫₀¹ x dx""$\\int_0^1 x \\, dx$"
* "f(x) = x² + 1""$f(x) = x^2 + 1$"
* 分数a/b → "$\\frac{{a}}{{b}}$"
* 根号√x → "$\\sqrt{{x}}$"
* 上下标xₙ → "$x_n$"x² → "$x^2$"
4. **其他格式要求**
- 答案统一为大写字母(选择题)
- 移除答案前缀(如"答案:"""等)
- 保留其他文本格式
**返回格式只返回JSON数组**
```json
[
{{
"题干": "下列函数中定义域为R的是",
"选项A": "$y = \\frac{{1}}{{x}}$",
"选项B": "$y = \\sqrt{{x}}$",
"选项C": "$y = x^2$",
"选项D": "$y = \\ln(x)$",
"正确答案": "C",
"解析": "$x^2$对所有实数都有定义",
"备注": "难度 易"
}}
]
```
**注意请确保所有数学符号、公式都转换为LaTeX格式**"""
def _extract_questions(self, response):
"""从DMXAPI响应中提取题目"""
# 检查状态
if response.get('status') != 'completed':
raise ValueError(f'响应状态异常: {response.get("status")}')
# 提取文本内容
output = response.get('output', [])
text_content = None
for item in output:
if item.get('type') == 'message':
content = item.get('content', [])
for c in content:
if c.get('type') == 'output_text':
text_content = c.get('text', '')
break
if text_content:
break
if not text_content:
raise ValueError('未找到文本内容')
# 解析JSON
questions = self._parse_json(text_content)
# 补充缺失字段
for q in questions:
for field in ['选项A', '选项B', '选项C', '选项D', '解析', '备注']:
if field not in q:
q[field] = ''
return questions
def _parse_json(self, text):
"""从文本中解析JSON"""
# 查找JSON数组
start_idx = text.find('[')
end_idx = text.rfind(']')
if start_idx == -1 or end_idx == -1:
# 尝试查找被```包围的JSON
if '```json' in text:
lines = text.split('\n')
json_lines = []
in_json = False
for line in lines:
if '```json' in line:
in_json = True
continue
elif '```' in line and in_json:
break
elif in_json:
json_lines.append(line)
text = '\n'.join(json_lines)
start_idx = text.find('[')
end_idx = text.rfind(']')
if start_idx == -1 or end_idx == -1:
raise ValueError(f'未找到JSON数组\n文本: {text[:200]}...')
json_str = text[start_idx:end_idx + 1]
try:
questions = json.loads(json_str)
if not isinstance(questions, list):
raise ValueError('解析结果不是数组')
# 验证必填字段
for i, q in enumerate(questions):
if '题干' not in q or '正确答案' not in q:
raise ValueError(f'{i+1} 个题目缺少必填字段')
return questions
except json.JSONDecodeError as e:
raise ValueError(f'JSON解析失败: {e}\nJSON: {json_str[:300]}...')
def extract_metadata(filename):
"""从文件名提取元数据"""
from config.settings import TYPE_MAP
path = Path(filename)
basename = path.stem
# 解析文件名
separators = ['+', ' ', '-', '_']
parts = [basename]
for sep in separators:
if sep in basename:
parts = basename.split(sep)
break
secondary_knowledge = parts[0].strip() if len(parts) > 0 else ''
question_type_raw = parts[1].strip() if len(parts) > 1 else ''
difficulty = parts[2].strip() if len(parts) > 2 else ''
# 映射题型
question_type = TYPE_MAP.get(question_type_raw, question_type_raw)
valid_types = ['单选', '多选', '不定项', '填空', '解答']
if question_type not in valid_types and question_type_raw in valid_types:
question_type = question_type_raw
# 推断章节
chapter = ''
if '' in secondary_knowledge:
chapter = secondary_knowledge.split('')[0]
else:
chapter = secondary_knowledge
if not chapter:
chapter = '未分类'
return {
'章节': chapter,
'二级知识点': secondary_knowledge,
'题目类型': question_type,
'难度': difficulty,
'文件路径': filename
}
def save_to_database(questions, metadata):
"""保存题目到数据库"""
print(f' 保存题目到数据库...')
import re
db = get_db_manager()
saved_ids = []
for i, q_data in enumerate(questions, 1):
# 标准化答案
answer = q_data.get('正确答案', '').strip().upper()
answer = re.sub(r'[^A-D0-9]', '', answer)
sql = """
INSERT INTO questions
(章节, 一级知识点, 二级知识点, 题目类型, 难度, 题干,
选项A, 选项B, 选项C, 选项D, 正确答案, 解析, 备注, 文件路径)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
params = (
metadata['章节'], '', metadata['二级知识点'],
metadata['题目类型'], metadata['难度'], q_data.get('题干', ''),
q_data.get('选项A', ''), q_data.get('选项B', ''),
q_data.get('选项C', ''), q_data.get('选项D', ''),
answer, q_data.get('解析', ''),
q_data.get('备注', ''), metadata['文件路径']
)
question_id = db.execute(sql, params)
saved_ids.append(question_id)
print(f' ✓ 题目 {i}/{len(questions)}, ID={question_id}')
return saved_ids
def display_results(questions, metadata, saved_ids):
"""显示结果"""
print(f'\n[4/4] 完成')
print('='*60)
print(f'文件: {metadata["文件路径"]}')
print(f'\n元数据:')
print(f' 章节: {metadata["章节"]}')
print(f' 知识点: {metadata["二级知识点"]}')
print(f' 题型: {metadata["题目类型"]}')
print(f' 难度: {metadata["难度"]}')
print(f'\n题目列表 (共 {len(questions)} 个):')
print('='*60)
for i, q in enumerate(questions, 1):
print(f'\n题目 {i} (ID={saved_ids[i-1] if saved_ids else "未保存"}):')
print(f' 题干: {q["题干"][:60]}...')
if q.get('选项A'):
print(f' 选项A: {q["选项A"][:40]}...')
print(f' 答案: {q["正确答案"]}')
if q.get('备注'):
print(f' 备注: {q["备注"]}')
print('='*60)
print(f'\n✓ 所有题目已保存到数据库')
# 显示数据库统计
db = get_db_manager()
total = db.execute("SELECT COUNT(*) as count FROM questions", fetch_one=True)
print(f'✓ 数据库中共有 {total["count"]} 个题目')
def main():
"""主函数"""
if len(sys.argv) < 2:
print('用法: python dmxapi_parse.py <文件路径>')
print('\n示例:')
print(' python dmxapi_parse.py "函数的周期性 单选题 难.docx"')
print(' python dmxapi_parse.py test.pdf')
print('\n说明:')
print(' - 支持格式: PDF, Word (.docx, .doc)')
print(' - 文件名格式: 知识点+题型+难度.扩展名')
print(' - 不在本地解析直接上传到DMXAPI分析')
sys.exit(1)
file_path = sys.argv[1]
path = Path(file_path)
if not path.exists():
print(f'✗ 文件不存在: {file_path}')
sys.exit(1)
print('='*60)
print('DMXAPI 文件解析 (无本地解析)')
print('='*60)
try:
# 创建解析器
parser = DMXAPIParser()
# [1/4] 文件转换和上传在parse_file中完成
# [2/4] DMXAPI分析在parse_file中完成
questions = parser.parse_file(file_path)
if not questions:
print('\n✗ 未解析到任何题目')
sys.exit(1)
# [3/4] 提取元数据
print(f'\n[3/4] 提取元数据...')
metadata = extract_metadata(path.name)
# [4/4] 保存到数据库和显示
saved_ids = save_to_database(questions, metadata)
display_results(questions, metadata, saved_ids)
print('\n✓ 处理完成!')
sys.exit(0)
except Exception as e:
print(f'\n✗ 处理失败: {e}')
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()