diff --git a/backend/app/services/assessment_service.py b/backend/app/services/assessment_service.py index 93065a0..7a7bd0a 100644 --- a/backend/app/services/assessment_service.py +++ b/backend/app/services/assessment_service.py @@ -3,7 +3,7 @@ import logging from uuid import UUID import httpx -from sqlalchemy import select +from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings @@ -15,7 +15,14 @@ logger = logging.getLogger(__name__) async def assess_case(case_id: UUID, db: AsyncSession) -> list[FraudAssessment]: - """Run rule-based assessment on all non-duplicate transactions and generate reasons.""" + """Run rule-based assessment on all non-duplicate transactions and replace old results.""" + # Replace mode: rerun analysis for the same case should overwrite prior assessments + # instead of appending duplicated rows. + await db.execute( + delete(FraudAssessment).where(FraudAssessment.case_id == case_id) + ) + await db.flush() + result = await db.execute( select(TransactionRecord) .where(TransactionRecord.case_id == case_id) diff --git a/backend/app/services/report_service.py b/backend/app/services/report_service.py index 9b9dc5d..280b407 100644 --- a/backend/app/services/report_service.py +++ b/backend/app/services/report_service.py @@ -25,15 +25,14 @@ async def generate_report(case_id: UUID, body: ReportCreate, db: AsyncSession) - report_dir.mkdir(parents=True, exist_ok=True) if body.report_type == ReportType.excel: - file_path = await _gen_excel(case_id, report_dir, db) + file_path = await _gen_excel(case_id, report_dir, body, db) elif body.report_type == ReportType.word: - file_path = await _gen_word(case_id, report_dir, db) + file_path = await _gen_word(case_id, report_dir, body, db) else: - file_path = await _gen_pdf_placeholder(case_id, report_dir) + file_path = await _gen_pdf(case_id, report_dir, body, db) relative = str(file_path.relative_to(settings.upload_path)) - # snapshot confirmed assessments snap_result = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, @@ -58,99 +57,334 @@ async def generate_report(case_id: UUID, body: ReportCreate, db: AsyncSession) - return report -async def _gen_excel(case_id: UUID, report_dir: Path, db: AsyncSession) -> Path: - from openpyxl import Workbook +# ── helpers ── - wb = Workbook() - - # Sheet 1: Summary - ws = wb.active - ws.title = "被骗金额汇总" - ws.append(["交易时间", "金额(元)", "方向", "对方", "来源APP", "备注", "置信度", "认定理由"]) - - assessments_result = await db.execute( +async def _get_confirmed(case_id: UUID, db: AsyncSession) -> list[FraudAssessment]: + r = await db.execute( select(FraudAssessment).where( FraudAssessment.case_id == case_id, FraudAssessment.review_status == ReviewStatus.confirmed, ) ) - for a in assessments_result.scalars().all(): - tx = await db.get(TransactionRecord, a.transaction_id) - if tx: - ws.append([ - tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), - float(a.assessed_amount), - "支出" if tx.direction.value == "out" else "收入", - tx.counterparty_name, - tx.source_app.value, - tx.remark, - tx.confidence, - a.reason[:100], - ]) + return list(r.scalars().all()) - # Sheet 2: All transactions - ws2 = wb.create_sheet("交易明细") - ws2.append(["交易时间", "金额", "方向", "对方", "来源", "订单号", "是否重复", "是否中转"]) - tx_result = await db.execute( + +async def _get_all_assessments(case_id: UUID, db: AsyncSession) -> dict: + r = await db.execute(select(FraudAssessment).where(FraudAssessment.case_id == case_id)) + return {a.transaction_id: a for a in r.scalars().all()} + + +async def _get_all_transactions(case_id: UUID, db: AsyncSession) -> list[TransactionRecord]: + r = await db.execute( select(TransactionRecord).where(TransactionRecord.case_id == case_id) ) - for tx in tx_result.scalars().all(): + return list(r.scalars().all()) + + +async def _get_inquiry_suggestions(case_id: UUID, db: AsyncSession) -> list[str]: + from app.services.assessment_service import generate_inquiry_suggestions + return await generate_inquiry_suggestions(case_id, db) + + +# ── Excel ── + +async def _gen_excel(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path: + from openpyxl import Workbook + + wb = Workbook() + sheet_created = False + + if body.include_summary: + ws = wb.active if not sheet_created else wb.create_sheet() + ws.title = "被骗金额汇总" + ws.append(["交易时间", "金额(元)", "方向", "对方", "来源APP", "备注", "置信度", "认定理由"]) + for a in await _get_confirmed(case_id, db): + tx = await db.get(TransactionRecord, a.transaction_id) + if tx: + ws.append([ + tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), + float(a.assessed_amount), + "支出" if tx.direction.value == "out" else "收入", + tx.counterparty_name, + tx.source_app.value, + tx.remark, + tx.confidence, + a.reason[:100], + ]) + sheet_created = True + + if body.include_transactions: + ws2 = wb.active if not sheet_created else wb.create_sheet() + if not sheet_created: + ws2.title = "交易明细" + else: + ws2.title = "交易明细" ws2.append([ - tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), - float(tx.amount), - tx.direction.value, - tx.counterparty_name, - tx.source_app.value, - tx.order_no, - "是" if tx.is_duplicate else "否", - "是" if tx.is_transit else "否", + "交易ID", "证据图片ID", "交易时间", "金额(元)", "方向", "来源APP", + "对方名称", "对方账号", "本方账户尾号", "订单号", "备注", "识别置信度", + "是否重复", "是否中转", "聚类ID", "记录创建时间", + "认定置信等级", "认定金额(元)", "认定理由", "排除条件", + "复核状态", "复核备注", "复核人", "复核时间", ]) + assessment_by_tx = await _get_all_assessments(case_id, db) + for tx in await _get_all_transactions(case_id, db): + fa = assessment_by_tx.get(tx.id) + ws2.append([ + str(tx.id), + str(tx.evidence_image_id) if tx.evidence_image_id else "", + tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), + float(tx.amount), + tx.direction.value, + tx.source_app.value, + tx.counterparty_name, + tx.counterparty_account, + tx.self_account_tail_no, + tx.order_no, + tx.remark, + float(tx.confidence), + "是" if tx.is_duplicate else "否", + "是" if tx.is_transit else "否", + str(tx.cluster_id) if tx.cluster_id else "", + tx.created_at.strftime("%Y-%m-%d %H:%M:%S") if tx.created_at else "", + fa.confidence_level.value if fa else "", + float(fa.assessed_amount) if fa else "", + fa.reason if fa else "", + fa.exclude_reason if fa else "", + fa.review_status.value if fa else "", + fa.review_note if fa else "", + fa.reviewed_by if fa else "", + fa.reviewed_at.strftime("%Y-%m-%d %H:%M:%S") if (fa and fa.reviewed_at) else "", + ]) + sheet_created = True + + if body.include_reasons: + ws3 = wb.active if not sheet_created else wb.create_sheet() + ws3.title = "认定理由与排除说明" + ws3.append(["交易时间", "金额(元)", "置信等级", "认定理由", "排除条件", "复核状态"]) + assessment_by_tx = await _get_all_assessments(case_id, db) + for tx in await _get_all_transactions(case_id, db): + fa = assessment_by_tx.get(tx.id) + if not fa: + continue + ws3.append([ + tx.trade_time.strftime("%Y-%m-%d %H:%M:%S"), + float(tx.amount), + fa.confidence_level.value, + fa.reason, + fa.exclude_reason, + fa.review_status.value, + ]) + sheet_created = True + + if body.include_inquiry: + ws4 = wb.active if not sheet_created else wb.create_sheet() + ws4.title = "笔录辅助问询建议" + ws4.append(["序号", "问询建议"]) + suggestions = await _get_inquiry_suggestions(case_id, db) + for i, s in enumerate(suggestions, 1): + ws4.append([i, s]) + sheet_created = True + + if not sheet_created: + ws = wb.active + ws.title = "报告" + ws.append(["未选择任何报告内容"]) file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.xlsx" wb.save(file_path) return file_path -async def _gen_word(case_id: UUID, report_dir: Path, db: AsyncSession) -> Path: +# ── Word ── + +async def _gen_word(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path: from docx import Document doc = Document() doc.add_heading("受害人被骗金额汇总报告", level=1) - assessments_result = await db.execute( - select(FraudAssessment).where( - FraudAssessment.case_id == case_id, - FraudAssessment.review_status == ReviewStatus.confirmed, - ) - ) - confirmed = list(assessments_result.scalars().all()) - total = sum(float(a.assessed_amount) for a in confirmed) + if body.include_summary: + confirmed = await _get_confirmed(case_id, db) + total = sum(float(a.assessed_amount) for a in confirmed) + doc.add_paragraph(f"已确认被骗金额: ¥{total:,.2f}") + doc.add_paragraph(f"已确认交易笔数: {len(confirmed)}") + table = doc.add_table(rows=1, cols=4) + table.style = "Table Grid" + hdr = table.rows[0].cells + hdr[0].text, hdr[1].text, hdr[2].text, hdr[3].text = "交易时间", "金额(元)", "对方", "认定理由" + for a in confirmed: + tx = await db.get(TransactionRecord, a.transaction_id) + row = table.add_row().cells + row[0].text = tx.trade_time.strftime("%Y-%m-%d %H:%M") if tx else "" + row[1].text = f"{float(a.assessed_amount):,.2f}" + row[2].text = tx.counterparty_name if tx else "" + row[3].text = a.reason[:80] - doc.add_paragraph(f"已确认被骗金额: ¥{total:,.2f}") - doc.add_paragraph(f"已确认交易笔数: {len(confirmed)}") + if body.include_transactions: + doc.add_heading("交易明细", level=2) + table2 = doc.add_table(rows=1, cols=6) + table2.style = "Table Grid" + for i, h in enumerate(["交易时间", "金额", "方向", "对方", "订单号", "备注"]): + table2.rows[0].cells[i].text = h + for tx in await _get_all_transactions(case_id, db): + row = table2.add_row().cells + row[0].text = tx.trade_time.strftime("%Y-%m-%d %H:%M") + row[1].text = f"{float(tx.amount):,.2f}" + row[2].text = tx.direction.value + row[3].text = tx.counterparty_name[:20] + row[4].text = tx.order_no[:20] + row[5].text = tx.remark[:30] - table = doc.add_table(rows=1, cols=4) - table.style = "Table Grid" - hdr = table.rows[0].cells - hdr[0].text = "交易时间" - hdr[1].text = "金额(元)" - hdr[2].text = "对方" - hdr[3].text = "认定理由" + if body.include_reasons: + doc.add_heading("认定理由与排除说明", level=2) + assessment_by_tx = await _get_all_assessments(case_id, db) + for tx in await _get_all_transactions(case_id, db): + fa = assessment_by_tx.get(tx.id) + if not fa: + continue + doc.add_paragraph( + f"[{tx.trade_time.strftime('%Y-%m-%d %H:%M')}] ¥{float(tx.amount):,.2f} " + f"— {fa.confidence_level.value} — {fa.reason[:100]}" + ) + if fa.exclude_reason: + doc.add_paragraph(f" 排除条件: {fa.exclude_reason[:100]}") - for a in confirmed: - tx = await db.get(TransactionRecord, a.transaction_id) - row = table.add_row().cells - row[0].text = tx.trade_time.strftime("%Y-%m-%d %H:%M") if tx else "" - row[1].text = f"{float(a.assessed_amount):,.2f}" - row[2].text = tx.counterparty_name if tx else "" - row[3].text = a.reason[:80] + if body.include_inquiry: + doc.add_heading("笔录辅助问询建议", level=2) + suggestions = await _get_inquiry_suggestions(case_id, db) + for i, s in enumerate(suggestions, 1): + doc.add_paragraph(f"{i}. {s}") file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.docx" doc.save(file_path) return file_path -async def _gen_pdf_placeholder(case_id: UUID, report_dir: Path) -> Path: +# ── PDF ── + +async def _gen_pdf(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path: + from reportlab.lib.pagesizes import A4 + from reportlab.lib import colors + from reportlab.lib.units import mm + from reportlab.platypus import SimpleDocTemplate, Table as RLTable, TableStyle, Paragraph, Spacer + from reportlab.lib.styles import getSampleStyleSheet + from reportlab.pdfbase import pdfmetrics + from reportlab.pdfbase.ttfonts import TTFont + import os + + font_registered = False + for font_path in [ + "/System/Library/Fonts/PingFang.ttc", + "/System/Library/Fonts/STHeiti Medium.ttc", + "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", + ]: + if os.path.exists(font_path): + try: + pdfmetrics.registerFont(TTFont("ChineseFont", font_path, subfontIndex=0)) + font_registered = True + break + except Exception: + continue + + font_name = "ChineseFont" if font_registered else "Helvetica" + styles = getSampleStyleSheet() + title_style = styles["Title"] + title_style.fontName = font_name + normal_style = styles["Normal"] + normal_style.fontName = font_name + h2_style = styles["Heading2"] + h2_style.fontName = font_name + file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.pdf" - file_path.write_text("PDF report placeholder – integrate weasyprint/reportlab for production.") + doc = SimpleDocTemplate(str(file_path), pagesize=A4, + leftMargin=15 * mm, rightMargin=15 * mm, + topMargin=20 * mm, bottomMargin=20 * mm) + elements = [] + elements.append(Paragraph("受害人被骗金额汇总报告", title_style)) + elements.append(Spacer(1, 10 * mm)) + + def _make_table(header, rows, col_widths): + data = [header] + rows + tbl = RLTable(data, colWidths=col_widths, repeatRows=1) + tbl.setStyle(TableStyle([ + ("FONTNAME", (0, 0), (-1, -1), font_name), + ("FONTSIZE", (0, 0), (-1, -1), 8), + ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1677ff")), + ("TEXTCOLOR", (0, 0), (-1, 0), colors.white), + ("GRID", (0, 0), (-1, -1), 0.5, colors.grey), + ("VALIGN", (0, 0), (-1, -1), "TOP"), + ("TOPPADDING", (0, 0), (-1, -1), 3), + ("BOTTOMPADDING", (0, 0), (-1, -1), 3), + ])) + return tbl + + if body.include_summary: + confirmed = await _get_confirmed(case_id, db) + total = sum(float(a.assessed_amount) for a in confirmed) + elements.append(Paragraph(f"已确认被骗金额: ¥{total:,.2f}", normal_style)) + elements.append(Paragraph(f"已确认交易笔数: {len(confirmed)}", normal_style)) + elements.append(Spacer(1, 6 * mm)) + header = ["交易时间", "金额(元)", "对方", "认定理由"] + rows = [] + for a in confirmed: + tx = await db.get(TransactionRecord, a.transaction_id) + rows.append([ + tx.trade_time.strftime("%Y-%m-%d %H:%M") if tx else "", + f"{float(a.assessed_amount):,.2f}", + (tx.counterparty_name if tx else "")[:20], + Paragraph(a.reason[:120], normal_style), + ]) + if rows: + elements.append(_make_table(header, rows, [38*mm, 25*mm, 35*mm, 72*mm])) + else: + elements.append(Paragraph("暂无已确认交易记录。", normal_style)) + elements.append(Spacer(1, 8 * mm)) + + if body.include_transactions: + elements.append(Paragraph("交易明细", h2_style)) + elements.append(Spacer(1, 4 * mm)) + header = ["交易时间", "金额", "方向", "对方", "订单号"] + rows = [] + for tx in await _get_all_transactions(case_id, db): + rows.append([ + tx.trade_time.strftime("%Y-%m-%d %H:%M"), + f"{float(tx.amount):,.2f}", + "出" if tx.direction.value == "out" else "入", + tx.counterparty_name[:16], + tx.order_no[:16], + ]) + if rows: + elements.append(_make_table(header, rows, [38*mm, 22*mm, 12*mm, 48*mm, 48*mm])) + elements.append(Spacer(1, 8 * mm)) + + if body.include_reasons: + elements.append(Paragraph("认定理由与排除说明", h2_style)) + elements.append(Spacer(1, 4 * mm)) + assessment_by_tx = await _get_all_assessments(case_id, db) + for tx in await _get_all_transactions(case_id, db): + fa = assessment_by_tx.get(tx.id) + if not fa: + continue + line = ( + f"[{tx.trade_time.strftime('%m-%d %H:%M')}] ¥{float(tx.amount):,.2f} " + f"| {fa.confidence_level.value} | {fa.reason[:80]}" + ) + elements.append(Paragraph(line, normal_style)) + if fa.exclude_reason: + elements.append(Paragraph(f" 排除: {fa.exclude_reason[:80]}", normal_style)) + elements.append(Spacer(1, 8 * mm)) + + if body.include_inquiry: + elements.append(Paragraph("笔录辅助问询建议", h2_style)) + elements.append(Spacer(1, 4 * mm)) + suggestions = await _get_inquiry_suggestions(case_id, db) + for i, s in enumerate(suggestions, 1): + elements.append(Paragraph(f"{i}. {s}", normal_style)) + elements.append(Spacer(1, 8 * mm)) + + if not elements or len(elements) <= 2: + elements.append(Paragraph("未选择任何报告内容。", normal_style)) + + doc.build(elements) return file_path