"""Report generation: Excel / PDF.""" import uuid from pathlib import Path from uuid import UUID from datetime import datetime from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.models.assessment import FraudAssessment, ReviewStatus from app.models.case import Case from app.models.transaction import TransactionRecord from app.models.report import ExportReport, ReportType from app.schemas.report import ReportCreate from app.services.flow_service import build_flow_graph async def generate_report(case_id: UUID, body: ReportCreate, db: AsyncSession) -> ExportReport: result = await db.execute( select(ExportReport) .where(ExportReport.case_id == case_id, ExportReport.report_type == body.report_type) ) existing = list(result.scalars().all()) version = len(existing) + 1 report_dir = settings.upload_path / str(case_id) / "reports" report_dir.mkdir(parents=True, exist_ok=True) if body.report_type == ReportType.excel: file_path = await _gen_excel(case_id, report_dir, body, db) elif body.report_type == ReportType.pdf: file_path = await _gen_pdf(case_id, report_dir, body, db) else: raise ValueError(f"Unsupported report_type: {body.report_type}") relative = str(file_path.relative_to(settings.upload_path)) snap_result = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) snapshot = [ {"amount": float(a.assessed_amount), "reason": a.reason} for a in snap_result.scalars().all() ] report = ExportReport( case_id=case_id, report_type=body.report_type, file_path=relative, version=version, content_snapshot={"assessments": snapshot}, ) db.add(report) await db.flush() await db.refresh(report) return report # ── helpers ── async def _get_confirmed(case_id: UUID, db: AsyncSession) -> list[FraudAssessment]: r = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) return list(r.scalars().all()) async def _get_all_assessments(case_id: UUID, db: AsyncSession) -> dict: r = await db.execute(select(FraudAssessment).where(FraudAssessment.case_id == case_id)) return {a.transaction_id: a for a in r.scalars().all()} async def _get_all_transactions(case_id: UUID, db: AsyncSession) -> list[TransactionRecord]: r = await db.execute( select(TransactionRecord).where(TransactionRecord.case_id == case_id) ) return list(r.scalars().all()) async def _get_inquiry_suggestions(case_id: UUID, db: AsyncSession) -> list[str]: from app.services.assessment_service import generate_inquiry_suggestions return await generate_inquiry_suggestions(case_id, db) def _fmt_dt(value: datetime | None, with_seconds: bool = False) -> str: if not value: return "-" return value.strftime("%Y-%m-%d %H:%M:%S" if with_seconds else "%Y-%m-%d %H:%M") def _mask_text(value: str, keep: int = 12) -> str: text = (value or "").strip() if len(text) <= keep: return text return f"{text[:keep]}..." def _build_record_qa_draft(transactions: list[TransactionRecord], assessment_by_tx: dict) -> str: candidates = [tx for tx in transactions if not tx.is_duplicate] if not candidates: return "问:具体的转账信息?\n答:目前暂无可确认的转账信息。" candidates = sorted(candidates, key=lambda x: x.trade_time or datetime.min) lines: list[str] = [] for idx, tx in enumerate(candidates, 1): fa = assessment_by_tx.get(tx.id) status = fa.review_status.value if fa else "pending" cp_account = f",对方账号{tx.counterparty_account}" if tx.counterparty_account else "" order_no = f",订单号{tx.order_no}" if tx.order_no else "" remark = f",备注“{tx.remark}”" if tx.remark else "" lines.append( f"第{idx}笔是{_fmt_dt(tx.trade_time, with_seconds=True)}通过{tx.source_app.value}" f"{'转出' if tx.direction.value == 'out' else '转入'}人民币{float(tx.amount):,.2f}元至{tx.counterparty_name}" f"{cp_account}{order_no}{remark},当前认定状态为{status}。" ) return "问:具体的转账信息?\n答:" + "".join(lines) async def _build_report_dataset(case_id: UUID, db: AsyncSession) -> dict: case_obj = await db.get(Case, case_id) transactions = await _get_all_transactions(case_id, db) transactions_sorted = sorted(transactions, key=lambda x: x.trade_time or datetime.min) assessment_by_tx = await _get_all_assessments(case_id, db) all_assessments = list(assessment_by_tx.values()) confirmed = [a for a in all_assessments if a.review_status == ReviewStatus.confirmed and float(a.assessed_amount) > 0] pending = [a for a in all_assessments if a.review_status in {ReviewStatus.pending, ReviewStatus.needs_info}] rejected = [a for a in all_assessments if a.review_status == ReviewStatus.rejected] confirmed_total = sum(float(a.assessed_amount) for a in confirmed) pending_total = sum(float(a.assessed_amount) for a in pending if float(a.assessed_amount) > 0) rejected_total = sum(float(a.assessed_amount) for a in rejected if float(a.assessed_amount) > 0) valid_for_timeline = [tx for tx in transactions_sorted if not tx.is_duplicate] start_time = valid_for_timeline[0].trade_time if valid_for_timeline else None end_time = valid_for_timeline[-1].trade_time if valid_for_timeline else None level_count = {"high": 0, "medium": 0, "low": 0} for a in all_assessments: key = a.confidence_level.value if key in level_count: level_count[key] += 1 flow_graph = await build_flow_graph(case_id, db) return { "case": case_obj, "transactions": transactions_sorted, "assessment_by_tx": assessment_by_tx, "confirmed": confirmed, "pending": pending, "rejected": rejected, "confirmed_total": confirmed_total, "pending_total": pending_total, "rejected_total": rejected_total, "level_count": level_count, "start_time": start_time, "end_time": end_time, "flow_graph": flow_graph, } # ── Excel ── async def _gen_excel(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path: from openpyxl import Workbook wb = Workbook() sheet_created = False if body.include_summary: ws = wb.active if not sheet_created else wb.create_sheet() ws.title = "被骗金额汇总" ws.append(["交易时间", "金额(元)", "方向", "对方", "来源APP", "备注", "置信度", "认定理由"]) for a in await _get_confirmed(case_id, db): tx = await db.get(TransactionRecord, a.transaction_id) if tx: ws.append([ tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), float(a.assessed_amount), "支出" if tx.direction.value == "out" else "收入", tx.counterparty_name, tx.source_app.value, tx.remark, tx.confidence, a.reason[:100], ]) sheet_created = True if body.include_transactions: ws2 = wb.active if not sheet_created else wb.create_sheet() if not sheet_created: ws2.title = "交易明细" else: ws2.title = "交易明细" ws2.append([ "交易ID", "证据图片ID", "交易时间", "金额(元)", "方向", "来源APP", "对方名称", "对方账号", "本方账户尾号", "订单号", "备注", "识别置信度", "是否重复", "是否中转", "聚类ID", "记录创建时间", "认定置信等级", "认定金额(元)", "认定理由", "排除条件", "复核状态", "复核备注", "复核人", "复核时间", ]) assessment_by_tx = await _get_all_assessments(case_id, db) for tx in await _get_all_transactions(case_id, db): fa = assessment_by_tx.get(tx.id) ws2.append([ str(tx.id), str(tx.evidence_image_id) if tx.evidence_image_id else "", tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), float(tx.amount), tx.direction.value, tx.source_app.value, tx.counterparty_name, tx.counterparty_account, tx.self_account_tail_no, tx.order_no, tx.remark, float(tx.confidence), "是" if tx.is_duplicate else "否", "是" if tx.is_transit else "否", str(tx.cluster_id) if tx.cluster_id else "", tx.created_at.strftime("%Y-%m-%d %H:%M:%S") if tx.created_at else "", fa.confidence_level.value if fa else "", float(fa.assessed_amount) if fa else "", fa.reason if fa else "", fa.exclude_reason if fa else "", fa.review_status.value if fa else "", fa.review_note if fa else "", fa.reviewed_by if fa else "", fa.reviewed_at.strftime("%Y-%m-%d %H:%M:%S") if (fa and fa.reviewed_at) else "", ]) sheet_created = True if body.include_reasons: ws3 = wb.active if not sheet_created else wb.create_sheet() ws3.title = "认定理由与排除说明" ws3.append(["交易时间", "金额(元)", "置信等级", "认定理由", "排除条件", "复核状态"]) assessment_by_tx = await _get_all_assessments(case_id, db) for tx in await _get_all_transactions(case_id, db): fa = assessment_by_tx.get(tx.id) if not fa: continue ws3.append([ tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), float(tx.amount), fa.confidence_level.value, fa.reason, fa.exclude_reason, fa.review_status.value, ]) sheet_created = True if body.include_inquiry: ws4 = wb.active if not sheet_created else wb.create_sheet() ws4.title = "笔录辅助问询建议" ws4.append(["序号", "问询建议"]) suggestions = await _get_inquiry_suggestions(case_id, db) for i, s in enumerate(suggestions, 1): ws4.append([i, s]) sheet_created = True if body.include_record_qa: ws5 = wb.active if not sheet_created else wb.create_sheet() ws5.title = "笔录问答草稿" ws5.append(["问答内容"]) dataset = await _build_report_dataset(case_id, db) ws5.append([_build_record_qa_draft(dataset["transactions"], dataset["assessment_by_tx"])]) sheet_created = True if not sheet_created: ws = wb.active ws.title = "报告" ws.append(["未选择任何报告内容"]) file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.xlsx" wb.save(file_path) return file_path # ── PDF ── async def _gen_pdf(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path: from reportlab.lib.pagesizes import A4 from reportlab.lib import colors from reportlab.lib.units import mm from reportlab.platypus import SimpleDocTemplate, Table as RLTable, TableStyle, Paragraph, Spacer from reportlab.lib.styles import getSampleStyleSheet from reportlab.pdfbase import pdfmetrics from reportlab.pdfbase.ttfonts import TTFont from reportlab.pdfbase.cidfonts import UnicodeCIDFont import os font_registered = False for font_path in [ "/System/Library/Fonts/PingFang.ttc", "/System/Library/Fonts/STHeiti Medium.ttc", "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", ]: if os.path.exists(font_path): try: pdfmetrics.registerFont(TTFont("ChineseFont", font_path, subfontIndex=0)) font_registered = True break except Exception: continue if not font_registered: # Docker images often do not ship Chinese system fonts. # Use built-in CID font as a portable fallback. try: pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light")) font_name = "STSong-Light" font_registered = True except Exception: font_name = "Helvetica" else: font_name = "ChineseFont" styles = getSampleStyleSheet() title_style = styles["Title"] title_style.fontName = font_name normal_style = styles["Normal"] normal_style.fontName = font_name h2_style = styles["Heading2"] h2_style.fontName = font_name file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.pdf" doc = SimpleDocTemplate(str(file_path), pagesize=A4, leftMargin=15 * mm, rightMargin=15 * mm, topMargin=20 * mm, bottomMargin=20 * mm) elements = [] dataset = await _build_report_dataset(case_id, db) case_obj = dataset["case"] transactions = dataset["transactions"] tx_by_id = {tx.id: tx for tx in transactions} assessment_by_tx = dataset["assessment_by_tx"] confirmed = dataset["confirmed"] pending = dataset["pending"] rejected = dataset["rejected"] flow_graph = dataset["flow_graph"] level_count = dataset["level_count"] def _make_table(header, rows, col_widths): data = [header] + rows tbl = RLTable(data, colWidths=col_widths, repeatRows=1) tbl.setStyle(TableStyle([ ("FONTNAME", (0, 0), (-1, -1), font_name), ("FONTSIZE", (0, 0), (-1, -1), 8), ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1677ff")), ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), ("GRID", (0, 0), (-1, -1), 0.5, colors.grey), ("VALIGN", (0, 0), (-1, -1), "TOP"), ("TOPPADDING", (0, 0), (-1, -1), 3), ("BOTTOMPADDING", (0, 0), (-1, -1), 3), ])) return tbl # Section 1: cover and case meta elements.append(Paragraph("受害人被骗金额归集与认定报告", title_style)) elements.append(Spacer(1, 4 * mm)) elements.append(Paragraph(f"案件编号:{case_obj.case_no if case_obj else '-'}", normal_style)) elements.append(Paragraph(f"案件名称:{case_obj.title if case_obj else '-'}", normal_style)) elements.append(Paragraph(f"受害人:{case_obj.victim_name if case_obj else '-'} 承办人:{case_obj.handler if case_obj else '-'}", normal_style)) elements.append(Paragraph(f"报告版本:v{len((await db.execute(select(ExportReport).where(ExportReport.case_id == case_id, ExportReport.report_type == ReportType.pdf))).scalars().all()) + 1}", normal_style)) elements.append(Paragraph(f"生成时间:{_fmt_dt(datetime.now(), with_seconds=True)}", normal_style)) elements.append(Spacer(1, 5 * mm)) # Section 2: scope and statistical boundaries elements.append(Paragraph("统计口径与数据范围", h2_style)) elements.append(Paragraph( f"证据截图数:{case_obj.image_count if case_obj else 0} 张;交易记录数:{len(transactions)} 笔;统计时间范围:{_fmt_dt(dataset['start_time'])} 至 {_fmt_dt(dataset['end_time'])}。", normal_style, )) elements.append(Paragraph("纳入口径:复核状态为“已确认”且认定金额大于 0 的交易。", normal_style)) elements.append(Paragraph("排除口径:复核状态为“已排除”或规则判定为中转/收入等不应计损交易。", normal_style)) elements.append(Paragraph("待复核口径:状态为“待复核/需补充”的交易,暂不纳入最终被骗金额。", normal_style)) elements.append(Paragraph("说明:PDF 报告不包含原始截图附件,仅在明细中保留证据索引字段。", normal_style)) elements.append(Spacer(1, 5 * mm)) # Section 3: executive summary if body.include_summary: elements.append(Paragraph("核心结论摘要", h2_style)) elements.append(Paragraph(f"已确认被骗金额:¥{dataset['confirmed_total']:,.2f}", normal_style)) elements.append(Paragraph(f"待复核金额:¥{dataset['pending_total']:,.2f}", normal_style)) elements.append(Paragraph(f"已排除金额:¥{dataset['rejected_total']:,.2f}", normal_style)) elements.append( Paragraph( f"已确认 {len(confirmed)} 笔,待复核 {len(pending)} 笔,已排除 {len(rejected)} 笔;置信分布:高 {level_count['high']} / 中 {level_count['medium']} / 低 {level_count['low']}", normal_style, ) ) summary_header = ["交易时间", "认定金额(元)", "对方", "认定理由"] summary_rows = [] for a in confirmed[:30]: tx = tx_by_id.get(a.transaction_id) summary_rows.append([ _fmt_dt(tx.trade_time if tx else None), f"{float(a.assessed_amount):,.2f}", _mask_text(tx.counterparty_name if tx else "", 14), Paragraph(_mask_text(a.reason, 80), normal_style), ]) if summary_rows: elements.append(_make_table(summary_header, summary_rows, [36 * mm, 28 * mm, 34 * mm, 72 * mm])) else: elements.append(Paragraph("暂无已确认被骗交易。", normal_style)) elements.append(Spacer(1, 6 * mm)) # Section 4: flow summary (textual) if body.include_flow_chart: elements.append(Paragraph("资金路径分析", h2_style)) elements.append( Paragraph( f"节点数:{len(flow_graph.nodes)},路径边数:{len(flow_graph.edges)}。以下列出关键资金流向(按金额降序,最多10条)。", normal_style, ) ) node_label_map = {n.id: n.label for n in flow_graph.nodes} sorted_edges = sorted(flow_graph.edges, key=lambda e: float(e.amount), reverse=True)[:10] edge_rows = [ [ _mask_text(node_label_map.get(e.source, e.source), 12), _mask_text(node_label_map.get(e.target, e.target), 12), f"{float(e.amount):,.2f}", str(e.count), e.trade_time or "-", ] for e in sorted_edges ] if edge_rows: elements.append(_make_table(["来源节点", "目标节点", "累计金额", "交易次数", "首笔时间"], edge_rows, [34 * mm, 34 * mm, 28 * mm, 20 * mm, 54 * mm])) else: elements.append(Paragraph("暂无可用的资金路径数据。", normal_style)) elements.append(Spacer(1, 6 * mm)) # Section 5: timeline if body.include_timeline: elements.append(Paragraph("交易时间轴", h2_style)) timeline_rows = [] for tx in [x for x in transactions if not x.is_duplicate][:80]: fa = assessment_by_tx.get(tx.id) status = fa.review_status.value if fa else "pending" timeline_rows.append([ _fmt_dt(tx.trade_time), f"{float(tx.amount):,.2f}", "支出" if tx.direction.value == "out" else "收入", _mask_text(tx.counterparty_name, 14), tx.source_app.value, status, ]) if timeline_rows: elements.append(_make_table(["时间", "金额(元)", "方向", "对方", "来源APP", "状态"], timeline_rows, [34 * mm, 24 * mm, 15 * mm, 34 * mm, 22 * mm, 28 * mm])) else: elements.append(Paragraph("暂无可生成时间轴的交易记录。", normal_style)) elements.append(Spacer(1, 6 * mm)) # Section 6: detailed transaction table if body.include_transactions: elements.append(Paragraph("重点交易明细(含证据索引)", h2_style)) elements.append(Spacer(1, 4 * mm)) header = ["交易时间", "金额", "方向", "来源APP", "对方", "证据索引", "状态"] rows = [] for tx in transactions[:120]: fa = assessment_by_tx.get(tx.id) rows.append([ _fmt_dt(tx.trade_time), f"{float(tx.amount):,.2f}", "出" if tx.direction.value == "out" else "入", tx.source_app.value, _mask_text(tx.counterparty_name, 12), str(tx.evidence_image_id)[:12] if tx.evidence_image_id else "-", fa.review_status.value if fa else "pending", ]) if rows: elements.append(_make_table(header, rows, [31 * mm, 19 * mm, 12 * mm, 20 * mm, 30 * mm, 35 * mm, 24 * mm])) elements.append(Spacer(1, 8 * mm)) # Section 7: reasons and exclusion if body.include_reasons: elements.append(Paragraph("认定理由与排除说明", h2_style)) elements.append(Spacer(1, 4 * mm)) for tx in transactions: fa = assessment_by_tx.get(tx.id) if not fa: continue line = ( f"[{_fmt_dt(tx.trade_time)}] ¥{float(tx.amount):,.2f} " f"| 置信:{fa.confidence_level.value} | 状态:{fa.review_status.value} | 理由:{_mask_text(fa.reason, 80)}" ) elements.append(Paragraph(line, normal_style)) if fa.exclude_reason: elements.append(Paragraph(f" 排除说明: {_mask_text(fa.exclude_reason, 100)}", normal_style)) if fa.review_status in {ReviewStatus.pending, ReviewStatus.needs_info}: elements.append(Paragraph(" 待补证提示: 建议补充问询与对账凭证后再作最终认定。", normal_style)) elements.append(Spacer(1, 8 * mm)) # Section 8: inquiry suggestions if body.include_inquiry: elements.append(Paragraph("待复核事项与笔录辅助问询建议", h2_style)) elements.append(Spacer(1, 4 * mm)) if pending: elements.append(Paragraph(f"当前待复核交易共 {len(pending)} 笔,涉及待复核金额约 ¥{dataset['pending_total']:,.2f}。", normal_style)) suggestions = await _get_inquiry_suggestions(case_id, db) for i, s in enumerate(suggestions, 1): elements.append(Paragraph(f"{i}. {s}", normal_style)) elements.append(Spacer(1, 8 * mm)) # Section 9: record QA draft if body.include_record_qa: elements.append(Paragraph("笔录问答草稿", h2_style)) elements.append(Spacer(1, 4 * mm)) qa_text = _build_record_qa_draft(transactions, assessment_by_tx) for line in qa_text.split("\n"): elements.append(Paragraph(line, normal_style)) elements.append(Spacer(1, 8 * mm)) if not elements or len(elements) <= 2: elements.append(Paragraph("未选择任何报告内容。", normal_style)) doc.build(elements) return file_path