""" Task 3 - Step 8: 可视化(Fig.1/2/4/5/6) ===================================== 输入: - 01_distance.xlsx (sites: 经纬度、mu/sigma) - 02_pairing.xlsx (selected_pairs: 配对连线) - 03_allocation.xlsx (allocation: q*, E_total) - 05_calendar.xlsx (calendar: 365天×2槽位排程) - 06_evaluate.xlsx (pair_risk: 缺口概率分布) 输出: - figures/fig1_pairing_map.png - figures/fig2_allocation_scatter.png - figures/fig6_fairness_satisfaction.png - figures/fig4_calendar_heatmap.png - figures/fig5_risk_distribution.png """ from __future__ import annotations from dataclasses import dataclass import json from pathlib import Path import os import tempfile import numpy as np import pandas as pd _mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig" _xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache" _mpl_config_dir.mkdir(parents=True, exist_ok=True) _xdg_cache_dir.mkdir(parents=True, exist_ok=True) os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir)) os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir)) import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib import colors as mcolors from cycler import cycler BASE_DIR = Path(__file__).resolve().parent MORANDI = { "grid": "#D7D1C7", "text": "#3F3F3F", "muted_blue": "#8FA3A7", "muted_blue_dark": "#6E858A", "sage": "#AEB8A6", "clay": "#C6AA9B", "rose": "#D2B8B3", "warm_gray": "#8C857A", "terracotta": "#B07A6A", } FIG_BG = "#FFFFFF" FIG_GRID = "#DADDE1" def _apply_morandi_style() -> None: plt.rcParams.update( { "figure.facecolor": FIG_BG, "axes.facecolor": FIG_BG, "savefig.facecolor": FIG_BG, "axes.edgecolor": FIG_GRID, "axes.labelcolor": MORANDI["text"], "xtick.color": MORANDI["text"], "ytick.color": MORANDI["text"], "text.color": MORANDI["text"], "grid.color": FIG_GRID, "grid.alpha": 0.55, "axes.grid": True, "axes.titleweight": "semibold", "axes.titlesize": 11, "axes.labelsize": 10, "legend.frameon": True, "legend.framealpha": 0.9, "legend.facecolor": FIG_BG, "legend.edgecolor": FIG_GRID, "axes.prop_cycle": cycler( "color", [ MORANDI["muted_blue"], MORANDI["sage"], MORANDI["clay"], MORANDI["rose"], MORANDI["warm_gray"], ], ), } ) @dataclass(frozen=True) class Paths: distance_xlsx: Path pairing_xlsx: Path allocation_xlsx: Path calendar_xlsx: Path evaluate_xlsx: Path figures_dir: Path def _default_paths() -> Paths: return Paths( distance_xlsx=BASE_DIR / "01_distance.xlsx", pairing_xlsx=BASE_DIR / "02_pairing.xlsx", allocation_xlsx=BASE_DIR / "03_allocation.xlsx", calendar_xlsx=BASE_DIR / "05_calendar.xlsx", evaluate_xlsx=BASE_DIR / "06_evaluate.xlsx", figures_dir=BASE_DIR / "figures", ) def _require_file(path: Path) -> None: if not path.exists(): raise FileNotFoundError( f"Missing required file: {path}. Run the earlier steps first (01~06)." ) def _pair_key(a: int, b: int) -> tuple[int, int]: return (a, b) if a <= b else (b, a) def fig1_pairing_map(paths: Paths) -> Path: _apply_morandi_style() # Warmer + higher contrast for Fig.1 specifically (clearer on paper/screens) fig1_colors = { "paired": "#e76f51", "unpaired": "#6c757d", "link": "#bc6c25", } sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites") pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs") sites = sites.copy() sites["site_id"] = sites["site_id"].astype(int) paired_ids = set(pairs["site_i_id"].astype(int)).union( set(pairs["site_j_id"].astype(int)) ) sites["is_paired"] = sites["site_id"].isin(paired_ids) fig, ax = plt.subplots(figsize=(8.5, 6.5), dpi=200) for _, row in pairs.iterrows(): i = int(row["site_i_id"]) j = int(row["site_j_id"]) si = sites.loc[sites["site_id"] == i].iloc[0] sj = sites.loc[sites["site_id"] == j].iloc[0] ax.plot( [si["lon"], sj["lon"]], [si["lat"], sj["lat"]], color=fig1_colors["link"], alpha=0.5, linewidth=1.2, zorder=1, ) paired = sites[sites["is_paired"]] unpaired = sites[~sites["is_paired"]] ax.scatter( paired["lon"], paired["lat"], s=18, color=fig1_colors["paired"], alpha=0.9, label=f"Paired sites (n={len(paired)})", zorder=2, ) if len(unpaired) > 0: ax.scatter( unpaired["lon"], unpaired["lat"], s=22, color=fig1_colors["unpaired"], alpha=0.9, marker="x", label=f"Unpaired sites (n={len(unpaired)})", zorder=3, ) ax.set_title(f"Fig.1 Pairing map (sites + selected n={len(pairs)} links)") ax.set_xlabel("Longitude") ax.set_ylabel("Latitude") ax.grid(True, alpha=0.55) ax.legend(loc="best", frameon=True) out = paths.figures_dir / "fig1_pairing_map.png" fig.tight_layout() fig.savefig(out, facecolor=FIG_BG) plt.close(fig) return out def export_fig1_points_js(paths: Paths) -> Path: sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites").copy() pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs").copy() sites["site_id"] = sites["site_id"].astype(int) sites["k"] = sites["k"].astype(int) paired_ids = set(pairs["site_i_id"].astype(int)).union( set(pairs["site_j_id"].astype(int)) ) points = [] for _, r in sites.iterrows(): points.append( { "site_id": int(r["site_id"]), "site_name": str(r["site_name"]), "lat": float(r["lat"]), "lng": float(r["lon"]), "mu": float(r["mu"]), "sigma": float(r["sigma"]), "k": int(r["k"]), "is_paired": int(r["site_id"]) in paired_ids, } ) links = [] for _, r in pairs.iterrows(): links.append( { "site_i_id": int(r["site_i_id"]), "site_j_id": int(r["site_j_id"]), "site_i_name": str(r["site_i_name"]), "site_j_name": str(r["site_j_name"]), "distance": float(r["distance"]), } ) out = BASE_DIR / "fig1_points.js" payload = ( "// Auto-generated from `task3/01_distance.xlsx` (sites) + `task3/02_pairing.xlsx` (selected_pairs)\n" "// Usage: open `task3/fig1_carto.html` in a browser (same directory must contain this file).\n" f"window.FIG1_POINTS = {json.dumps(points, ensure_ascii=False, separators=(',', ':'))};\n" f"window.FIG1_LINKS = {json.dumps(links, ensure_ascii=False, separators=(',', ':'))};\n" ) out.write_text(payload, encoding="utf-8") return out def fig2_allocation_scatter(paths: Paths) -> Path: _apply_morandi_style() df = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation") q_ratio = df["q_ratio"].to_numpy(dtype=float) mu_share = (df["mu_i"] / (df["mu_i"] + df["mu_j"])).to_numpy(dtype=float) sigma_share = (df["sigma_i"] / (df["sigma_i"] + df["sigma_j"])).to_numpy(dtype=float) sigma_j_share = 1.0 - sigma_share fig, axes = plt.subplots(1, 3, figsize=(12, 3.8), dpi=200, sharey=True) panels = [ ("$q^*/Q$ vs $\\mu_i/(\\mu_i+\\mu_j)$", mu_share), ("$q^*/Q$ vs $\\sigma_i/(\\sigma_i+\\sigma_j)$", sigma_share), ("$q^*/Q$ vs $\\sigma_j/(\\sigma_i+\\sigma_j)$", sigma_j_share), ] for ax, (title, x) in zip(axes, panels): ax.scatter( x, q_ratio, s=22, alpha=0.8, color=MORANDI["muted_blue"], edgecolor="none", ) if np.isfinite(x).all() and np.isfinite(q_ratio).all() and len(x) >= 2: coef = np.polyfit(x, q_ratio, 1) xx = np.linspace(float(np.min(x)), float(np.max(x)), 100) yy = coef[0] * xx + coef[1] ax.plot(xx, yy, color=MORANDI["terracotta"], linewidth=2.0, alpha=0.95) ax.set_title(title) ax.set_xlabel("x") ax.grid(True, alpha=0.55) axes[0].set_ylabel("$q^*/Q$") fig.suptitle(f"Fig.2 Allocation strategy scatter (n={len(df)} pairs)", y=1.02) fig.tight_layout() out = paths.figures_dir / "fig2_allocation_scatter.png" fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG) plt.close(fig) return out def fig4_calendar_heatmap(paths: Paths) -> Path: _apply_morandi_style() sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")[["site_id", "mu"]] sites["site_id"] = sites["site_id"].astype(int) mu_by_id = dict(zip(sites["site_id"], sites["mu"])) alloc = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")[ ["site_i_id", "site_j_id", "E_total"] ].copy() alloc["site_i_id"] = alloc["site_i_id"].astype(int) alloc["site_j_id"] = alloc["site_j_id"].astype(int) pair_E = {_pair_key(i, j): float(e) for i, j, e in alloc.to_numpy()} cal = pd.read_excel(paths.calendar_xlsx, sheet_name="calendar") expected = np.full((365, 2), np.nan, dtype=float) for idx, row in cal.iterrows(): day = int(row["day"]) for slot in (1, 2): typ = str(row[f"slot_{slot}_type"]).strip().lower() i = row[f"slot_{slot}_site_i"] j = row.get(f"slot_{slot}_site_j", np.nan) if pd.isna(i): continue i_id = int(i) if typ == "single": expected[day - 1, slot - 1] = float(mu_by_id.get(i_id, np.nan)) elif typ == "dual": if pd.isna(j): continue j_id = int(j) expected[day - 1, slot - 1] = pair_E.get(_pair_key(i_id, j_id), np.nan) fig, ax = plt.subplots(figsize=(12, 2.6), dpi=200) cmap = mcolors.LinearSegmentedColormap.from_list( "morandi", [ FIG_BG, "#E7E1D7", MORANDI["sage"], MORANDI["muted_blue"], "#5F7478", ], ) im = ax.imshow(expected.T, aspect="auto", cmap=cmap) ax.set_yticks([0, 1], labels=["Slot 1", "Slot 2"]) ax.set_xlabel("Day of year") ax.set_title("Fig.4 Calendar heatmap (expected service per slot)") month_days = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31] month_starts = np.cumsum([0] + month_days[:-1]) # 0-based month_labels = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"] ax.set_xticks(month_starts, labels=month_labels) cbar = fig.colorbar(im, ax=ax, fraction=0.03, pad=0.02) cbar.set_label("Expected service") fig.tight_layout() out = paths.figures_dir / "fig4_calendar_heatmap.png" fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG) plt.close(fig) return out def fig5_risk_distribution(paths: Paths) -> Path: _apply_morandi_style() risk = pd.read_excel(paths.evaluate_xlsx, sheet_name="pair_risk").copy() p = risk["shortfall_prob_either"].to_numpy(dtype=float) p = p[np.isfinite(p)] fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200) ax = axes[0] ax.hist(p, bins=10, color=MORANDI["sage"], alpha=0.9, edgecolor=FIG_BG) ax.axvline( float(np.mean(p)), color=MORANDI["terracotta"], linewidth=2.2, label=f"mean={np.mean(p):.3f}", ) ax.set_title("Histogram") ax.set_xlabel("Shortfall probability (either site)") ax.set_ylabel("Count") ax.grid(True, alpha=0.55) ax.legend(loc="best", frameon=True) ax = axes[1] p_sorted = np.sort(p)[::-1] ax.plot( np.arange(1, len(p_sorted) + 1), p_sorted, marker="o", markersize=3.2, linewidth=1.6, color=MORANDI["muted_blue_dark"], markerfacecolor=MORANDI["muted_blue"], markeredgecolor=FIG_BG, markeredgewidth=0.6, ) ax.set_title("Sorted by risk (descending)") ax.set_xlabel("Pair rank") ax.set_ylabel("Shortfall probability") ax.grid(True, alpha=0.55) fig.suptitle(f"Fig.5 Risk distribution across n={len(p)} pairs", y=1.02) fig.tight_layout() out = paths.figures_dir / "fig5_risk_distribution.png" fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG) plt.close(fig) return out def fig6_fairness_satisfaction(paths: Paths) -> Path: _apply_morandi_style() df = pd.read_excel(paths.evaluate_xlsx, sheet_name="site_satisfaction").copy() r = df["satisfaction_rate_r"].to_numpy(dtype=float) r = r[np.isfinite(r)] r_sorted = np.sort(r) # Lorenz curve (site share vs satisfaction share) n = len(r_sorted) if n == 0 or np.sum(r_sorted) <= 0: x = np.array([0.0, 1.0]) y = np.array([0.0, 1.0]) gini = 0.0 else: cum = np.cumsum(r_sorted) x = np.concatenate([[0.0], np.arange(1, n + 1) / n]) y = np.concatenate([[0.0], cum / cum[-1]]) # Gini via Lorenz area gini = 1.0 - 2.0 * float(np.trapezoid(y, x)) fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200) ax = axes[0] ax.hist(r, bins=12, color=MORANDI["muted_blue"], alpha=0.9, edgecolor=FIG_BG) ax.axvline(float(np.mean(r)), color=MORANDI["terracotta"], linewidth=2.2, label=f"mean={np.mean(r):.2f}") ax.set_title("Satisfaction rate distribution") ax.set_xlabel("Satisfaction rate $r_i$") ax.set_ylabel("Count") ax.grid(True, alpha=0.55) ax.legend(loc="best", frameon=True) ax = axes[1] ax.plot(x, y, color=MORANDI["sage"], linewidth=2.2, label="Lorenz curve") ax.plot([0, 1], [0, 1], color=MORANDI["warm_gray"], linestyle="--", linewidth=1.4, label="Equality line") ax.set_title(f"Lorenz curve (Gini={gini:.3f})") ax.set_xlabel("Cumulative share of sites") ax.set_ylabel("Cumulative share of satisfaction") ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.grid(True, alpha=0.55) ax.legend(loc="lower right", frameon=True) fig.suptitle(f"Fig.6 Fairness diagnostics (n={len(r)} sites)", y=1.02) fig.tight_layout() out = paths.figures_dir / "fig6_fairness_satisfaction.png" fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG) plt.close(fig) return out def main() -> None: paths = _default_paths() for p in [ paths.distance_xlsx, paths.pairing_xlsx, paths.allocation_xlsx, paths.calendar_xlsx, paths.evaluate_xlsx, ]: _require_file(p) paths.figures_dir.mkdir(parents=True, exist_ok=True) print("=" * 60) print("Task 3 - Step 8: Visualization") print("=" * 60) out1 = fig1_pairing_map(paths) print(f"Saved: {out1.relative_to(BASE_DIR)}") out1_js = export_fig1_points_js(paths) print(f"Saved: {out1_js.relative_to(BASE_DIR)}") out2 = fig2_allocation_scatter(paths) print(f"Saved: {out2.relative_to(BASE_DIR)}") out6 = fig6_fairness_satisfaction(paths) print(f"Saved: {out6.relative_to(BASE_DIR)}") out4 = fig4_calendar_heatmap(paths) print(f"Saved: {out4.relative_to(BASE_DIR)}") out5 = fig5_risk_distribution(paths) print(f"Saved: {out5.relative_to(BASE_DIR)}") if __name__ == "__main__": main()