Files
mcm-mfp/task3/08_visualize.py
2026-01-20 01:55:46 +08:00

501 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task 3 - Step 8: 可视化Fig.1/2/4/5/6
=====================================
输入:
- 01_distance.xlsx (sites: 经纬度、mu/sigma)
- 02_pairing.xlsx (selected_pairs: 配对连线)
- 03_allocation.xlsx (allocation: q*, E_total)
- 05_calendar.xlsx (calendar: 365天×2槽位排程)
- 06_evaluate.xlsx (pair_risk: 缺口概率分布)
输出:
- figures/fig1_pairing_map.png
- figures/fig2_allocation_scatter.png
- figures/fig6_fairness_satisfaction.png
- figures/fig4_calendar_heatmap.png
- figures/fig5_risk_distribution.png
"""
from __future__ import annotations
from dataclasses import dataclass
import json
from pathlib import Path
import os
import tempfile
import numpy as np
import pandas as pd
_mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig"
_xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache"
_mpl_config_dir.mkdir(parents=True, exist_ok=True)
_xdg_cache_dir.mkdir(parents=True, exist_ok=True)
os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir))
os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir))
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib import colors as mcolors
from cycler import cycler
BASE_DIR = Path(__file__).resolve().parent
MORANDI = {
"grid": "#D7D1C7",
"text": "#3F3F3F",
"muted_blue": "#8FA3A7",
"muted_blue_dark": "#6E858A",
"sage": "#AEB8A6",
"clay": "#C6AA9B",
"rose": "#D2B8B3",
"warm_gray": "#8C857A",
"terracotta": "#B07A6A",
}
FIG_BG = "#FFFFFF"
FIG_GRID = "#DADDE1"
def _apply_morandi_style() -> None:
plt.rcParams.update(
{
"figure.facecolor": FIG_BG,
"axes.facecolor": FIG_BG,
"savefig.facecolor": FIG_BG,
"axes.edgecolor": FIG_GRID,
"axes.labelcolor": MORANDI["text"],
"xtick.color": MORANDI["text"],
"ytick.color": MORANDI["text"],
"text.color": MORANDI["text"],
"grid.color": FIG_GRID,
"grid.alpha": 0.55,
"axes.grid": True,
"axes.titleweight": "semibold",
"axes.titlesize": 11,
"axes.labelsize": 10,
"legend.frameon": True,
"legend.framealpha": 0.9,
"legend.facecolor": FIG_BG,
"legend.edgecolor": FIG_GRID,
"axes.prop_cycle": cycler(
"color",
[
MORANDI["muted_blue"],
MORANDI["sage"],
MORANDI["clay"],
MORANDI["rose"],
MORANDI["warm_gray"],
],
),
}
)
@dataclass(frozen=True)
class Paths:
distance_xlsx: Path
pairing_xlsx: Path
allocation_xlsx: Path
calendar_xlsx: Path
evaluate_xlsx: Path
figures_dir: Path
def _default_paths() -> Paths:
return Paths(
distance_xlsx=BASE_DIR / "01_distance.xlsx",
pairing_xlsx=BASE_DIR / "02_pairing.xlsx",
allocation_xlsx=BASE_DIR / "03_allocation.xlsx",
calendar_xlsx=BASE_DIR / "05_calendar.xlsx",
evaluate_xlsx=BASE_DIR / "06_evaluate.xlsx",
figures_dir=BASE_DIR / "figures",
)
def _require_file(path: Path) -> None:
if not path.exists():
raise FileNotFoundError(
f"Missing required file: {path}. Run the earlier steps first (01~06)."
)
def _pair_key(a: int, b: int) -> tuple[int, int]:
return (a, b) if a <= b else (b, a)
def fig1_pairing_map(paths: Paths) -> Path:
_apply_morandi_style()
# Warmer + higher contrast for Fig.1 specifically (clearer on paper/screens)
fig1_colors = {
"paired": "#e76f51",
"unpaired": "#6c757d",
"link": "#bc6c25",
}
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")
pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs")
sites = sites.copy()
sites["site_id"] = sites["site_id"].astype(int)
paired_ids = set(pairs["site_i_id"].astype(int)).union(
set(pairs["site_j_id"].astype(int))
)
sites["is_paired"] = sites["site_id"].isin(paired_ids)
fig, ax = plt.subplots(figsize=(8.5, 6.5), dpi=200)
for _, row in pairs.iterrows():
i = int(row["site_i_id"])
j = int(row["site_j_id"])
si = sites.loc[sites["site_id"] == i].iloc[0]
sj = sites.loc[sites["site_id"] == j].iloc[0]
ax.plot(
[si["lon"], sj["lon"]],
[si["lat"], sj["lat"]],
color=fig1_colors["link"],
alpha=0.5,
linewidth=1.2,
zorder=1,
)
paired = sites[sites["is_paired"]]
unpaired = sites[~sites["is_paired"]]
ax.scatter(
paired["lon"],
paired["lat"],
s=18,
color=fig1_colors["paired"],
alpha=0.9,
label=f"Paired sites (n={len(paired)})",
zorder=2,
)
if len(unpaired) > 0:
ax.scatter(
unpaired["lon"],
unpaired["lat"],
s=22,
color=fig1_colors["unpaired"],
alpha=0.9,
marker="x",
label=f"Unpaired sites (n={len(unpaired)})",
zorder=3,
)
ax.set_title(f"Fig.1 Pairing map (sites + selected n={len(pairs)} links)")
ax.set_xlabel("Longitude")
ax.set_ylabel("Latitude")
ax.grid(True, alpha=0.55)
ax.legend(loc="best", frameon=True)
out = paths.figures_dir / "fig1_pairing_map.png"
fig.tight_layout()
fig.savefig(out, facecolor=FIG_BG)
plt.close(fig)
return out
def export_fig1_points_js(paths: Paths) -> Path:
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites").copy()
pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs").copy()
sites["site_id"] = sites["site_id"].astype(int)
sites["k"] = sites["k"].astype(int)
paired_ids = set(pairs["site_i_id"].astype(int)).union(
set(pairs["site_j_id"].astype(int))
)
points = []
for _, r in sites.iterrows():
points.append(
{
"site_id": int(r["site_id"]),
"site_name": str(r["site_name"]),
"lat": float(r["lat"]),
"lng": float(r["lon"]),
"mu": float(r["mu"]),
"sigma": float(r["sigma"]),
"k": int(r["k"]),
"is_paired": int(r["site_id"]) in paired_ids,
}
)
links = []
for _, r in pairs.iterrows():
links.append(
{
"site_i_id": int(r["site_i_id"]),
"site_j_id": int(r["site_j_id"]),
"site_i_name": str(r["site_i_name"]),
"site_j_name": str(r["site_j_name"]),
"distance": float(r["distance"]),
}
)
out = BASE_DIR / "fig1_points.js"
payload = (
"// Auto-generated from `task3/01_distance.xlsx` (sites) + `task3/02_pairing.xlsx` (selected_pairs)\n"
"// Usage: open `task3/fig1_carto.html` in a browser (same directory must contain this file).\n"
f"window.FIG1_POINTS = {json.dumps(points, ensure_ascii=False, separators=(',', ':'))};\n"
f"window.FIG1_LINKS = {json.dumps(links, ensure_ascii=False, separators=(',', ':'))};\n"
)
out.write_text(payload, encoding="utf-8")
return out
def fig2_allocation_scatter(paths: Paths) -> Path:
_apply_morandi_style()
df = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")
q_ratio = df["q_ratio"].to_numpy(dtype=float)
mu_share = (df["mu_i"] / (df["mu_i"] + df["mu_j"])).to_numpy(dtype=float)
sigma_share = (df["sigma_i"] / (df["sigma_i"] + df["sigma_j"])).to_numpy(dtype=float)
sigma_j_share = 1.0 - sigma_share
fig, axes = plt.subplots(1, 3, figsize=(12, 3.8), dpi=200, sharey=True)
panels = [
("$q^*/Q$ vs $\\mu_i/(\\mu_i+\\mu_j)$", mu_share),
("$q^*/Q$ vs $\\sigma_i/(\\sigma_i+\\sigma_j)$", sigma_share),
("$q^*/Q$ vs $\\sigma_j/(\\sigma_i+\\sigma_j)$", sigma_j_share),
]
for ax, (title, x) in zip(axes, panels):
ax.scatter(
x,
q_ratio,
s=22,
alpha=0.8,
color=MORANDI["muted_blue"],
edgecolor="none",
)
if np.isfinite(x).all() and np.isfinite(q_ratio).all() and len(x) >= 2:
coef = np.polyfit(x, q_ratio, 1)
xx = np.linspace(float(np.min(x)), float(np.max(x)), 100)
yy = coef[0] * xx + coef[1]
ax.plot(xx, yy, color=MORANDI["terracotta"], linewidth=2.0, alpha=0.95)
ax.set_title(title)
ax.set_xlabel("x")
ax.grid(True, alpha=0.55)
axes[0].set_ylabel("$q^*/Q$")
fig.suptitle(f"Fig.2 Allocation strategy scatter (n={len(df)} pairs)", y=1.02)
fig.tight_layout()
out = paths.figures_dir / "fig2_allocation_scatter.png"
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
plt.close(fig)
return out
def fig4_calendar_heatmap(paths: Paths) -> Path:
_apply_morandi_style()
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")[["site_id", "mu"]]
sites["site_id"] = sites["site_id"].astype(int)
mu_by_id = dict(zip(sites["site_id"], sites["mu"]))
alloc = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")[
["site_i_id", "site_j_id", "E_total"]
].copy()
alloc["site_i_id"] = alloc["site_i_id"].astype(int)
alloc["site_j_id"] = alloc["site_j_id"].astype(int)
pair_E = {_pair_key(i, j): float(e) for i, j, e in alloc.to_numpy()}
cal = pd.read_excel(paths.calendar_xlsx, sheet_name="calendar")
expected = np.full((365, 2), np.nan, dtype=float)
for idx, row in cal.iterrows():
day = int(row["day"])
for slot in (1, 2):
typ = str(row[f"slot_{slot}_type"]).strip().lower()
i = row[f"slot_{slot}_site_i"]
j = row.get(f"slot_{slot}_site_j", np.nan)
if pd.isna(i):
continue
i_id = int(i)
if typ == "single":
expected[day - 1, slot - 1] = float(mu_by_id.get(i_id, np.nan))
elif typ == "dual":
if pd.isna(j):
continue
j_id = int(j)
expected[day - 1, slot - 1] = pair_E.get(_pair_key(i_id, j_id), np.nan)
fig, ax = plt.subplots(figsize=(12, 2.6), dpi=200)
cmap = mcolors.LinearSegmentedColormap.from_list(
"morandi",
[
FIG_BG,
"#E7E1D7",
MORANDI["sage"],
MORANDI["muted_blue"],
"#5F7478",
],
)
im = ax.imshow(expected.T, aspect="auto", cmap=cmap)
ax.set_yticks([0, 1], labels=["Slot 1", "Slot 2"])
ax.set_xlabel("Day of year")
ax.set_title("Fig.4 Calendar heatmap (expected service per slot)")
month_days = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
month_starts = np.cumsum([0] + month_days[:-1]) # 0-based
month_labels = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
ax.set_xticks(month_starts, labels=month_labels)
cbar = fig.colorbar(im, ax=ax, fraction=0.03, pad=0.02)
cbar.set_label("Expected service")
fig.tight_layout()
out = paths.figures_dir / "fig4_calendar_heatmap.png"
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
plt.close(fig)
return out
def fig5_risk_distribution(paths: Paths) -> Path:
_apply_morandi_style()
risk = pd.read_excel(paths.evaluate_xlsx, sheet_name="pair_risk").copy()
p = risk["shortfall_prob_either"].to_numpy(dtype=float)
p = p[np.isfinite(p)]
fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200)
ax = axes[0]
ax.hist(p, bins=10, color=MORANDI["sage"], alpha=0.9, edgecolor=FIG_BG)
ax.axvline(
float(np.mean(p)),
color=MORANDI["terracotta"],
linewidth=2.2,
label=f"mean={np.mean(p):.3f}",
)
ax.set_title("Histogram")
ax.set_xlabel("Shortfall probability (either site)")
ax.set_ylabel("Count")
ax.grid(True, alpha=0.55)
ax.legend(loc="best", frameon=True)
ax = axes[1]
p_sorted = np.sort(p)[::-1]
ax.plot(
np.arange(1, len(p_sorted) + 1),
p_sorted,
marker="o",
markersize=3.2,
linewidth=1.6,
color=MORANDI["muted_blue_dark"],
markerfacecolor=MORANDI["muted_blue"],
markeredgecolor=FIG_BG,
markeredgewidth=0.6,
)
ax.set_title("Sorted by risk (descending)")
ax.set_xlabel("Pair rank")
ax.set_ylabel("Shortfall probability")
ax.grid(True, alpha=0.55)
fig.suptitle(f"Fig.5 Risk distribution across n={len(p)} pairs", y=1.02)
fig.tight_layout()
out = paths.figures_dir / "fig5_risk_distribution.png"
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
plt.close(fig)
return out
def fig6_fairness_satisfaction(paths: Paths) -> Path:
_apply_morandi_style()
df = pd.read_excel(paths.evaluate_xlsx, sheet_name="site_satisfaction").copy()
r = df["satisfaction_rate_r"].to_numpy(dtype=float)
r = r[np.isfinite(r)]
r_sorted = np.sort(r)
# Lorenz curve (site share vs satisfaction share)
n = len(r_sorted)
if n == 0 or np.sum(r_sorted) <= 0:
x = np.array([0.0, 1.0])
y = np.array([0.0, 1.0])
gini = 0.0
else:
cum = np.cumsum(r_sorted)
x = np.concatenate([[0.0], np.arange(1, n + 1) / n])
y = np.concatenate([[0.0], cum / cum[-1]])
# Gini via Lorenz area
gini = 1.0 - 2.0 * float(np.trapezoid(y, x))
fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200)
ax = axes[0]
ax.hist(r, bins=12, color=MORANDI["muted_blue"], alpha=0.9, edgecolor=FIG_BG)
ax.axvline(float(np.mean(r)), color=MORANDI["terracotta"], linewidth=2.2, label=f"mean={np.mean(r):.2f}")
ax.set_title("Satisfaction rate distribution")
ax.set_xlabel("Satisfaction rate $r_i$")
ax.set_ylabel("Count")
ax.grid(True, alpha=0.55)
ax.legend(loc="best", frameon=True)
ax = axes[1]
ax.plot(x, y, color=MORANDI["sage"], linewidth=2.2, label="Lorenz curve")
ax.plot([0, 1], [0, 1], color=MORANDI["warm_gray"], linestyle="--", linewidth=1.4, label="Equality line")
ax.set_title(f"Lorenz curve (Gini={gini:.3f})")
ax.set_xlabel("Cumulative share of sites")
ax.set_ylabel("Cumulative share of satisfaction")
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.grid(True, alpha=0.55)
ax.legend(loc="lower right", frameon=True)
fig.suptitle(f"Fig.6 Fairness diagnostics (n={len(r)} sites)", y=1.02)
fig.tight_layout()
out = paths.figures_dir / "fig6_fairness_satisfaction.png"
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
plt.close(fig)
return out
def main() -> None:
paths = _default_paths()
for p in [
paths.distance_xlsx,
paths.pairing_xlsx,
paths.allocation_xlsx,
paths.calendar_xlsx,
paths.evaluate_xlsx,
]:
_require_file(p)
paths.figures_dir.mkdir(parents=True, exist_ok=True)
print("=" * 60)
print("Task 3 - Step 8: Visualization")
print("=" * 60)
out1 = fig1_pairing_map(paths)
print(f"Saved: {out1.relative_to(BASE_DIR)}")
out1_js = export_fig1_points_js(paths)
print(f"Saved: {out1_js.relative_to(BASE_DIR)}")
out2 = fig2_allocation_scatter(paths)
print(f"Saved: {out2.relative_to(BASE_DIR)}")
out6 = fig6_fairness_satisfaction(paths)
print(f"Saved: {out6.relative_to(BASE_DIR)}")
out4 = fig4_calendar_heatmap(paths)
print(f"Saved: {out4.relative_to(BASE_DIR)}")
out5 = fig5_risk_distribution(paths)
print(f"Saved: {out5.relative_to(BASE_DIR)}")
if __name__ == "__main__":
main()