# File: mcm-mfp/candidate_pairs.py
# Listing timestamp: 2026-01-18 17:06:19 +08:00
# Listing size: 154 lines, 4.9 KiB (Python)

"""
Generate candidate two-stop site pairs using k-nearest Manhattan distance
and a capacity filter on average demand.
"""
from __future__ import annotations
import argparse
import os
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
# Excel workbook read by the CLI when --input is not supplied.
DEFAULT_INPUT = "prob/MFP Regular Sites 2019.xlsx"
# CSV written by the CLI when --output is not supplied; the "k6_cap250"
# suffix mirrors the CLI defaults for --k (6) and --capacity (250.0).
DEFAULT_OUTPUT = "data/candidate_pairs_k6_cap250.csv"
def _find_col(df: pd.DataFrame, candidates: List[str]) -> str:
    """Return the name of the first column of ``df`` matching a candidate.

    Matching is done in two passes over ``candidates``: first an exact
    comparison against the column labels, then (only if nothing matched
    exactly) a case-insensitive comparison.

    Args:
        df: Table whose column labels are searched.
        candidates: Acceptable header spellings, in order of preference.

    Returns:
        The column label exactly as it appears in ``df``.

    Raises:
        ValueError: If none of the candidates matches any column.
    """
    columns = df.columns

    # Pass 1: exact spelling wins over any case-insensitive match.
    exact = next((wanted for wanted in candidates if wanted in columns), None)
    if exact is not None:
        return exact

    # Pass 2: compare lower-cased names. When several columns share a
    # lower-cased form, the last one in column order is kept.
    by_lower = {}
    for column in columns:
        by_lower[column.lower()] = column
    for wanted in candidates:
        try:
            return by_lower[wanted.lower()]
        except KeyError:
            continue

    raise ValueError(f"Missing required column. Tried: {candidates}")
def _load_sites(path: str) -> pd.DataFrame:
    """Read the site workbook and return a normalized site table.

    Args:
        path: Location of the Excel file, passed to ``pandas.read_excel``.

    Returns:
        A DataFrame with columns ``site_name``, ``latitude``, ``longitude``,
        ``mu``, ``sigma`` and a 1-based ``site_idx``, with a fresh 0..n-1
        index.

    Raises:
        ValueError: If a required column cannot be found, or if any row has
            a missing or non-numeric latitude, longitude or average demand.
    """
    raw = pd.read_excel(path)

    # Resolve the source header of each field, in output-column order.
    source_columns = [
        _find_col(raw, ["Site Name", "site name", "site"]),
        _find_col(raw, ["latitude", "lat"]),
        _find_col(raw, ["longitude", "lon", "lng"]),
        _find_col(
            raw,
            ["Average Demand per Visit", "average demand per visit", "avg demand"],
        ),
        _find_col(
            raw,
            ["StDev(Demand per Visit)", "stdev(demand per visit)", "stdev", "std"],
        ),
    ]
    sites = raw[source_columns].copy()
    sites.columns = ["site_name", "latitude", "longitude", "mu", "sigma"]

    # Non-numeric cells become NaN so they can be detected below.
    for field in ("latitude", "longitude", "mu", "sigma"):
        sites[field] = pd.to_numeric(sites[field], errors="coerce")
    # A missing standard deviation is replaced by zero instead of failing.
    sites["sigma"] = sites["sigma"].fillna(0.0)

    incomplete_rows = sites[["latitude", "longitude", "mu"]].isna().any(axis=1)
    if incomplete_rows.any():
        raise ValueError(
            f"Missing lat/lon/mu for {int(incomplete_rows.sum())} rows."
        )

    sites = sites.reset_index(drop=True)
    sites["site_idx"] = np.arange(1, len(sites) + 1, dtype=int)
    return sites
def _manhattan_miles(lat: np.ndarray, lon: np.ndarray) -> np.ndarray:
    """Return the pairwise Manhattan (L1) distance matrix between points.

    Latitude differences are scaled by 69.0 and longitude differences by
    69.0 times the cosine of the mean latitude (the function name indicates
    the result is in miles).

    Args:
        lat: 1-D array of latitudes in degrees.
        lon: 1-D array of longitudes in degrees, aligned with ``lat``.

    Returns:
        A square matrix whose ``[i, j]`` entry is the distance between
        points ``i`` and ``j``. The diagonal is ``+inf`` so that a point is
        never ranked as its own nearest neighbour.
    """
    per_degree = 69.0
    mean_lat_rad = np.deg2rad(float(np.mean(lat)))
    # One shared longitude scale, taken at the mean latitude of all points.
    east_west_scale = per_degree * float(np.cos(mean_lat_rad))

    north_south = np.abs(lat[:, np.newaxis] - lat[np.newaxis, :]) * per_degree
    east_west = np.abs(lon[:, np.newaxis] - lon[np.newaxis, :]) * east_west_scale

    grid = north_south + east_west
    np.fill_diagonal(grid, np.inf)
    return grid
