Files
fund-tracer/backend/app/services/matching_service.py

85 lines
2.7 KiB
Python
Raw Normal View History

2026-03-11 16:28:04 +08:00
"""Transaction deduplication and matching engine.
2026-03-12 19:57:30 +08:00
Strategy:
1. Deduplicate only by exact order_no match
2. Mark transit (self-transfer) records for exclusion from fraud totals
Note:
Highly similar records (same amount/time/counterparty) are preserved so they
can be reviewed by officers in the assessment review stage.
2026-03-11 16:28:04 +08:00
"""
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.transaction import TransactionRecord
from app.models.transaction_cluster import TransactionCluster
from app.repositories.transaction_repo import TransactionRepository
from app.rules.dedup_rules import is_duplicate_pair
from app.rules.transit_rules import is_self_transfer
async def run_matching(case_id: UUID, self_accounts: list[str], db: AsyncSession) -> None:
"""Execute the full dedup + transit-marking pipeline for a case."""
repo = TransactionRepository(db)
transactions = await repo.get_all_by_case(case_id)
if not transactions:
return
# reset flags
for tx in transactions:
tx.is_duplicate = False
tx.is_transit = False
tx.cluster_id = None
# ── Layer 1 & 2: dedup ──
matched: set[UUID] = set()
clusters: list[TransactionCluster] = []
for i, tx_a in enumerate(transactions):
if tx_a.id in matched:
continue
group = [tx_a]
for tx_b in transactions[i + 1:]:
if tx_b.id in matched:
continue
if is_duplicate_pair(tx_a, tx_b):
group.append(tx_b)
matched.add(tx_b.id)
if len(group) > 1:
primary = max(group, key=lambda t: t.confidence)
cluster = TransactionCluster(
case_id=case_id,
primary_tx_id=primary.id,
match_reason=_match_reason(primary, group),
)
db.add(cluster)
await db.flush()
for tx in group:
tx.cluster_id = cluster.id
if tx.id != primary.id:
tx.is_duplicate = True
clusters.append(cluster)
# ── Layer 3: transit detection ──
for tx in transactions:
if tx.is_duplicate:
continue
if is_self_transfer(tx, self_accounts):
tx.is_transit = True
await db.flush()
def _match_reason(primary: TransactionRecord, group: list[TransactionRecord]) -> str:
reasons: list[str] = []
orders = {tx.order_no for tx in group if tx.order_no}
if len(orders) == 1:
reasons.append("订单号一致")
amounts = {float(tx.amount) for tx in group}
if len(amounts) == 1:
reasons.append("金额一致")
return "; ".join(reasons) if reasons else "时间和金额近似"