""" Generate candidate two-stop site pairs using k-nearest Manhattan distance and a capacity filter on average demand. """ from __future__ import annotations import argparse import os from typing import Dict, List, Tuple import numpy as np import pandas as pd DEFAULT_INPUT = "prob/MFP Regular Sites 2019.xlsx" DEFAULT_OUTPUT = "data/candidate_pairs_k6_cap250.csv" def _find_col(df: pd.DataFrame, candidates: List[str]) -> str: for name in candidates: if name in df.columns: return name lower_map = {c.lower(): c for c in df.columns} for name in candidates: key = name.lower() if key in lower_map: return lower_map[key] raise ValueError(f"Missing required column. Tried: {candidates}") def _load_sites(path: str) -> pd.DataFrame: df = pd.read_excel(path) col_site = _find_col(df, ["Site Name", "site name", "site"]) col_lat = _find_col(df, ["latitude", "lat"]) col_lon = _find_col(df, ["longitude", "lon", "lng"]) col_mu = _find_col(df, ["Average Demand per Visit", "average demand per visit", "avg demand"]) col_sigma = _find_col( df, ["StDev(Demand per Visit)", "stdev(demand per visit)", "stdev", "std"] ) out = df[[col_site, col_lat, col_lon, col_mu, col_sigma]].copy() out.columns = ["site_name", "latitude", "longitude", "mu", "sigma"] out["latitude"] = pd.to_numeric(out["latitude"], errors="coerce") out["longitude"] = pd.to_numeric(out["longitude"], errors="coerce") out["mu"] = pd.to_numeric(out["mu"], errors="coerce") out["sigma"] = pd.to_numeric(out["sigma"], errors="coerce").fillna(0.0) if out[["latitude", "longitude", "mu"]].isna().any().any(): missing = out[out[["latitude", "longitude", "mu"]].isna().any(axis=1)] raise ValueError(f"Missing lat/lon/mu for {len(missing)} rows.") out = out.reset_index(drop=True) out["site_idx"] = np.arange(1, len(out) + 1, dtype=int) return out def _manhattan_miles(lat: np.ndarray, lon: np.ndarray) -> np.ndarray: lat0 = float(np.mean(lat)) lat_scale = 69.0 lon_scale = 69.0 * float(np.cos(np.deg2rad(lat0))) dlat = np.abs(lat[:, None] - lat[None, :]) * lat_scale dlon = np.abs(lon[:, None] - lon[None, :]) * lon_scale dist = dlat + dlon np.fill_diagonal(dist, np.inf) return dist def generate_pairs( df: pd.DataFrame, *, k: int, capacity: float, ) -> pd.DataFrame: if k <= 0: raise ValueError("k must be > 0") lat = df["latitude"].to_numpy(dtype=float) lon = df["longitude"].to_numpy(dtype=float) mu = df["mu"].to_numpy(dtype=float) sigma = df["sigma"].to_numpy(dtype=float) dist = _manhattan_miles(lat, lon) pair_rows: List[Dict[str, float]] = [] seen: set[Tuple[int, int]] = set() for i in range(len(df)): nn_idx = np.argsort(dist[i])[:k] for j in nn_idx: if mu[i] + mu[j] > capacity: continue a, b = (i, j) if i < j else (j, i) if (a, b) in seen: continue seen.add((a, b)) mu_a = float(mu[a]) mu_b = float(mu[b]) sigma_a = float(sigma[a]) sigma_b = float(sigma[b]) ratio = mu_a / mu_b if mu_b > 0 else np.inf pair_rows.append( { "site_i_idx": int(df.loc[a, "site_idx"]), "site_i_name": df.loc[a, "site_name"], "site_j_idx": int(df.loc[b, "site_idx"]), "site_j_name": df.loc[b, "site_name"], "distance_miles": float(dist[a, b]), "lat_i": float(lat[a]), "lon_i": float(lon[a]), "lat_j": float(lat[b]), "lon_j": float(lon[b]), "mu_i": mu_a, "sigma_i": sigma_a, "mu_j": mu_b, "sigma_j": sigma_b, "sum_mu": mu_a + mu_b, "mu_ratio": ratio, "sum_sigma": sigma_a + sigma_b, } ) out = pd.DataFrame(pair_rows) if len(out) > 0: out = out.sort_values(["distance_miles", "sum_mu"]).reset_index(drop=True) return out def main() -> None: parser = argparse.ArgumentParser( description="Generate candidate two-stop site pairs from MFP dataset." ) parser.add_argument("--input", default=DEFAULT_INPUT) parser.add_argument("--k", type=int, default=6, help="k nearest neighbors per site.") parser.add_argument("--capacity", type=float, default=250.0) parser.add_argument("--output", default=DEFAULT_OUTPUT) args = parser.parse_args() df = _load_sites(args.input) pairs = generate_pairs(df, k=args.k, capacity=args.capacity) os.makedirs(os.path.dirname(args.output), exist_ok=True) pairs.to_csv(args.output, index=False) print(f"Saved {len(pairs)} candidate pairs to {args.output}") if __name__ == "__main__": main()