"""Import job persistence: create, get, list, update, cancel, and claim jobs for the FIFO worker."""
|
|
|
|
from datetime import datetime
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from backend.models import (
|
|
JOB_STATUS_FAILED,
|
|
JOB_STATUS_QUEUED,
|
|
JOB_STATUS_RUNNING,
|
|
ImportJob,
|
|
ImportJobItem,
|
|
)
|
|
|
|
|
|
def create_job(
    db: Session,
    method: str,
    items: list[dict],
) -> ImportJob:
    """Persist a queued import job together with one queued item per entry.

    Args:
        db: Session used for the inserts; it is committed before returning.
        method: Import method value stored on the job.
        items: Entries shaped like ``{"filename": str, "stored_path": str}``.
            A missing key is stored as an empty string.

    Returns:
        The refreshed job. Its ``total`` equals ``len(items)`` and all
        counters start at zero. Items are numbered from 1 in input order.
    """
    new_job = ImportJob(
        status=JOB_STATUS_QUEUED,
        method=method,
        total=len(items),
        processed=0,
        success_count=0,
        failed_count=0,
    )
    db.add(new_job)
    # Flush before new_job.id is read below (the id is presumably
    # database-generated and only available after the INSERT).
    db.flush()

    # A generator keeps construction and session.add interleaved per entry.
    db.add_all(
        ImportJobItem(
            job_id=new_job.id,
            seq=position,
            filename=entry.get("filename", ""),
            stored_path=entry.get("stored_path", ""),
            status=JOB_STATUS_QUEUED,
        )
        for position, entry in enumerate(items, start=1)
    )

    db.commit()
    db.refresh(new_job)
    return new_job
|
|
|
|
|
|
def create_job_empty(db: Session, method: str) -> ImportJob:
    """Add a queued job that has no items yet.

    The job is added to the session and flushed, but NOT committed here.
    The caller is expected to attach items and set ``total`` afterwards.

    Args:
        db: Session the job is added to.
        method: Import method value stored on the job.

    Returns:
        The flushed job with ``total`` and every counter set to zero.
    """
    zeroed = dict.fromkeys(
        ("total", "processed", "success_count", "failed_count"), 0
    )
    empty_job = ImportJob(status=JOB_STATUS_QUEUED, method=method, **zeroed)
    db.add(empty_job)
    db.flush()
    return empty_job
|
|
|
|
|
|
def add_job_items(db: Session, job_id: int, items: list[dict]) -> None:
    """Append items to an existing job and bring ``job.total`` up to date.

    New items continue the sequence after the items the job already has,
    and ``job.total`` becomes the combined item count. For a job without
    items this is the same as numbering from 1 and setting
    ``total = len(items)``.

    Args:
        db: Session used for the inserts; it is committed before returning.
        job_id: Id of the job to extend. If no such job exists the call is
            a silent no-op and nothing is committed.
        items: Entries shaped like ``{"filename": str, "stored_path": str}``.
            A missing key is stored as an empty string.
    """
    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if not job:
        return

    # Count the items already attached so a repeated call really appends,
    # instead of restarting seq at 1 and overwriting total with only the
    # size of the latest batch.
    existing = (
        db.query(ImportJobItem)
        .filter(ImportJobItem.job_id == job_id)
        .count()
    )

    for seq, it in enumerate(items, start=existing + 1):
        db.add(
            ImportJobItem(
                job_id=job_id,
                seq=seq,
                filename=it.get("filename", ""),
                stored_path=it.get("stored_path", ""),
                status=JOB_STATUS_QUEUED,
            )
        )

    job.total = existing + len(items)
    db.commit()
|
|
|
|
|
|
def get_job(db: Session, job_id: int) -> ImportJob | None:
    """Return the job with the given id, or ``None`` when there is none.

    NOTE(review): an earlier docstring said the job comes back "with items
    loaded". No eager-loading option is applied in this query, so whether
    items are loaded depends on how the relationship is configured on the
    model. Confirm against ``backend.models``.
    """
    by_id = db.query(ImportJob).filter(ImportJob.id == job_id)
    return by_id.first()
