Files
problem-bank/backend/repositories/import_job_repo.py
2026-03-06 15:52:34 +08:00

182 lines
5.1 KiB
Python

"""Import job persistence: create, get, list, update, claim for FIFO worker."""
from datetime import datetime
from sqlalchemy.orm import Session
from backend.models import (
JOB_STATUS_FAILED,
JOB_STATUS_QUEUED,
JOB_STATUS_RUNNING,
ImportJob,
ImportJobItem,
)
def create_job(
    db: Session,
    method: str,
    items: list[dict],
) -> ImportJob:
    """Persist a queued import job together with one queued item per entry.

    Args:
        db: Active session; committed before returning.
        method: Import method label stored on the job.
        items: Dicts shaped like {"filename": str, "stored_path": str}.
            Missing keys default to "". Items get ``seq`` values 1..len(items)
            in input order.

    Returns:
        The new job, refreshed after commit. Its ``total`` equals
        ``len(items)`` and all counters start at 0.
    """
    new_job = ImportJob(
        status=JOB_STATUS_QUEUED,
        method=method,
        total=len(items),
        processed=0,
        success_count=0,
        failed_count=0,
    )
    db.add(new_job)
    # Flush so new_job.id is populated before the items reference it.
    db.flush()
    job_items = [
        ImportJobItem(
            job_id=new_job.id,
            seq=position,
            filename=entry.get("filename", ""),
            stored_path=entry.get("stored_path", ""),
            status=JOB_STATUS_QUEUED,
        )
        for position, entry in enumerate(items, start=1)
    ]
    db.add_all(job_items)
    db.commit()
    db.refresh(new_job)
    return new_job
def create_job_empty(db: Session, method: str) -> ImportJob:
    """Stage a queued job that has no items yet.

    The job is added and flushed, so its ``id`` is available, but it is NOT
    committed. The caller is expected to attach items, set ``total`` and
    commit.
    """
    zeroed_counters = dict(total=0, processed=0, success_count=0, failed_count=0)
    staged = ImportJob(status=JOB_STATUS_QUEUED, method=method, **zeroed_counters)
    db.add(staged)
    db.flush()
    return staged
def add_job_items(db: Session, job_id: int, items: list[dict]) -> None:
    """Append queued items to an existing job and update ``job.total``.

    New items are numbered after the job's highest existing ``seq``. The
    job's ``total`` becomes the number of items it now has (existing plus
    new). Calling this more than once therefore appends, instead of creating
    duplicate ``seq`` values and overwriting ``total`` with only the latest
    batch size. For a job with no items yet (the ``create_job_empty`` flow),
    the result is the same as numbering from 1 and setting
    ``total = len(items)``.

    Args:
        db: Active session; committed when the job exists.
        job_id: Id of the job to extend. If no such job exists, nothing is
            written and the function returns silently.
        items: Dicts shaped like {"filename": str, "stored_path": str}.
            Missing keys default to "".
    """
    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if not job:
        return
    existing_items = db.query(ImportJobItem).filter(ImportJobItem.job_id == job_id)
    existing_count = existing_items.count()
    last_item = existing_items.order_by(ImportJobItem.seq.desc()).first()
    last_seq = (last_item.seq or 0) if last_item is not None else 0
    for seq, it in enumerate(items, start=last_seq + 1):
        db.add(
            ImportJobItem(
                job_id=job_id,
                seq=seq,
                filename=it.get("filename", ""),
                stored_path=it.get("stored_path", ""),
                status=JOB_STATUS_QUEUED,
            )
        )
    # Count previously stored items too, so repeated calls accumulate.
    job.total = existing_count + len(items)
    db.commit()
def get_job(db: Session, job_id: int) -> ImportJob | None:
    """Look up a single job by id; return None when it does not exist.

    Related items are loaded according to the model's relationship
    configuration, which is not set here. Whether they are eager or lazy
    needs to be confirmed in ``backend.models``.
    """
    matching = db.query(ImportJob).filter(ImportJob.id == job_id)
    return matching.first()
def list_jobs(
    db: Session,
    statuses: list[str] | None = None,
    limit: int = 50,
) -> list[ImportJob]:
    """Return up to ``limit`` jobs, most recently created first.

    A non-empty ``statuses`` restricts results to jobs whose status is in
    that list. None or an empty list applies no status filter.
    """
    query = db.query(ImportJob)
    if statuses:
        query = query.filter(ImportJob.status.in_(statuses))
    newest_first = query.order_by(ImportJob.created_at.desc())
    return newest_first.limit(limit).all()
