36 lines
1.3 KiB
Python
36 lines
1.3 KiB
Python
from sqlalchemy import select, func, or_
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.case import Case, CaseStatus
|
|
from app.repositories.base import BaseRepository
|
|
|
|
|
|
class CaseRepository(BaseRepository[Case]):
|
|
def __init__(self, session: AsyncSession):
|
|
super().__init__(Case, session)
|
|
|
|
async def list_cases(
|
|
self,
|
|
offset: int = 0,
|
|
limit: int = 50,
|
|
status: CaseStatus | None = None,
|
|
search: str | None = None,
|
|
) -> tuple[list[Case], int]:
|
|
query = select(Case).where(Case.deleted_at.is_(None))
|
|
count_query = select(func.count()).select_from(Case).where(Case.deleted_at.is_(None))
|
|
|
|
if status:
|
|
query = query.where(Case.status == status)
|
|
count_query = count_query.where(Case.status == status)
|
|
|
|
if search:
|
|
pattern = f"%{search}%"
|
|
search_filter = or_(Case.case_no.ilike(pattern), Case.title.ilike(pattern))
|
|
query = query.where(search_filter)
|
|
count_query = count_query.where(search_filter)
|
|
|
|
total = (await self.session.execute(count_query)).scalar() or 0
|
|
query = query.order_by(Case.created_at.desc()).offset(offset).limit(limit)
|
|
result = await self.session.execute(query)
|
|
return list(result.scalars().all()), total
|