Files
mcm-mfp/two_stop_allocation.py
2026-01-18 17:06:19 +08:00

186 lines
5.5 KiB
Python

"""
Optimize first-stop allocation for ordered two-stop pairs using Monte Carlo.
"""
from __future__ import annotations
import argparse
import os
from typing import Dict, Tuple
import numpy as np
import pandas as pd
DEFAULT_INPUT = "data/candidate_pairs_k6_cap250.csv"
DEFAULT_OUTPUT = "data/ordered_pairs_allocation_k6_cap250.csv"
ALPHA = 0.6
BETA = 0.2
GAMMA = 0.0
N_SIMS = 2000
RANDOM_SEED = 606
def _simulate_pair(
mu_i: float,
sigma_i: float,
mu_j: float,
sigma_j: float,
*,
capacity: float,
alpha: float,
beta: float,
gamma: float,
step: int,
n_sims: int,
seed: int,
) -> Dict[str, float]:
rng = np.random.default_rng(seed)
d_i = rng.normal(mu_i, sigma_i, size=n_sims)
d_j = rng.normal(mu_j, sigma_j, size=n_sims)
d_i = np.clip(d_i, 0, None)
d_j = np.clip(d_j, 0, None)
total_d = d_i + d_j
total_d_safe = np.maximum(total_d, 1.0)
d_i_safe = np.maximum(d_i, 1.0)
d_j_safe = np.maximum(d_j, 1.0)
best_q = 0
best_score = -1e9
best_metrics: Tuple[float, ...] = ()
for q in range(0, int(capacity) + 1, step):
s_i = np.minimum(d_i, float(q))
remaining = capacity - s_i
s_j = np.minimum(d_j, remaining)
served = s_i + s_j
base = served / total_d_safe
unmet = np.maximum(0.0, total_d - served) / total_d_safe
waste = (capacity - served) / capacity
ratio_i = s_i / d_i_safe
ratio_j = s_j / d_j_safe
fairness = np.abs(ratio_i - ratio_j)
score = base - alpha * unmet - beta * waste - gamma * fairness
score_mean = float(np.mean(score))
if score_mean > best_score:
best_score = score_mean
best_q = q
best_metrics = (
float(np.mean(s_i)),
float(np.mean(s_j)),
float(np.mean(served)),
float(np.mean(unmet)),
float(np.mean(waste)),
float(np.mean(ratio_i)),
float(np.mean(ratio_j)),
float(np.mean(fairness)),
)
return {
"q_opt": float(best_q),
"score_mean": float(best_score),
"served_i_mean": best_metrics[0],
"served_j_mean": best_metrics[1],
"served_total_mean": best_metrics[2],
"unmet_mean": best_metrics[3],
"waste_mean": best_metrics[4],
"ratio_i_mean": best_metrics[5],
"ratio_j_mean": best_metrics[6],
"fairness_mean": best_metrics[7],
}
def _directional_row(row: pd.Series, forward: bool) -> Dict[str, float]:
if forward:
return {
"site_i_idx": int(row["site_i_idx"]),
"site_i_name": row["site_i_name"],
"site_j_idx": int(row["site_j_idx"]),
"site_j_name": row["site_j_name"],
"distance_miles": float(row["distance_miles"]),
"mu_i": float(row["mu_i"]),
"sigma_i": float(row["sigma_i"]),
"mu_j": float(row["mu_j"]),
"sigma_j": float(row["sigma_j"]),
}
return {
"site_i_idx": int(row["site_j_idx"]),
"site_i_name": row["site_j_name"],
"site_j_idx": int(row["site_i_idx"]),
"site_j_name": row["site_i_name"],
"distance_miles": float(row["distance_miles"]),
"mu_i": float(row["mu_j"]),
"sigma_i": float(row["sigma_j"]),
"mu_j": float(row["mu_i"]),
"sigma_j": float(row["sigma_i"]),
}
def main() -> None:
parser = argparse.ArgumentParser(
description="Optimize first-stop allocation for ordered two-stop pairs."
)
parser.add_argument("--input", default=DEFAULT_INPUT)
parser.add_argument("--output", default=DEFAULT_OUTPUT)
parser.add_argument("--capacity", type=float, default=250.0)
parser.add_argument("--alpha", type=float, default=ALPHA)
parser.add_argument("--beta", type=float, default=BETA)
parser.add_argument("--gamma", type=float, default=GAMMA)
parser.add_argument("--step", type=int, default=1, help="Grid step for q in [0, C].")
parser.add_argument("--n-sims", type=int, default=N_SIMS)
parser.add_argument("--seed", type=int, default=RANDOM_SEED)
args = parser.parse_args()
df = pd.read_csv(args.input)
required = {
"site_i_idx",
"site_i_name",
"site_j_idx",
"site_j_name",
"distance_miles",
"mu_i",
"sigma_i",
"mu_j",
"sigma_j",
}
missing = required.difference(df.columns)
if missing:
raise ValueError(f"Missing columns in input: {sorted(missing)}")
rows = []
for idx, row in df.iterrows():
for forward in (True, False):
base = _directional_row(row, forward)
seed = int(args.seed) + int(idx) * 2 + (0 if forward else 1)
metrics = _simulate_pair(
base["mu_i"],
base["sigma_i"],
base["mu_j"],
base["sigma_j"],
capacity=args.capacity,
alpha=args.alpha,
beta=args.beta,
gamma=args.gamma,
step=args.step,
n_sims=args.n_sims,
seed=seed,
)
rows.append({**base, **metrics})
out = pd.DataFrame(rows)
out = out.sort_values(["site_i_idx", "site_j_idx"]).reset_index(drop=True)
os.makedirs(os.path.dirname(args.output), exist_ok=True)
out.to_csv(args.output, index=False)
print(f"Saved {len(out)} ordered pairs to {args.output}")
if __name__ == "__main__":
main()