diff --git a/backend/app/rules/transit_rules.py b/backend/app/rules/transit_rules.py index 0b310bd..bbb5527 100644 --- a/backend/app/rules/transit_rules.py +++ b/backend/app/rules/transit_rules.py @@ -1,46 +1,49 @@ -"""Transit (self-transfer) detection rules. - -Identifies transactions that are internal transfers between the victim's -own accounts (e.g. bank -> Alipay -> WeChat) and should NOT be counted -as fraud loss. -""" +"""Transit (self-transfer) detection rules.""" from datetime import datetime from app.models.transaction import TransactionRecord -SELF_KEYWORDS = ["本人", "自己", "余额", "充值", "提现", "银行卡转入", "银行卡充值"] +SELF_KEYWORDS = ("本人", "自己", "余额", "充值", "提现", "银行卡转入", "银行卡充值") +APP_PATTERNS = ("支付宝", "微信", "银行卡", "数字钱包") FEE_TRANSIT_WINDOW_SECONDS = 120 FEE_TOLERANCE_RATIO = 0.02 +def _normalize_text(value: str | None) -> str: + return (value or "").strip().lower() + + +def _contains_any(text: str, patterns: tuple[str, ...]) -> bool: + return any(p in text for p in patterns) + + +def _amount_ratio_diff(a: float, b: float) -> float: + base = max(a, b) + return 0 if base <= 0 else abs(a - b) / base + + def is_self_transfer(tx: TransactionRecord, known_self_accounts: list[str]) -> bool: - """Check if a transaction is an inter-account transfer by the victim.""" - counterparty = (tx.counterparty_name or "").lower() - remark = (tx.remark or "").lower() + """Single-transaction heuristic for victim self-transfer.""" + counterparty = _normalize_text(tx.counterparty_name) + remark = _normalize_text(tx.remark) - # Rule 1: counterparty matches known self accounts - for acct in known_self_accounts: - if acct and acct.lower() in counterparty: - return True + # Rule 1: explicit known self account hit + if any(acct and _normalize_text(acct) in counterparty for acct in known_self_accounts): + return True - # Rule 2: counterparty contains self-transfer keywords - for kw in SELF_KEYWORDS: - if kw in counterparty or kw in remark: - return True + # Rule 2: self-transfer keywords + if _contains_any(counterparty, SELF_KEYWORDS) or _contains_any(remark, SELF_KEYWORDS): + return True - # Rule 3: counterparty references another payment app owned by victim - app_keywords = ["支付宝", "微信", "银行卡", "数字钱包"] - victim_patterns = [f"{app}-" for app in app_keywords] + app_keywords - for pat in victim_patterns: - if pat in counterparty: - if tx.direction.value == "out": - return True + # Rule 3: outflow to another own payment channel + if tx.direction.value == "out" and _contains_any(counterparty, APP_PATTERNS): + return True return False def is_fee_tolerant_transit_pair(tx_a: TransactionRecord, tx_b: TransactionRecord) -> bool: - """Two-way transfer pattern: opposite direction, close time, similar amount.""" + """Pair heuristic: opposite direction, close time, amount diff <= 2%.""" if tx_a.direction.value == tx_b.direction.value: return False @@ -48,16 +51,46 @@ def is_fee_tolerant_transit_pair(tx_a: TransactionRecord, tx_b: TransactionRecor amount_b = float(tx_b.amount or 0) if amount_a <= 0 or amount_b <= 0: return False + if _amount_ratio_diff(amount_a, amount_b) > FEE_TOLERANCE_RATIO: + return False time_a = tx_a.trade_time time_b = tx_b.trade_time if not isinstance(time_a, datetime) or not isinstance(time_b, datetime): return False - if abs((time_a - time_b).total_seconds()) > FEE_TRANSIT_WINDOW_SECONDS: + try: + return abs((time_a - time_b).total_seconds()) <= FEE_TRANSIT_WINDOW_SECONDS + except TypeError: return False - amount_base = max(amount_a, amount_b) - if amount_base <= 0: - return False - diff_ratio = abs(amount_a - amount_b) / amount_base - return diff_ratio <= FEE_TOLERANCE_RATIO + +def mark_transit_transactions( + transactions: list[TransactionRecord], + known_self_accounts: list[str], +) -> None: + """Mark `is_transit` in-place using single-transaction + pair rules.""" + candidates = [tx for tx in transactions if not tx.is_duplicate] + if not candidates: + return + + # Pass 1: single transaction rules + for tx in candidates: + if is_self_transfer(tx, known_self_accounts): + tx.is_transit = True + + # Pass 2: pair rules within 2-minute window + sorted_txs = sorted(candidates, key=lambda tx: tx.trade_time) + for i, tx_a in enumerate(sorted_txs): + j = i + 1 + while j < len(sorted_txs): + tx_b = sorted_txs[j] + try: + seconds_gap = (tx_b.trade_time - tx_a.trade_time).total_seconds() + except TypeError: + break + if seconds_gap > FEE_TRANSIT_WINDOW_SECONDS: + break + if is_fee_tolerant_transit_pair(tx_a, tx_b): + tx_a.is_transit = True + tx_b.is_transit = True + j += 1 diff --git a/backend/app/services/matching_service.py b/backend/app/services/matching_service.py index 044d1ba..81f73ad 100644 --- a/backend/app/services/matching_service.py +++ b/backend/app/services/matching_service.py @@ -15,7 +15,7 @@ from app.models.transaction import TransactionRecord from app.models.transaction_cluster import TransactionCluster from app.repositories.transaction_repo import TransactionRepository from app.rules.dedup_rules import is_duplicate_pair -from app.rules.transit_rules import is_self_transfer, is_fee_tolerant_transit_pair +from app.rules.transit_rules import mark_transit_transactions async def run_matching(case_id: UUID, self_accounts: list[str], db: AsyncSession) -> None: @@ -34,8 +34,6 @@ async def run_matching(case_id: UUID, self_accounts: list[str], db: AsyncSession # ── Layer 1 & 2: dedup ── matched: set[UUID] = set() - clusters: list[TransactionCluster] = [] - for i, tx_a in enumerate(transactions): if tx_a.id in matched: continue @@ -61,23 +59,9 @@ async def run_matching(case_id: UUID, self_accounts: list[str], db: AsyncSession tx.cluster_id = cluster.id if tx.id != primary.id: tx.is_duplicate = True - clusters.append(cluster) # ── Layer 3: transit detection ── - for tx in transactions: - if tx.is_duplicate: - continue - if is_self_transfer(tx, self_accounts): - tx.is_transit = True - - # Rule extension: if an in/out pair occurs within 2 minutes and - # amount difference is within 2% (e.g. fee), mark both as transit. - non_duplicate = [tx for tx in transactions if not tx.is_duplicate] - for i, tx_a in enumerate(non_duplicate): - for tx_b in non_duplicate[i + 1 :]: - if is_fee_tolerant_transit_pair(tx_a, tx_b): - tx_a.is_transit = True - tx_b.is_transit = True + mark_transit_transactions(transactions, self_accounts) await db.flush() diff --git a/backend/tests/test_rules.py b/backend/tests/test_rules.py index 53a7927..16b4691 100644 --- a/backend/tests/test_rules.py +++ b/backend/tests/test_rules.py @@ -8,7 +8,11 @@ import pytest from app.models.transaction import Direction from app.models.evidence_image import SourceApp from app.rules.dedup_rules import is_duplicate_pair -from app.rules.transit_rules import is_self_transfer, is_fee_tolerant_transit_pair +from app.rules.transit_rules import ( + is_self_transfer, + is_fee_tolerant_transit_pair, + mark_transit_transactions, +) from app.rules.assessment_rules import classify_transaction @@ -105,6 +109,36 @@ class TestTransitRules: ) assert not is_fee_tolerant_transit_pair(out_tx, in_tx) + def test_wechat_recharge_in_then_out_marked_transit(self): + in_tx = _make_tx( + trade_time=datetime(2026, 3, 17, 21, 46, 0, tzinfo=timezone.utc), + amount=50, + direction=Direction.in_, + counterparty_name="零钱充值-来自工商银行(3893)", + counterparty_account="", + self_account_tail_no="3893", + order_no="", + remark="", + confidence=0.95, + is_transit=False, + ) + out_tx = _make_tx( + trade_time=datetime(2026, 3, 17, 21, 46, 59, tzinfo=timezone.utc), + amount=50, + direction=Direction.out, + counterparty_name="童年", + counterparty_account="1154****0928", + self_account_tail_no="3893", + order_no="", + remark="充值", + confidence=0.98, + is_transit=False, + ) + txs = [in_tx, out_tx] + mark_transit_transactions(txs, known_self_accounts=[]) + assert in_tx.is_transit + assert out_tx.is_transit + class TestAssessmentRules: def test_transit_classified_as_low(self):