|
|
|
|
|
|
def list_jobs(
    db: Session,
    statuses: list[str] | None = None,
    limit: int = 50,
) -> list[ImportJob]:
    """Return up to ``limit`` jobs ordered by ``created_at`` descending.

    Args:
        db: Session to query.
        statuses: When non-empty, only jobs whose status is in this list are
            returned. ``None`` or an empty list disables the filter.
        limit: Maximum number of jobs to return.
    """
    query = db.query(ImportJob)
    if statuses:
        query = query.filter(ImportJob.status.in_(statuses))
    newest_first = query.order_by(ImportJob.created_at.desc())
    return newest_first.limit(limit).all()
|
|
|
|
|
|
def update_job(db: Session, job_id: int, **kwargs) -> ImportJob | None:
    """Apply keyword values to a job, stamp ``updated_at`` and commit.

    Keyword names that are not attributes of the job are silently ignored.
    ``updated_at`` is assigned after the keyword values, so a caller-supplied
    ``updated_at`` is overwritten with the current naive UTC time.

    Returns:
        The refreshed job, or ``None`` if no job has ``job_id``.
    """
    record = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if record:
        for field, value in kwargs.items():
            if not hasattr(record, field):
                continue
            setattr(record, field, value)
        record.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(record)
        return record
    return None
|
|
|
|
|
|
def update_job_item(db: Session, item_id: int, **kwargs) -> ImportJobItem | None:
    """Apply keyword values to one job item and commit.

    Keyword names that are not attributes of the item are silently ignored.

    Returns:
        The refreshed item, or ``None`` if no item has ``item_id``.
    """
    record = db.query(ImportJobItem).filter(ImportJobItem.id == item_id).first()
    if record:
        for field, value in kwargs.items():
            if not hasattr(record, field):
                continue
            setattr(record, field, value)
        db.commit()
        db.refresh(record)
        return record
    return None
|
|
|
|
|
|
def claim_oldest_queued(db: Session) -> ImportJob | None:
    """Mark the oldest queued job as running and return it.

    The job with the earliest ``created_at`` among queued jobs gets its
    status set to running and its ``started_at`` / ``updated_at`` set to the
    current naive UTC time, then the session is committed.

    The select and the update are separate steps with no row locking
    (kept SQLite-compatible), so this is intended for a single-worker FIFO
    loop only.

    Returns:
        The refreshed, now-running job, or ``None`` when nothing is queued.
    """
    queued_oldest_first = (
        db.query(ImportJob)
        .filter(ImportJob.status == JOB_STATUS_QUEUED)
        .order_by(ImportJob.created_at.asc())
    )
    candidate = queued_oldest_first.first()
    if not candidate:
        return None

    candidate.status = JOB_STATUS_RUNNING
    candidate.started_at = datetime.utcnow()
    candidate.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(candidate)
    return candidate
|
|
|
|
|
|
def get_queued_job_for_worker(db: Session) -> ImportJob | None:
    """Return the oldest queued job without changing or locking it.

    No lock is taken (SQLite has limited FOR UPDATE support). The caller
    must set the status to running itself, in the same or a follow-up
    transaction.

    Returns:
        The queued job with the earliest ``created_at``, or ``None`` when
        nothing is queued.
    """
    queued = db.query(ImportJob).filter(ImportJob.status == JOB_STATUS_QUEUED)
    return queued.order_by(ImportJob.created_at.asc()).first()
|
|
|
|
|
|
def cancel_job(db: Session, job_id: int) -> ImportJob | None:
    """Cancel a job that is still queued or running.

    Sets the status to ``"cancelled"`` and stamps ``ended_at`` and
    ``updated_at`` with the current naive UTC time, then commits.

    Returns:
        The refreshed job, or ``None`` when the job does not exist or is in
        any status other than queued or running.
    """
    cancellable = (JOB_STATUS_QUEUED, JOB_STATUS_RUNNING)
    target = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if target and target.status in cancellable:
        target.status = "cancelled"
        target.ended_at = datetime.utcnow()
        target.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(target)
        return target
    return None
|
|
|
|
|
|
def get_failed_items(db: Session, job_id: int) -> list[ImportJobItem]:
    """Return the failed items of a job, ordered by ascending ``seq``.

    Intended for retrying: only items whose status equals the failed status
    constant and whose ``job_id`` matches are returned.
    """
    failed = (
        db.query(ImportJobItem)
        .filter(ImportJobItem.job_id == job_id)
        .filter(ImportJobItem.status == JOB_STATUS_FAILED)
    )
    return failed.order_by(ImportJobItem.seq.asc()).all()
|