Files
fund-tracer/backend/app/services/matching_service.py
2026-03-17 22:39:05 +08:00

94 lines
3.2 KiB
Python

"""Transaction deduplication and matching engine.
Strategy:
1. Deduplicate only by exact order_no match
2. Mark transit (self-transfer) records for exclusion from fraud totals
Note:
Highly similar records (same amount/time/counterparty) are preserved so they
can be reviewed by officers in the assessment review stage.
"""
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.transaction import TransactionRecord
from app.models.transaction_cluster import TransactionCluster
from app.repositories.transaction_repo import TransactionRepository
from app.rules.dedup_rules import is_duplicate_pair
from app.rules.transit_rules import is_self_transfer, is_fee_tolerant_transit_pair
async def run_matching(case_id: UUID, self_accounts: list[str], db: AsyncSession) -> None:
    """Execute the full dedup + transit-marking pipeline for a case.

    Loads every transaction of the case, clears the flags written by this
    pipeline, groups duplicates into clusters, marks transit records, and
    flushes the session. Nothing is committed here.

    Args:
        case_id: Identifier of the case whose transactions are processed.
        self_accounts: Account identifiers passed through to
            ``is_self_transfer``.
        db: Async session used to load the records and to persist new
            clusters and flag changes.
    """
    repo = TransactionRepository(db)
    transactions = await repo.get_all_by_case(case_id)
    if not transactions:
        return

    # Clear the flags this pipeline owns before recomputing them.
    # NOTE(review): TransactionCluster rows created by an earlier run are not
    # deleted here -- confirm that something else cleans them up.
    for tx in transactions:
        tx.is_duplicate = False
        tx.is_transit = False
        tx.cluster_id = None

    await _cluster_duplicates(case_id, transactions, db)
    _mark_transit(transactions, self_accounts)

    await db.flush()


async def _cluster_duplicates(
    case_id: UUID,
    transactions: list[TransactionRecord],
    db: AsyncSession,
) -> None:
    """Group duplicate records into clusters and flag every non-primary member."""
    matched: set[UUID] = set()

    for i, tx_a in enumerate(transactions):
        if tx_a.id in matched:
            continue

        # Collect every later, still-unclaimed record that duplicates tx_a.
        group = [tx_a]
        for tx_b in transactions[i + 1:]:
            if tx_b.id in matched:
                continue
            if is_duplicate_pair(tx_a, tx_b):
                group.append(tx_b)
                matched.add(tx_b.id)

        if len(group) == 1:
            continue

        # The highest-confidence record represents the cluster; on ties
        # max() keeps the earliest record in the group.
        primary = max(group, key=lambda t: t.confidence)
        cluster = TransactionCluster(
            case_id=case_id,
            primary_tx_id=primary.id,
            match_reason=_match_reason(primary, group),
        )
        db.add(cluster)
        # Flush before reading cluster.id below -- presumably the id is only
        # populated once the row has been flushed.
        await db.flush()

        for tx in group:
            tx.cluster_id = cluster.id
            if tx.id != primary.id:
                tx.is_duplicate = True


def _mark_transit(transactions: list[TransactionRecord], self_accounts: list[str]) -> None:
    """Flag non-duplicate records that are self-transfers or fee-tolerant in/out pairs."""
    non_duplicate = [tx for tx in transactions if not tx.is_duplicate]

    for tx in non_duplicate:
        if is_self_transfer(tx, self_accounts):
            tx.is_transit = True

    # Rule extension: if an in/out pair occurs within 2 minutes and the
    # amount difference is within 2% (e.g. a fee), mark both as transit.
    # The thresholds themselves live in is_fee_tolerant_transit_pair.
    for i, tx_a in enumerate(non_duplicate):
        for tx_b in non_duplicate[i + 1:]:
            if is_fee_tolerant_transit_pair(tx_a, tx_b):
                tx_a.is_transit = True
                tx_b.is_transit = True
def _match_reason(primary: TransactionRecord, group: list[TransactionRecord]) -> str:
    """Build the human-readable reason stored on a duplicate cluster.

    Args:
        primary: The record chosen as the cluster's primary. Not used by the
            current implementation; kept so existing call sites keep working.
        group: Every record in the cluster, including the primary.

    Returns:
        The applicable reasons joined with ``"; "``, or a generic fallback
        string when none applies.
    """
    reasons: list[str] = []

    # Only claim matching order numbers when every record actually carries
    # one: a single populated order_no among blanks proves nothing.
    order_nos = [tx.order_no for tx in group]
    if all(order_nos) and len(set(order_nos)) == 1:
        reasons.append("订单号一致")

    # Amounts are compared after conversion to float, so values that differ
    # only in representation count as equal.
    amounts = {float(tx.amount) for tx in group}
    if len(amounts) == 1:
        reasons.append("金额一致")

    return "; ".join(reasons) if reasons else "时间和金额近似"