"""Fund flow analysis: build directed graph and summary from transactions.""" from collections import defaultdict from decimal import Decimal import networkx as nx from app.schemas.analysis import ( AnalysisSummaryResponse, AppSummary, FlowGraphResponse, FlowNode, FlowEdge, ) from app.schemas.transaction import TransactionResponse # Transaction types that mean money leaving victim's app (outflow) OUT_TYPES = {"转出", "消费", "付款", "提现"} # Transaction types that mean money entering victim's app (inflow) IN_TYPES = {"转入", "收款", "充值"} def _is_out(t: TransactionResponse) -> bool: return t.transaction_type in OUT_TYPES or "转出" in (t.transaction_type or "") or "消费" in (t.transaction_type or "") def _is_in(t: TransactionResponse) -> bool: return t.transaction_type in IN_TYPES or "转入" in (t.transaction_type or "") or "收款" in (t.transaction_type or "") def _node_id(app_or_counterparty: str, kind: str) -> str: """Generate stable node id; kind in ('victim_app', 'counterparty').""" import hashlib safe = (app_or_counterparty or "").strip() or "unknown" h = hashlib.sha256(f"{kind}:{safe}".encode()).hexdigest()[:12] return f"{kind}_{h}" def build_flow_graph(transactions: list[TransactionResponse]) -> tuple[FlowGraphResponse, AnalysisSummaryResponse]: """ Build directed graph and summary from transaction list. Node: victim's app (app_source when outflow) or counterparty (counterparty_name or counterparty_account). Edge: source -> target with total amount and count. """ out_by_app: dict[str, Decimal] = defaultdict(Decimal) in_by_app: dict[str, Decimal] = defaultdict(Decimal) total_out = Decimal(0) total_in = Decimal(0) counterparties: set[str] = set() # (source_id, target_id) -> (amount, count) edges_agg: dict[tuple[str, str], tuple[Decimal, int]] = defaultdict(lambda: (Decimal(0), 0)) node_labels: dict[str, str] = {} node_types: dict[str, str] = {} for t in transactions: amount = t.amount if isinstance(t.amount, Decimal) else Decimal(str(t.amount)) app = (t.app_source or "").strip() or "未知APP" counterparty = (t.counterparty_name or t.counterparty_account or "未知对方").strip() or "未知对方" counterparties.add(counterparty) victim_node_id = _node_id(app, "victim_app") node_labels[victim_node_id] = app node_types[victim_node_id] = "victim_app" cp_node_id = _node_id(counterparty, "counterparty") node_labels[cp_node_id] = counterparty node_types[cp_node_id] = "counterparty" if _is_out(t): out_by_app[app] += amount total_out += amount key = (victim_node_id, cp_node_id) am, cnt = edges_agg[key] edges_agg[key] = (am + amount, cnt + 1) elif _is_in(t): in_by_app[app] += amount total_in += amount key = (cp_node_id, victim_node_id) am, cnt = edges_agg[key] edges_agg[key] = (am + amount, cnt + 1) all_apps = set(out_by_app.keys()) | set(in_by_app.keys()) by_app = { app: AppSummary( in_amount=in_by_app.get(app, Decimal(0)), out_amount=out_by_app.get(app, Decimal(0)), ) for app in all_apps } summary = AnalysisSummaryResponse( total_out=total_out, total_in=total_in, net_loss=total_out - total_in, by_app=by_app, counterparty_count=len(counterparties), ) nodes = [ FlowNode(id=nid, label=node_labels[nid], type=node_types.get(nid)) for nid in node_labels ] edges = [ FlowEdge(source=src, target=tgt, amount=am, count=cnt) for (src, tgt), (am, cnt) in edges_agg.items() ] graph = FlowGraphResponse(nodes=nodes, edges=edges) return graph, summary