391 lines
16 KiB
Python
391 lines
16 KiB
Python
"""Report generation: Excel / Word / PDF."""
|
|
import uuid
|
|
from pathlib import Path
|
|
from uuid import UUID
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.config import settings
|
|
from app.models.assessment import FraudAssessment, ReviewStatus
|
|
from app.models.transaction import TransactionRecord
|
|
from app.models.report import ExportReport, ReportType
|
|
from app.schemas.report import ReportCreate
|
|
|
|
|
|
async def generate_report(case_id: UUID, body: ReportCreate, db: AsyncSession) -> ExportReport:
    """Render a report file for a case and record it as an ``ExportReport`` row.

    The file is written under ``settings.upload_path / <case_id> / "reports"``
    by the generator matching ``body.report_type``; any type other than
    ``ReportType.excel`` or ``ReportType.word`` is rendered as PDF.

    Args:
        case_id: Case the report is generated for.
        body: Requested report type plus the section flags read by the generators.
        db: Session used for all queries and for persisting the new row.

    Returns:
        The flushed and refreshed ``ExportReport`` instance. This function does
        not commit the session.
    """
    # Next version = highest stored version + 1 for this case and report type.
    # Counting rows instead would hand out an already-used number as soon as
    # an older report row has been deleted.
    result = await db.execute(
        select(ExportReport.version).where(
            ExportReport.case_id == case_id,
            ExportReport.report_type == body.report_type,
        )
    )
    version = max((v or 0 for v in result.scalars().all()), default=0) + 1

    report_dir = settings.upload_path / str(case_id) / "reports"
    report_dir.mkdir(parents=True, exist_ok=True)

    if body.report_type == ReportType.excel:
        file_path = await _gen_excel(case_id, report_dir, body, db)
    elif body.report_type == ReportType.word:
        file_path = await _gen_word(case_id, report_dir, body, db)
    else:
        file_path = await _gen_pdf(case_id, report_dir, body, db)

    # Store the path relative to the upload root rather than as an absolute path.
    relative = str(file_path.relative_to(settings.upload_path))

    # Snapshot of the confirmed assessments, taken through the shared helper so
    # the filter stays identical to the one the generators use.
    confirmed = await _get_confirmed(case_id, db)
    snapshot = [
        {"amount": float(a.assessed_amount), "reason": a.reason}
        for a in confirmed
    ]

    report = ExportReport(
        case_id=case_id,
        report_type=body.report_type,
        file_path=relative,
        version=version,
        content_snapshot={"assessments": snapshot},
    )
    db.add(report)
    await db.flush()
    await db.refresh(report)
    return report
|
|
|
|
|
|
# ── helpers ──
|
|
|
|
async def _get_confirmed(case_id: UUID, db: AsyncSession) -> list[FraudAssessment]:
    """Return the case's fraud assessments whose review status is ``confirmed``."""
    stmt = (
        select(FraudAssessment)
        .where(FraudAssessment.case_id == case_id)
        .where(FraudAssessment.review_status == ReviewStatus.confirmed)
    )
    rows = (await db.execute(stmt)).scalars()
    return list(rows.all())
|
|
|
|
|
|
async def _get_all_assessments(case_id: UUID, db: AsyncSession) -> dict:
    """Return every fraud assessment of the case, keyed by its ``transaction_id``.

    If several assessments share a transaction id, the last one returned by
    the query is the one kept in the mapping.
    """
    stmt = select(FraudAssessment).where(FraudAssessment.case_id == case_id)
    by_transaction = {}
    for assessment in (await db.execute(stmt)).scalars().all():
        by_transaction[assessment.transaction_id] = assessment
    return by_transaction
|
|
|
|
|
|
async def _get_all_transactions(case_id: UUID, db: AsyncSession) -> list[TransactionRecord]:
    """Return all transaction records of a case in chronological order.

    Args:
        case_id: Case whose transactions are loaded.
        db: Session used to run the query.

    Returns:
        The case's ``TransactionRecord`` rows sorted by ``trade_time``, with
        ``id`` as a tie-breaker.
    """
    # Without an explicit ORDER BY the row order is up to the database, which
    # made the row order of every exported report unpredictable.
    r = await db.execute(
        select(TransactionRecord)
        .where(TransactionRecord.case_id == case_id)
        .order_by(TransactionRecord.trade_time, TransactionRecord.id)
    )
    return list(r.scalars().all())
