""" Import queue: single-consumer FIFO worker and job execution. Run run_worker_loop() in a background thread; on startup call reset_stale_running_jobs(). """ from datetime import datetime from pathlib import Path import time from sqlalchemy.orm import Session from backend.database import SessionLocal from backend.models import ( JOB_STATUS_FAILED, JOB_STATUS_QUEUED, JOB_STATUS_RUNNING, JOB_STATUS_SUCCESS, ImportHistory, ImportJob, ImportJobItem, Question, ) from backend.repositories import import_job_repo as repo from backend.services.excel_service import parse_excel_file from backend.services.parser import OpenAICompatibleParserService, extract_metadata def _build_ai_rows(path: Path) -> list[dict]: parser = OpenAICompatibleParserService() metadata = extract_metadata(path.name) questions = parser.parse_file(str(path)) rows = [] for q in questions: rows.append( { "chapter": metadata["chapter"], "primary_knowledge": "", "secondary_knowledge": metadata["secondary_knowledge"], "question_type": metadata["question_type"], "difficulty": metadata["difficulty"], "stem": q.get("题干", ""), "option_a": q.get("选项A", ""), "option_b": q.get("选项B", ""), "option_c": q.get("选项C", ""), "option_d": q.get("选项D", ""), "answer": q.get("正确答案", ""), "explanation": q.get("解析", ""), "notes": q.get("备注", ""), "source_file": metadata["source_file"], } ) return rows def _process_one_item( db: Session, job: ImportJob, item: ImportJobItem, method: str, ) -> None: path = Path(item.stored_path) filename = item.filename job.current_file = filename job.current_index = item.seq job.updated_at = datetime.utcnow() item.status = JOB_STATUS_RUNNING item.started_at = datetime.utcnow() db.commit() try: if method == "excel": if path.suffix.lower() not in [".xlsx", ".xlsm", ".xltx", ".xltm"]: raise ValueError("仅支持 Excel 文件") rows = parse_excel_file(path) else: rows = _build_ai_rows(path) questions = [Question(**row) for row in rows] if questions: db.add_all(questions) db.add( ImportHistory( filename=filename, method=method, question_count=len(questions), status="success", ) ) db.commit() item.status = JOB_STATUS_SUCCESS item.question_count = len(questions) item.ended_at = datetime.utcnow() job.success_count += 1 job.processed += 1 job.updated_at = datetime.utcnow() db.commit() except Exception as exc: db.rollback() db.add( ImportHistory( filename=filename, method=method, question_count=0, status="failed", ) ) db.commit() item.status = JOB_STATUS_FAILED item.error = str(exc) item.ended_at = datetime.utcnow() job.failed_count += 1 job.processed += 1 job.updated_at = datetime.utcnow() db.commit() def process_job(db: Session, job_id: int) -> None: """Execute a single job: process all items in order, then set job terminal status.""" job = repo.get_job(db, job_id) if not job or job.status != JOB_STATUS_RUNNING: return method = job.method items = sorted(job.items, key=lambda x: x.seq) # Resume: ensure processed/success_count/failed_count reflect already-completed items job.processed = sum(1 for it in items if it.status in (JOB_STATUS_SUCCESS, JOB_STATUS_FAILED)) job.success_count = sum(1 for it in items if it.status == JOB_STATUS_SUCCESS) job.failed_count = sum(1 for it in items if it.status == JOB_STATUS_FAILED) db.commit() for item in items: if item.status in (JOB_STATUS_SUCCESS, JOB_STATUS_FAILED): continue _process_one_item(db, job, item, method) db.refresh(job) job = repo.get_job(db, job_id) if not job: return if job.failed_count > 0 and job.success_count == 0: job.status = JOB_STATUS_FAILED job.error = "部分或全部文件处理失败" else: job.status = JOB_STATUS_SUCCESS job.error = "" job.ended_at = datetime.utcnow() job.current_file = "" job.updated_at = datetime.utcnow() db.commit() def reset_stale_running_jobs(db: Session) -> int: """On startup: set any job left in 'running' back to 'queued' so worker can pick it up.""" count = 0 for job in db.query(ImportJob).filter(ImportJob.status == JOB_STATUS_RUNNING).all(): job.status = JOB_STATUS_QUEUED count += 1 if count: db.commit() return count def run_worker_loop(interval_seconds: float = 1.0) -> None: """ Single-consumer FIFO loop. Call from a background thread. Claims oldest queued job, processes it, then repeats. Sleeps when no job. """ while True: db = SessionLocal() try: job = repo.claim_oldest_queued(db) if job: process_job(db, job.id) else: time.sleep(interval_seconds) except Exception: if db: db.rollback() time.sleep(interval_seconds) finally: db.close()