"""Report generation: Excel / Word / PDF.""" import uuid from pathlib import Path from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.models.assessment import FraudAssessment, ReviewStatus from app.models.transaction import TransactionRecord from app.models.report import ExportReport, ReportType from app.schemas.report import ReportCreate async def generate_report(case_id: UUID, body: ReportCreate, db: AsyncSession) -> ExportReport: result = await db.execute( select(ExportReport) .where(ExportReport.case_id == case_id, ExportReport.report_type == body.report_type) ) existing = list(result.scalars().all()) version = len(existing) + 1 report_dir = settings.upload_path / str(case_id) / "reports" report_dir.mkdir(parents=True, exist_ok=True) if body.report_type == ReportType.excel: file_path = await _gen_excel(case_id, report_dir, db) elif body.report_type == ReportType.word: file_path = await _gen_word(case_id, report_dir, db) else: file_path = await _gen_pdf_placeholder(case_id, report_dir) relative = str(file_path.relative_to(settings.upload_path)) # snapshot confirmed assessments snap_result = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) snapshot = [ {"amount": float(a.assessed_amount), "reason": a.reason} for a in snap_result.scalars().all() ] report = ExportReport( case_id=case_id, report_type=body.report_type, file_path=relative, version=version, content_snapshot={"assessments": snapshot}, ) db.add(report) await db.flush() await db.refresh(report) return report async def _gen_excel(case_id: UUID, report_dir: Path, db: AsyncSession) -> Path: from openpyxl import Workbook wb = Workbook() # Sheet 1: Summary ws = wb.active ws.title = "被骗金额汇总" ws.append(["交易时间", "金额(元)", "方向", "对方", "来源APP", "备注", "置信度", "认定理由"]) assessments_result = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) for a in assessments_result.scalars().all(): tx = await db.get(TransactionRecord, a.transaction_id) if tx: ws.append([ tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), float(a.assessed_amount), "支出" if tx.direction.value == "out" else "收入", tx.counterparty_name, tx.source_app.value, tx.remark, tx.confidence, a.reason[:100], ]) # Sheet 2: All transactions ws2 = wb.create_sheet("交易明细") ws2.append(["交易时间", "金额", "方向", "对方", "来源", "订单号", "是否重复", "是否中转"]) tx_result = await db.execute( select(TransactionRecord).where(TransactionRecord.case_id == case_id) ) for tx in tx_result.scalars().all(): ws2.append([ tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), float(tx.amount), tx.direction.value, tx.counterparty_name, tx.source_app.value, tx.order_no, "是" if tx.is_duplicate else "否", "是" if tx.is_transit else "否", ]) file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.xlsx" wb.save(file_path) return file_path async def _gen_word(case_id: UUID, report_dir: Path, db: AsyncSession) -> Path: from docx import Document doc = Document() doc.add_heading("受害人被骗金额汇总报告", level=1) assessments_result = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) confirmed = list(assessments_result.scalars().all()) total = sum(float(a.assessed_amount) for a in confirmed) doc.add_paragraph(f"已确认被骗金额: ¥{total:,.2f}") doc.add_paragraph(f"已确认交易笔数: {len(confirmed)}") table = doc.add_table(rows=1, cols=4) table.style = "Table Grid" hdr = table.rows[0].cells hdr[0].text = "交易时间" hdr[1].text = "金额(元)" hdr[2].text = "对方" hdr[3].text = "认定理由" for a in confirmed: tx = await db.get(TransactionRecord, a.transaction_id) row = table.add_row().cells row[0].text = tx.trade_time.strftime("%Y-%m-%d %H:%M") if tx else "" row[1].text = f"{float(a.assessed_amount):,.2f}" row[2].text = tx.counterparty_name if tx else "" row[3].text = a.reason[:80] file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.docx" doc.save(file_path) return file_path async def _gen_pdf_placeholder(case_id: UUID, report_dir: Path) -> Path: file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.pdf" file_path.write_text("PDF report placeholder – integrate weasyprint/reportlab for production.") return file_path