"""Transit (self-transfer) detection rules.""" from datetime import datetime from app.models.transaction import TransactionRecord SELF_KEYWORDS = ("本人", "自己", "余额", "充值", "提现", "银行卡转入", "银行卡充值") APP_PATTERNS = ("支付宝", "微信", "银行卡", "数字钱包") FEE_TRANSIT_WINDOW_SECONDS = 120 FEE_TOLERANCE_RATIO = 0.02 def _normalize_text(value: str | None) -> str: return (value or "").strip().lower() def _contains_any(text: str, patterns: tuple[str, ...]) -> bool: return any(p in text for p in patterns) def _amount_ratio_diff(a: float, b: float) -> float: base = max(a, b) return 0 if base <= 0 else abs(a - b) / base def is_self_transfer(tx: TransactionRecord, known_self_accounts: list[str]) -> bool: """Single-transaction heuristic for victim self-transfer.""" counterparty = _normalize_text(tx.counterparty_name) remark = _normalize_text(tx.remark) # Rule 1: explicit known self account hit if any(acct and _normalize_text(acct) in counterparty for acct in known_self_accounts): return True # Rule 2: self-transfer keywords if _contains_any(counterparty, SELF_KEYWORDS) or _contains_any(remark, SELF_KEYWORDS): return True # Rule 3: outflow to another own payment channel if tx.direction.value == "out" and _contains_any(counterparty, APP_PATTERNS): return True return False def is_fee_tolerant_transit_pair(tx_a: TransactionRecord, tx_b: TransactionRecord) -> bool: """Pair heuristic: opposite direction, close time, amount diff <= 2%.""" if tx_a.direction.value == tx_b.direction.value: return False amount_a = float(tx_a.amount or 0) amount_b = float(tx_b.amount or 0) if amount_a <= 0 or amount_b <= 0: return False if _amount_ratio_diff(amount_a, amount_b) > FEE_TOLERANCE_RATIO: return False time_a = tx_a.trade_time time_b = tx_b.trade_time if not isinstance(time_a, datetime) or not isinstance(time_b, datetime): return False try: return abs((time_a - time_b).total_seconds()) <= FEE_TRANSIT_WINDOW_SECONDS except TypeError: return False def mark_transit_transactions( transactions: list[TransactionRecord], known_self_accounts: list[str], ) -> None: """Mark `is_transit` in-place using single-transaction + pair rules.""" candidates = [tx for tx in transactions if not tx.is_duplicate] if not candidates: return # Pass 1: single transaction rules for tx in candidates: if is_self_transfer(tx, known_self_accounts): tx.is_transit = True # Pass 2: pair rules within 2-minute window sorted_txs = sorted(candidates, key=lambda tx: tx.trade_time) for i, tx_a in enumerate(sorted_txs): j = i + 1 while j < len(sorted_txs): tx_b = sorted_txs[j] try: seconds_gap = (tx_b.trade_time - tx_a.trade_time).total_seconds() except TypeError: break if seconds_gap > FEE_TRANSIT_WINDOW_SECONDS: break if is_fee_tolerant_transit_pair(tx_a, tx_b): tx_a.is_transit = True tx_b.is_transit = True j += 1