"""HTTP endpoints for listing, creating, updating and deleting Question rows."""

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy import func, or_
from sqlalchemy.orm import Session

from backend.auth import get_current_user
from backend.database import get_db
from backend.models import Question
from backend.schemas import (
    BatchDeleteRequest,
    BatchUpdateRequest,
    QuestionCreate,
    QuestionOut,
    QuestionUpdate,
)

# Every route registered on this router runs the get_current_user dependency.
router = APIRouter(dependencies=[Depends(get_current_user)])


def _get_question_or_404(db: Session, question_id: int) -> Question:
    """Load a Question by primary key or raise a 404 HTTPException."""
    item = db.get(Question, question_id)
    if item is None:
        raise HTTPException(status_code=404, detail="题目不存在")
    return item


@router.get("")
def list_questions(
    page: int = Query(default=1, ge=1),
    page_size: int = Query(default=20, ge=1, le=200),
    keyword: str = "",
    chapter: str = "",
    secondary_knowledge: str = "",
    question_type: str = "",
    difficulty: str = "",
    db: Session = Depends(get_db),
) -> dict:
    """Return one page of questions matching the optional filters.

    Args:
        page: 1-based page number.
        page_size: Number of rows per page (1-200).
        keyword: Substring searched in the stem, the four options and the
            explanation; matched literally. Empty means no keyword filter.
        chapter: Exact-match filter; empty means no filter.
        secondary_knowledge: Exact-match filter; empty means no filter.
        question_type: Exact-match filter; empty means no filter.
        difficulty: Exact-match filter; empty means no filter.
        db: Database session.

    Returns:
        ``{"total": <matching row count>, "items": [<serialized questions>]}``.
    """
    query = db.query(Question)

    if keyword:
        searchable_columns = (
            Question.stem,
            Question.option_a,
            Question.option_b,
            Question.option_c,
            Question.option_d,
            Question.explanation,
        )
        # autoescape=True escapes "%" and "_" in the user's keyword so they
        # are matched as literal characters instead of LIKE wildcards.
        query = query.filter(
            or_(
                *(
                    column.contains(keyword, autoescape=True)
                    for column in searchable_columns
                )
            )
        )

    exact_filters = (
        (Question.chapter, chapter),
        (Question.secondary_knowledge, secondary_knowledge),
        (Question.question_type, question_type),
        (Question.difficulty, difficulty),
    )
    for column, value in exact_filters:
        if value:
            query = query.filter(column == value)

    total = query.with_entities(func.count(Question.id)).scalar() or 0

    # The id tie-breaker keeps the ordering total: rows sharing the same
    # updated_at would otherwise be returned in an unspecified order, which
    # can duplicate or skip rows across pages.
    items = (
        query.order_by(Question.updated_at.desc(), Question.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )
    return {
        "total": total,
        "items": [QuestionOut.model_validate(i).model_dump() for i in items],
    }


@router.get("/id/{question_id}", response_model=QuestionOut)
def get_question(question_id: int, db: Session = Depends(get_db)) -> QuestionOut:
    """Return a single question by id.

    Raises:
        HTTPException: 404 when no question has that id.
    """
    item = _get_question_or_404(db, question_id)
    return QuestionOut.model_validate(item)


@router.post("", response_model=QuestionOut)
def create_question(
    payload: QuestionCreate, db: Session = Depends(get_db)
) -> QuestionOut:
    """Insert a new question built from the payload and return the stored row."""
    item = Question(**payload.model_dump())
    db.add(item)
    db.commit()
    # Reload the row so database-populated columns are present in the response.
    db.refresh(item)
    return QuestionOut.model_validate(item)


@router.delete("/batch")
def batch_delete(payload: BatchDeleteRequest, db: Session = Depends(get_db)) -> dict:
    """Delete every question whose id is in ``payload.ids``.

    Returns:
        ``{"deleted": <number of rows removed>}``; 0 when no ids were given.
    """
    if not payload.ids:
        return {"deleted": 0}

    deleted = (
        db.query(Question)
        .filter(Question.id.in_(payload.ids))
        .delete(synchronize_session=False)
    )
    db.commit()
    return {"deleted": deleted}


@router.put("/batch/update")
def batch_update(payload: BatchUpdateRequest, db: Session = Depends(get_db)) -> dict:
    """Apply the same field values to every question in ``payload.ids``.

    Fields left as ``None`` in the payload are ignored, so this endpoint
    cannot be used to set a column to NULL.

    Returns:
        ``{"updated": <number of rows matched>}``; 0 when there are no ids
        or no fields to change.
    """
    if not payload.ids:
        return {"updated": 0}

    updates = payload.model_dump(exclude_none=True, exclude={"ids"})
    if not updates:
        return {"updated": 0}

    updated = (
        db.query(Question)
        .filter(Question.id.in_(payload.ids))
        .update(updates, synchronize_session=False)
    )
    db.commit()
    return {"updated": updated}


@router.get("/meta/options")
def get_filter_options(db: Session = Depends(get_db)) -> dict:
    """Return the distinct non-empty values of each filterable column."""

    def distinct_values(column) -> list[str]:
        rows = db.query(column).filter(column != "").distinct().all()
        return [row[0] for row in rows]

    return {
        "chapters": distinct_values(Question.chapter),
        "secondary_knowledge_list": distinct_values(Question.secondary_knowledge),
        "question_types": distinct_values(Question.question_type),
        "difficulties": distinct_values(Question.difficulty),
    }


# NOTE: the "/{question_id}" routes are declared after the fixed-path routes
# ("/batch", "/batch/update") so the literal paths are matched first.
@router.put("/{question_id}", response_model=QuestionOut)
def update_question(
    question_id: int, payload: QuestionUpdate, db: Session = Depends(get_db)
) -> QuestionOut:
    """Update the non-``None`` fields of one question and return the new row.

    Raises:
        HTTPException: 404 when no question has that id.
    """
    item = _get_question_or_404(db, question_id)

    for key, value in payload.model_dump(exclude_none=True).items():
        setattr(item, key, value)

    db.commit()
    db.refresh(item)
    return QuestionOut.model_validate(item)


@router.delete("/{question_id}")
def delete_question(question_id: int, db: Session = Depends(get_db)) -> dict:
    """Delete one question by id.

    Returns:
        ``{"ok": True}`` on success.

    Raises:
        HTTPException: 404 when no question has that id.
    """
    item = _get_question_or_404(db, question_id)
    db.delete(item)
    db.commit()
    return {"ok": True}