|
|
|
|
|
|
async def _get_inquiry_suggestions(case_id: UUID, db: AsyncSession) -> list[str]:
    """Return the inquiry suggestions produced by the assessment service for the case."""
    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import with the service module; confirm).
    from app.services.assessment_service import generate_inquiry_suggestions as _suggest

    suggestions = await _suggest(case_id, db)
    return suggestions
|
|
|
|
|
|
# ── Excel ──
|
|
|
|
async def _gen_excel(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path:
    """Build the Excel (.xlsx) report for a case and return the saved file path.

    One worksheet is written per section enabled on ``body``
    (``include_summary``, ``include_transactions``, ``include_reasons``,
    ``include_inquiry``). When no section is enabled, a single placeholder
    sheet is written instead.

    Args:
        case_id: Case the report covers.
        report_dir: Existing directory the workbook is saved into.
        body: Report request carrying the ``include_*`` section flags.
        db: Session used to load assessments and transactions.

    Returns:
        Path of the saved workbook, named ``report_<8 hex chars>.xlsx``.
    """
    from openpyxl import Workbook

    time_fmt = "%Y-%m-%d %H:%M:%S"
    wb = Workbook()
    sheet_created = False

    def _next_sheet(title: str):
        """Return a titled sheet: the workbook's default sheet first, new sheets afterwards."""
        nonlocal sheet_created
        sheet = wb.create_sheet() if sheet_created else wb.active
        sheet.title = title
        sheet_created = True
        return sheet

    # NOTE(review): text cells below come straight from stored transaction and
    # assessment data. openpyxl appears to treat strings starting with "=" as
    # formulas; confirm whether such values need to be neutralised.

    if body.include_summary:
        ws = _next_sheet("被骗金额汇总")
        ws.append(["交易时间", "金额(元)", "方向", "对方", "来源APP", "备注", "置信度", "认定理由"])
        for a in await _get_confirmed(case_id, db):
            tx = await db.get(TransactionRecord, a.transaction_id)
            if not tx:
                # Assessment without a loadable transaction: nothing to show.
                continue
            ws.append([
                tx.trade_time.strftime(time_fmt),
                float(a.assessed_amount),
                "支出" if tx.direction.value == "out" else "收入",
                tx.counterparty_name,
                tx.source_app.value,
                tx.remark,
                tx.confidence,
                # Slicing None would raise; an absent reason becomes an empty cell.
                (a.reason or "")[:100],
            ])

    # Both the detail sheet and the reasons sheet walk the same data, so load
    # it once instead of once per sheet.
    if body.include_transactions or body.include_reasons:
        assessment_by_tx = await _get_all_assessments(case_id, db)
        transactions = await _get_all_transactions(case_id, db)
    else:
        assessment_by_tx, transactions = {}, []

    if body.include_transactions:
        ws2 = _next_sheet("交易明细")
        ws2.append([
            "交易ID", "证据图片ID", "交易时间", "金额(元)", "方向", "来源APP",
            "对方名称", "对方账号", "本方账户尾号", "订单号", "备注", "识别置信度",
            "是否重复", "是否中转", "聚类ID", "记录创建时间",
            "认定置信等级", "认定金额(元)", "认定理由", "排除条件",
            "复核状态", "复核备注", "复核人", "复核时间",
        ])
        for tx in transactions:
            # Transactions without an assessment get blank assessment columns.
            fa = assessment_by_tx.get(tx.id)
            ws2.append([
                str(tx.id),
                str(tx.evidence_image_id) if tx.evidence_image_id else "",
                tx.trade_time.strftime(time_fmt),
                float(tx.amount),
                tx.direction.value,
                tx.source_app.value,
                tx.counterparty_name,
                tx.counterparty_account,
                tx.self_account_tail_no,
                tx.order_no,
                tx.remark,
                float(tx.confidence),
                "是" if tx.is_duplicate else "否",
                "是" if tx.is_transit else "否",
                str(tx.cluster_id) if tx.cluster_id else "",
                tx.created_at.strftime(time_fmt) if tx.created_at else "",
                fa.confidence_level.value if fa else "",
                float(fa.assessed_amount) if fa else "",
                fa.reason if fa else "",
                fa.exclude_reason if fa else "",
                fa.review_status.value if fa else "",
                fa.review_note if fa else "",
                fa.reviewed_by if fa else "",
                fa.reviewed_at.strftime(time_fmt) if (fa and fa.reviewed_at) else "",
            ])

    if body.include_reasons:
        ws3 = _next_sheet("认定理由与排除说明")
        ws3.append(["交易时间", "金额(元)", "置信等级", "认定理由", "排除条件", "复核状态"])
        for tx in transactions:
            fa = assessment_by_tx.get(tx.id)
            if not fa:
                continue
            ws3.append([
                tx.trade_time.strftime(time_fmt),
                float(tx.amount),
                fa.confidence_level.value,
                fa.reason,
                fa.exclude_reason,
                fa.review_status.value,
            ])

    if body.include_inquiry:
        ws4 = _next_sheet("笔录辅助问询建议")
        ws4.append(["序号", "问询建议"])
        suggestions = await _get_inquiry_suggestions(case_id, db)
        for i, s in enumerate(suggestions, 1):
            ws4.append([i, s])

    if not sheet_created:
        # No section was requested: still produce a valid, self-explaining file.
        _next_sheet("报告").append(["未选择任何报告内容"])

    file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.xlsx"
    wb.save(file_path)
    return file_path
|
|
|
|
|
|
# ── Word ──
|
|
|
|
async def _gen_word(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path:
    """Build the Word (.docx) report for a case and return the saved file path.

    The document always starts with the report heading; each section enabled
    on ``body`` (``include_summary``, ``include_transactions``,
    ``include_reasons``, ``include_inquiry``) is appended after it.

    Args:
        case_id: Case the report covers.
        report_dir: Existing directory the document is saved into.
        body: Report request carrying the ``include_*`` section flags.
        db: Session used to load assessments and transactions.

    Returns:
        Path of the saved document, named ``report_<8 hex chars>.docx``.
    """
    from docx import Document

    time_fmt = "%Y-%m-%d %H:%M"

    def _clip(value, limit: int) -> str:
        """Return at most ``limit`` characters of ``value``; ``None`` becomes ``""``."""
        return (value or "")[:limit]

    doc = Document()
    doc.add_heading("受害人被骗金额汇总报告", level=1)

    if body.include_summary:
        confirmed = await _get_confirmed(case_id, db)
        total = sum(float(a.assessed_amount) for a in confirmed)
        doc.add_paragraph(f"已确认被骗金额: ¥{total:,.2f}")
        doc.add_paragraph(f"已确认交易笔数: {len(confirmed)}")
        table = doc.add_table(rows=1, cols=4)
        table.style = "Table Grid"
        for cell, label in zip(table.rows[0].cells, ("交易时间", "金额(元)", "对方", "认定理由")):
            cell.text = label
        for a in confirmed:
            # The row is still written when the transaction cannot be loaded;
            # its transaction-derived cells are left empty.
            tx = await db.get(TransactionRecord, a.transaction_id)
            row = table.add_row().cells
            row[0].text = tx.trade_time.strftime(time_fmt) if tx else ""
            row[1].text = f"{float(a.assessed_amount):,.2f}"
            row[2].text = (tx.counterparty_name or "") if tx else ""
            row[3].text = _clip(a.reason, 80)

    # The detail table and the reasons list iterate the same transactions;
    # load them once when either section is requested.
    if body.include_transactions or body.include_reasons:
        transactions = await _get_all_transactions(case_id, db)
    else:
        transactions = []

    if body.include_transactions:
        doc.add_heading("交易明细", level=2)
        table2 = doc.add_table(rows=1, cols=6)
        table2.style = "Table Grid"
        header_cells = table2.rows[0].cells
        for i, h in enumerate(["交易时间", "金额", "方向", "对方", "订单号", "备注"]):
            header_cells[i].text = h
        for tx in transactions:
            row = table2.add_row().cells
            row[0].text = tx.trade_time.strftime(time_fmt)
            row[1].text = f"{float(tx.amount):,.2f}"
            row[2].text = tx.direction.value
            # Slicing None raises TypeError, so absent text fields are
            # rendered as empty cells instead.
            row[3].text = _clip(tx.counterparty_name, 20)
            row[4].text = _clip(tx.order_no, 20)
            row[5].text = _clip(tx.remark, 30)

    if body.include_reasons:
        doc.add_heading("认定理由与排除说明", level=2)
        assessment_by_tx = await _get_all_assessments(case_id, db)
        for tx in transactions:
            fa = assessment_by_tx.get(tx.id)
            if not fa:
                continue
            doc.add_paragraph(
                f"[{tx.trade_time.strftime(time_fmt)}] ¥{float(tx.amount):,.2f} "
                f"— {fa.confidence_level.value} — {_clip(fa.reason, 100)}"
            )
            if fa.exclude_reason:
                doc.add_paragraph(f"  排除条件: {fa.exclude_reason[:100]}")

    if body.include_inquiry:
        doc.add_heading("笔录辅助问询建议", level=2)
        suggestions = await _get_inquiry_suggestions(case_id, db)
        for i, s in enumerate(suggestions, 1):
            doc.add_paragraph(f"{i}. {s}")

    file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.docx"
    doc.save(file_path)
    return file_path
|
|
|
|
|
|
# ── PDF ──
|
|
|
|
async def _gen_pdf(case_id: UUID, report_dir: Path, body: ReportCreate, db: AsyncSession) -> Path:
    """Build the PDF report for a case and return the saved file path.

    The document starts with the report title; each section enabled on
    ``body`` (``include_summary``, ``include_transactions``,
    ``include_reasons``, ``include_inquiry``) is appended after it. When no
    section adds anything, a placeholder paragraph is written instead.

    Args:
        case_id: Case the report covers.
        report_dir: Existing directory the PDF is saved into.
        body: Report request carrying the ``include_*`` section flags.
        db: Session used to load assessments and transactions.

    Returns:
        Path of the saved PDF, named ``report_<8 hex chars>.pdf``.
    """
    from xml.sax.saxutils import escape

    from reportlab.lib import colors
    from reportlab.lib.pagesizes import A4
    from reportlab.lib.styles import getSampleStyleSheet
    from reportlab.lib.units import mm
    from reportlab.pdfbase import pdfmetrics
    from reportlab.pdfbase.ttfonts import TTFont
    from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, TableStyle
    from reportlab.platypus import Table as RLTable

    # Try the known system CJK fonts in order; the first one that registers wins.
    font_name = None
    for font_path in (
        "/System/Library/Fonts/PingFang.ttc",
        "/System/Library/Fonts/STHeiti Medium.ttc",
        "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
    ):
        if not Path(font_path).exists():
            continue
        try:
            pdfmetrics.registerFont(TTFont("ChineseFont", font_path, subfontIndex=0))
        except Exception:
            # Deliberate best effort: a font file reportlab cannot load must
            # not abort report generation, so move on to the next candidate.
            continue
        font_name = "ChineseFont"
        break

    if font_name is None:
        # Helvetica has no CJK glyphs, so the Chinese text of this report would
        # be unreadable. Prefer reportlab's built-in Simplified Chinese CID
        # font and keep Helvetica only as the last resort.
        try:
            from reportlab.pdfbase.cidfonts import UnicodeCIDFont

            pdfmetrics.registerFont(UnicodeCIDFont("STSong-Light"))
            font_name = "STSong-Light"
        except Exception:
            font_name = "Helvetica"

    styles = getSampleStyleSheet()
    title_style = styles["Title"]
    normal_style = styles["Normal"]
    h2_style = styles["Heading2"]
    for style in (title_style, normal_style, h2_style):
        style.fontName = font_name

    def _markup_safe(value, limit=None) -> str:
        """Return ``value`` (optionally truncated) escaped for use in a Paragraph.

        Paragraph parses its text as XML-like markup, so ``<``, ``>`` and ``&``
        coming from stored data have to be escaped. Truncation happens first
        so an escape sequence is never cut in half. ``None`` becomes ``""``.
        """
        text = value or ""
        if limit is not None:
            text = text[:limit]
        return escape(text)

    file_path = report_dir / f"report_{uuid.uuid4().hex[:8]}.pdf"
    doc = SimpleDocTemplate(
        str(file_path),
        pagesize=A4,
        leftMargin=15 * mm,
        rightMargin=15 * mm,
        topMargin=20 * mm,
        bottomMargin=20 * mm,
    )
    elements = [
        Paragraph("受害人被骗金额汇总报告", title_style),
        Spacer(1, 10 * mm),
    ]

    def _make_table(header, rows, col_widths):
        """Return a grid table with a coloured header row repeated on each page."""
        tbl = RLTable([header] + rows, colWidths=col_widths, repeatRows=1)
        tbl.setStyle(TableStyle([
            ("FONTNAME", (0, 0), (-1, -1), font_name),
            ("FONTSIZE", (0, 0), (-1, -1), 8),
            ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#1677ff")),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.white),
            ("GRID", (0, 0), (-1, -1), 0.5, colors.grey),
            ("VALIGN", (0, 0), (-1, -1), "TOP"),
            ("TOPPADDING", (0, 0), (-1, -1), 3),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 3),
        ]))
        return tbl

    if body.include_summary:
        confirmed = await _get_confirmed(case_id, db)
        total = sum(float(a.assessed_amount) for a in confirmed)
        elements.append(Paragraph(f"已确认被骗金额: ¥{total:,.2f}", normal_style))
        elements.append(Paragraph(f"已确认交易笔数: {len(confirmed)}", normal_style))
        elements.append(Spacer(1, 6 * mm))
        header = ["交易时间", "金额(元)", "对方", "认定理由"]
        rows = []
        for a in confirmed:
            # The row is still written when the transaction cannot be loaded;
            # its transaction-derived cells are left empty.
            tx = await db.get(TransactionRecord, a.transaction_id)
            rows.append([
                tx.trade_time.strftime("%Y-%m-%d %H:%M") if tx else "",
                f"{float(a.assessed_amount):,.2f}",
                ((tx.counterparty_name or "") if tx else "")[:20],
                Paragraph(_markup_safe(a.reason, 120), normal_style),
            ])
        if rows:
            elements.append(_make_table(header, rows, [38 * mm, 25 * mm, 35 * mm, 72 * mm]))
        else:
            elements.append(Paragraph("暂无已确认交易记录。", normal_style))
        elements.append(Spacer(1, 8 * mm))

    # The detail table and the reasons list iterate the same transactions;
    # load them once when either section is requested.
    if body.include_transactions or body.include_reasons:
        transactions = await _get_all_transactions(case_id, db)
    else:
        transactions = []

    if body.include_transactions:
        elements.append(Paragraph("交易明细", h2_style))
        elements.append(Spacer(1, 4 * mm))
        header = ["交易时间", "金额", "方向", "对方", "订单号"]
        # Plain-string cells are drawn as-is (no markup parsing), so they only
        # need the None guard before slicing.
        rows = [
            [
                tx.trade_time.strftime("%Y-%m-%d %H:%M"),
                f"{float(tx.amount):,.2f}",
                "出" if tx.direction.value == "out" else "入",
                (tx.counterparty_name or "")[:16],
                (tx.order_no or "")[:16],
            ]
            for tx in transactions
        ]
        if rows:
            elements.append(_make_table(header, rows, [38 * mm, 22 * mm, 12 * mm, 48 * mm, 48 * mm]))
        elements.append(Spacer(1, 8 * mm))

    if body.include_reasons:
        elements.append(Paragraph("认定理由与排除说明", h2_style))
        elements.append(Spacer(1, 4 * mm))
        assessment_by_tx = await _get_all_assessments(case_id, db)
        for tx in transactions:
            fa = assessment_by_tx.get(tx.id)
            if not fa:
                continue
            line = (
                f"[{tx.trade_time.strftime('%m-%d %H:%M')}] ¥{float(tx.amount):,.2f} "
                f"| {fa.confidence_level.value} | {(fa.reason or '')[:80]}"
            )
            elements.append(Paragraph(escape(line), normal_style))
            if fa.exclude_reason:
                elements.append(
                    Paragraph(escape(f"  排除: {fa.exclude_reason[:80]}"), normal_style)
                )
        elements.append(Spacer(1, 8 * mm))

    if body.include_inquiry:
        elements.append(Paragraph("笔录辅助问询建议", h2_style))
        elements.append(Spacer(1, 4 * mm))
        suggestions = await _get_inquiry_suggestions(case_id, db)
        for i, s in enumerate(suggestions, 1):
            elements.append(Paragraph(escape(f"{i}. {s}"), normal_style))
        elements.append(Spacer(1, 8 * mm))

    # Only the title and its spacer are present: no section contributed anything.
    if len(elements) <= 2:
        elements.append(Paragraph("未选择任何报告内容。", normal_style))

    doc.build(elements)
    return file_path
|