def update_job(db: Session, job_id: int, **kwargs) -> ImportJob | None:
    """Assign the given attributes on a job, stamp ``updated_at`` and commit.

    Keyword names that are not attributes of the job are silently skipped.
    ``updated_at`` is always set to ``datetime.utcnow()`` after the other
    assignments, so a caller-supplied ``updated_at`` is overridden.

    Returns:
        The refreshed job, or None if ``job_id`` does not exist.
    """
    target = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if not target:
        return None
    for field_name, new_value in kwargs.items():
        if not hasattr(target, field_name):
            continue
        setattr(target, field_name, new_value)
    target.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(target)
    return target
def update_job_item(db: Session, item_id: int, **kwargs) -> ImportJobItem | None:
    """Assign the given attributes on one job item and commit.

    Keyword names that are not attributes of the item are silently skipped.
    No timestamp is touched here.

    Returns:
        The refreshed item, or None if ``item_id`` does not exist.
    """
    target = db.query(ImportJobItem).filter(ImportJobItem.id == item_id).first()
    if not target:
        return None
    for field_name, new_value in kwargs.items():
        if not hasattr(target, field_name):
            continue
        setattr(target, field_name, new_value)
    db.commit()
    db.refresh(target)
    return target
def claim_oldest_queued(db: Session) -> ImportJob | None:
    """
    Claim the oldest queued job (by created_at) by moving it to running.

    The transition uses a conditional UPDATE
    (WHERE id = <job id> AND status = JOB_STATUS_QUEUED) rather than
    mutating the loaded object and committing it. Suppose the job's status
    changes between the SELECT and the UPDATE, e.g. cancel_job marks it
    cancelled or another claimer takes it. In that case the UPDATE matches
    no rows, that change is not overwritten, and the next-oldest queued job
    is tried instead. No row locking is needed, so this stays
    SQLite-compatible.

    Returns:
        The claimed job, refreshed. Its status is running, and started_at and
        updated_at are set to the same naive UTC timestamp. Returns None if no
        queued job remains.
    """
    while True:
        candidate = (
            db.query(ImportJob)
            .filter(ImportJob.status == JOB_STATUS_QUEUED)
            .order_by(ImportJob.created_at.asc())
            .first()
        )
        if not candidate:
            return None
        now = datetime.utcnow()
        claimed_rows = (
            db.query(ImportJob)
            .filter(
                ImportJob.id == candidate.id,
                ImportJob.status == JOB_STATUS_QUEUED,
            )
            .update(
                {
                    ImportJob.status: JOB_STATUS_RUNNING,
                    ImportJob.started_at: now,
                    ImportJob.updated_at: now,
                },
                synchronize_session=False,
            )
        )
        db.commit()
        if claimed_rows:
            # Reload so the returned object reflects the committed UPDATE.
            db.refresh(candidate)
            return candidate
        # Lost the race for this job. Its status is no longer queued in the
        # database, so the next SELECT skips it.
def get_queued_job_for_worker(db: Session) -> ImportJob | None:
    """
    Peek at the oldest queued job (by created_at) without claiming it.

    No row lock is taken and the status is left unchanged. The caller must
    move the job to running itself, in the same or a follow-up transaction.
    Returns None when nothing is queued.
    """
    queued = db.query(ImportJob).filter(ImportJob.status == JOB_STATUS_QUEUED)
    oldest_first = queued.order_by(ImportJob.created_at.asc())
    return oldest_first.first()
def cancel_job(db: Session, job_id: int) -> ImportJob | None:
    """Mark a queued or running job as "cancelled" and commit.

    Sets ``ended_at`` and ``updated_at`` to the current naive UTC time.

    Returns:
        The refreshed job, or None if the job does not exist or is in any
        other status. In that case nothing is written.
    """
    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    cancellable = bool(job) and job.status in (JOB_STATUS_QUEUED, JOB_STATUS_RUNNING)
    if not cancellable:
        return None
    job.status = "cancelled"
    job.ended_at = datetime.utcnow()
    job.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(job)
    return job
def get_failed_items(db: Session, job_id: int) -> list[ImportJobItem]:
    """Return the job's items whose status is failed, in ascending ``seq`` order.

    Intended for retrying failed items.
    """
    failed_for_job = db.query(ImportJobItem).filter(
        ImportJobItem.job_id == job_id,
        ImportJobItem.status == JOB_STATUS_FAILED,
    )
    return failed_for_job.order_by(ImportJobItem.seq.asc()).all()