def generate_pairs(
    df: pd.DataFrame,
    *,
    k: int,
    capacity: float,
) -> pd.DataFrame:
    """Build the table of candidate two-stop site pairs.

    For every site, its ``k`` nearest *other* sites (according to the
    matrix returned by ``_manhattan_miles``) are considered. A pair is kept
    when the sum of the two average demands does not exceed ``capacity``.
    Each unordered pair is reported once, with the site at the lower row
    position as ``i`` and the other as ``j``.

    Args:
        df: Site table with ``site_idx``, ``site_name``, ``latitude``,
            ``longitude``, ``mu`` and ``sigma`` columns. Rows are addressed
            by position, so the index of ``df`` may be arbitrary.
        k: Number of nearest neighbours examined per site. Values larger
            than the number of other sites simply examine all of them.
        capacity: Upper bound (inclusive) on ``mu_i + mu_j`` for a pair.

    Returns:
        A DataFrame with one row per candidate pair, sorted by
        ``distance_miles`` and then ``sum_mu``. When no pair qualifies the
        frame has zero rows but still carries the full set of columns.

    Raises:
        ValueError: If ``k`` is not positive.
    """
    if k <= 0:
        raise ValueError("k must be > 0")

    columns = [
        "site_i_idx",
        "site_i_name",
        "site_j_idx",
        "site_j_name",
        "distance_miles",
        "lat_i",
        "lon_i",
        "lat_j",
        "lon_j",
        "mu_i",
        "sigma_i",
        "mu_j",
        "sigma_j",
        "sum_mu",
        "mu_ratio",
        "sum_sigma",
    ]

    n_sites = len(df)
    if n_sites == 0:
        # Nothing to pair; skip the distance computation (mean of an empty
        # array) and return a header-only frame.
        return pd.DataFrame(columns=columns)

    # Pull every column out as a positional array so lookups below do not
    # depend on the labels of df's index.
    lat = df["latitude"].to_numpy(dtype=float)
    lon = df["longitude"].to_numpy(dtype=float)
    mu = df["mu"].to_numpy(dtype=float)
    sigma = df["sigma"].to_numpy(dtype=float)
    site_idx = df["site_idx"].to_numpy()
    site_name = df["site_name"].to_numpy()

    dist = _manhattan_miles(lat, lon)

    pair_rows: List[Dict[str, object]] = []
    seen: set[Tuple[int, int]] = set()
    for i in range(n_sites):
        # A stable sort makes tie-breaking between equidistant neighbours
        # deterministic (lower position first).
        nearest = np.argsort(dist[i], kind="stable")[:k]
        for j in nearest:
            j = int(j)
            # The diagonal is +inf and therefore sorts last, but when
            # k >= n_sites the slice still reaches it; never pair a site
            # with itself.
            if j == i:
                continue
            if mu[i] + mu[j] > capacity:
                continue
            a, b = (i, j) if i < j else (j, i)
            if (a, b) in seen:
                continue
            seen.add((a, b))

            mu_a = float(mu[a])
            mu_b = float(mu[b])
            sigma_a = float(sigma[a])
            sigma_b = float(sigma[b])
            # Guard the division: a non-positive mu_j yields an infinite ratio.
            ratio = mu_a / mu_b if mu_b > 0 else np.inf
            pair_rows.append(
                {
                    "site_i_idx": int(site_idx[a]),
                    "site_i_name": site_name[a],
                    "site_j_idx": int(site_idx[b]),
                    "site_j_name": site_name[b],
                    "distance_miles": float(dist[a, b]),
                    "lat_i": float(lat[a]),
                    "lon_i": float(lon[a]),
                    "lat_j": float(lat[b]),
                    "lon_j": float(lon[b]),
                    "mu_i": mu_a,
                    "sigma_i": sigma_a,
                    "mu_j": mu_b,
                    "sigma_j": sigma_b,
                    "sum_mu": mu_a + mu_b,
                    "mu_ratio": ratio,
                    "sum_sigma": sigma_a + sigma_b,
                }
            )

    # Passing columns keeps the schema (and CSV header) even with no rows.
    out = pd.DataFrame(pair_rows, columns=columns)
    if not out.empty:
        out = out.sort_values(["distance_miles", "sum_mu"]).reset_index(drop=True)
    return out
def main() -> None:
    """Command-line entry point: load sites, build pairs, write a CSV.

    Parses ``--input``, ``--k``, ``--capacity`` and ``--output``, loads the
    site table with ``_load_sites``, generates candidate pairs with
    ``generate_pairs``, writes them to the output path as CSV (creating the
    parent directory if needed) and prints a one-line summary.
    """
    parser = argparse.ArgumentParser(
        description="Generate candidate two-stop site pairs from MFP dataset."
    )
    parser.add_argument("--input", default=DEFAULT_INPUT)
    parser.add_argument("--k", type=int, default=6, help="k nearest neighbors per site.")
    parser.add_argument("--capacity", type=float, default=250.0)
    parser.add_argument("--output", default=DEFAULT_OUTPUT)
    args = parser.parse_args()

    df = _load_sites(args.input)
    pairs = generate_pairs(df, k=args.k, capacity=args.capacity)

    # dirname() is "" for a bare filename such as "pairs.csv", and
    # os.makedirs("") raises FileNotFoundError, so only create a directory
    # when the output path actually names one.
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    pairs.to_csv(args.output, index=False)
    print(f"Saved {len(pairs)} candidate pairs to {args.output}")
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()