""" Optimize first-stop allocation for ordered two-stop pairs using Monte Carlo. """ from __future__ import annotations import argparse import os from typing import Dict, Tuple import numpy as np import pandas as pd DEFAULT_INPUT = "data/candidate_pairs_k6_cap250.csv" DEFAULT_OUTPUT = "data/ordered_pairs_allocation_k6_cap250.csv" ALPHA = 0.6 BETA = 0.2 GAMMA = 0.0 N_SIMS = 2000 RANDOM_SEED = 606 def _simulate_pair( mu_i: float, sigma_i: float, mu_j: float, sigma_j: float, *, capacity: float, alpha: float, beta: float, gamma: float, step: int, n_sims: int, seed: int, ) -> Dict[str, float]: rng = np.random.default_rng(seed) d_i = rng.normal(mu_i, sigma_i, size=n_sims) d_j = rng.normal(mu_j, sigma_j, size=n_sims) d_i = np.clip(d_i, 0, None) d_j = np.clip(d_j, 0, None) total_d = d_i + d_j total_d_safe = np.maximum(total_d, 1.0) d_i_safe = np.maximum(d_i, 1.0) d_j_safe = np.maximum(d_j, 1.0) best_q = 0 best_score = -1e9 best_metrics: Tuple[float, ...] = () for q in range(0, int(capacity) + 1, step): s_i = np.minimum(d_i, float(q)) remaining = capacity - s_i s_j = np.minimum(d_j, remaining) served = s_i + s_j base = served / total_d_safe unmet = np.maximum(0.0, total_d - served) / total_d_safe waste = (capacity - served) / capacity ratio_i = s_i / d_i_safe ratio_j = s_j / d_j_safe fairness = np.abs(ratio_i - ratio_j) score = base - alpha * unmet - beta * waste - gamma * fairness score_mean = float(np.mean(score)) if score_mean > best_score: best_score = score_mean best_q = q best_metrics = ( float(np.mean(s_i)), float(np.mean(s_j)), float(np.mean(served)), float(np.mean(unmet)), float(np.mean(waste)), float(np.mean(ratio_i)), float(np.mean(ratio_j)), float(np.mean(fairness)), ) return { "q_opt": float(best_q), "score_mean": float(best_score), "served_i_mean": best_metrics[0], "served_j_mean": best_metrics[1], "served_total_mean": best_metrics[2], "unmet_mean": best_metrics[3], "waste_mean": best_metrics[4], "ratio_i_mean": best_metrics[5], "ratio_j_mean": best_metrics[6], "fairness_mean": best_metrics[7], } def _directional_row(row: pd.Series, forward: bool) -> Dict[str, float]: if forward: return { "site_i_idx": int(row["site_i_idx"]), "site_i_name": row["site_i_name"], "site_j_idx": int(row["site_j_idx"]), "site_j_name": row["site_j_name"], "distance_miles": float(row["distance_miles"]), "mu_i": float(row["mu_i"]), "sigma_i": float(row["sigma_i"]), "mu_j": float(row["mu_j"]), "sigma_j": float(row["sigma_j"]), } return { "site_i_idx": int(row["site_j_idx"]), "site_i_name": row["site_j_name"], "site_j_idx": int(row["site_i_idx"]), "site_j_name": row["site_i_name"], "distance_miles": float(row["distance_miles"]), "mu_i": float(row["mu_j"]), "sigma_i": float(row["sigma_j"]), "mu_j": float(row["mu_i"]), "sigma_j": float(row["sigma_i"]), } def main() -> None: parser = argparse.ArgumentParser( description="Optimize first-stop allocation for ordered two-stop pairs." ) parser.add_argument("--input", default=DEFAULT_INPUT) parser.add_argument("--output", default=DEFAULT_OUTPUT) parser.add_argument("--capacity", type=float, default=250.0) parser.add_argument("--alpha", type=float, default=ALPHA) parser.add_argument("--beta", type=float, default=BETA) parser.add_argument("--gamma", type=float, default=GAMMA) parser.add_argument("--step", type=int, default=1, help="Grid step for q in [0, C].") parser.add_argument("--n-sims", type=int, default=N_SIMS) parser.add_argument("--seed", type=int, default=RANDOM_SEED) args = parser.parse_args() df = pd.read_csv(args.input) required = { "site_i_idx", "site_i_name", "site_j_idx", "site_j_name", "distance_miles", "mu_i", "sigma_i", "mu_j", "sigma_j", } missing = required.difference(df.columns) if missing: raise ValueError(f"Missing columns in input: {sorted(missing)}") rows = [] for idx, row in df.iterrows(): for forward in (True, False): base = _directional_row(row, forward) seed = int(args.seed) + int(idx) * 2 + (0 if forward else 1) metrics = _simulate_pair( base["mu_i"], base["sigma_i"], base["mu_j"], base["sigma_j"], capacity=args.capacity, alpha=args.alpha, beta=args.beta, gamma=args.gamma, step=args.step, n_sims=args.n_sims, seed=seed, ) rows.append({**base, **metrics}) out = pd.DataFrame(rows) out = out.sort_values(["site_i_idx", "site_j_idx"]).reset_index(drop=True) os.makedirs(os.path.dirname(args.output), exist_ok=True) out.to_csv(args.output, index=False) print(f"Saved {len(out)} ordered pairs to {args.output}") if __name__ == "__main__": main()