501 lines
16 KiB
Python
501 lines
16 KiB
Python
"""
|
||
Task 3 - Step 8: 可视化(Fig.1/2/4/5/6)
|
||
=====================================
|
||
|
||
输入:
|
||
- 01_distance.xlsx (sites: 经纬度、mu/sigma)
|
||
- 02_pairing.xlsx (selected_pairs: 配对连线)
|
||
- 03_allocation.xlsx (allocation: q*, E_total)
|
||
- 05_calendar.xlsx (calendar: 365天×2槽位排程)
|
||
- 06_evaluate.xlsx (pair_risk: 缺口概率分布)
|
||
|
||
输出:
|
||
- figures/fig1_pairing_map.png
|
||
- figures/fig2_allocation_scatter.png
|
||
- figures/fig6_fairness_satisfaction.png
|
||
- figures/fig4_calendar_heatmap.png
|
||
- figures/fig5_risk_distribution.png
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
import json
|
||
from pathlib import Path
|
||
|
||
import os
|
||
import tempfile
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
_mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig"
|
||
_xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache"
|
||
_mpl_config_dir.mkdir(parents=True, exist_ok=True)
|
||
_xdg_cache_dir.mkdir(parents=True, exist_ok=True)
|
||
os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir))
|
||
os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir))
|
||
|
||
import matplotlib
|
||
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib import colors as mcolors
|
||
from cycler import cycler
|
||
|
||
|
||
BASE_DIR = Path(__file__).resolve().parent
|
||
|
||
|
||
MORANDI = {
|
||
"grid": "#D7D1C7",
|
||
"text": "#3F3F3F",
|
||
"muted_blue": "#8FA3A7",
|
||
"muted_blue_dark": "#6E858A",
|
||
"sage": "#AEB8A6",
|
||
"clay": "#C6AA9B",
|
||
"rose": "#D2B8B3",
|
||
"warm_gray": "#8C857A",
|
||
"terracotta": "#B07A6A",
|
||
}
|
||
|
||
FIG_BG = "#FFFFFF"
|
||
FIG_GRID = "#DADDE1"
|
||
|
||
|
||
def _apply_morandi_style() -> None:
|
||
plt.rcParams.update(
|
||
{
|
||
"figure.facecolor": FIG_BG,
|
||
"axes.facecolor": FIG_BG,
|
||
"savefig.facecolor": FIG_BG,
|
||
"axes.edgecolor": FIG_GRID,
|
||
"axes.labelcolor": MORANDI["text"],
|
||
"xtick.color": MORANDI["text"],
|
||
"ytick.color": MORANDI["text"],
|
||
"text.color": MORANDI["text"],
|
||
"grid.color": FIG_GRID,
|
||
"grid.alpha": 0.55,
|
||
"axes.grid": True,
|
||
"axes.titleweight": "semibold",
|
||
"axes.titlesize": 11,
|
||
"axes.labelsize": 10,
|
||
"legend.frameon": True,
|
||
"legend.framealpha": 0.9,
|
||
"legend.facecolor": FIG_BG,
|
||
"legend.edgecolor": FIG_GRID,
|
||
"axes.prop_cycle": cycler(
|
||
"color",
|
||
[
|
||
MORANDI["muted_blue"],
|
||
MORANDI["sage"],
|
||
MORANDI["clay"],
|
||
MORANDI["rose"],
|
||
MORANDI["warm_gray"],
|
||
],
|
||
),
|
||
}
|
||
)
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class Paths:
|
||
distance_xlsx: Path
|
||
pairing_xlsx: Path
|
||
allocation_xlsx: Path
|
||
calendar_xlsx: Path
|
||
evaluate_xlsx: Path
|
||
figures_dir: Path
|
||
|
||
|
||
def _default_paths() -> Paths:
|
||
return Paths(
|
||
distance_xlsx=BASE_DIR / "01_distance.xlsx",
|
||
pairing_xlsx=BASE_DIR / "02_pairing.xlsx",
|
||
allocation_xlsx=BASE_DIR / "03_allocation.xlsx",
|
||
calendar_xlsx=BASE_DIR / "05_calendar.xlsx",
|
||
evaluate_xlsx=BASE_DIR / "06_evaluate.xlsx",
|
||
figures_dir=BASE_DIR / "figures",
|
||
)
|
||
|
||
|
||
def _require_file(path: Path) -> None:
|
||
if not path.exists():
|
||
raise FileNotFoundError(
|
||
f"Missing required file: {path}. Run the earlier steps first (01~06)."
|
||
)
|
||
|
||
|
||
def _pair_key(a: int, b: int) -> tuple[int, int]:
|
||
return (a, b) if a <= b else (b, a)
|
||
|
||
|
||
def fig1_pairing_map(paths: Paths) -> Path:
|
||
_apply_morandi_style()
|
||
|
||
# Warmer + higher contrast for Fig.1 specifically (clearer on paper/screens)
|
||
fig1_colors = {
|
||
"paired": "#e76f51",
|
||
"unpaired": "#6c757d",
|
||
"link": "#bc6c25",
|
||
}
|
||
|
||
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")
|
||
pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs")
|
||
|
||
sites = sites.copy()
|
||
sites["site_id"] = sites["site_id"].astype(int)
|
||
|
||
paired_ids = set(pairs["site_i_id"].astype(int)).union(
|
||
set(pairs["site_j_id"].astype(int))
|
||
)
|
||
sites["is_paired"] = sites["site_id"].isin(paired_ids)
|
||
|
||
fig, ax = plt.subplots(figsize=(8.5, 6.5), dpi=200)
|
||
|
||
for _, row in pairs.iterrows():
|
||
i = int(row["site_i_id"])
|
||
j = int(row["site_j_id"])
|
||
si = sites.loc[sites["site_id"] == i].iloc[0]
|
||
sj = sites.loc[sites["site_id"] == j].iloc[0]
|
||
ax.plot(
|
||
[si["lon"], sj["lon"]],
|
||
[si["lat"], sj["lat"]],
|
||
color=fig1_colors["link"],
|
||
alpha=0.5,
|
||
linewidth=1.2,
|
||
zorder=1,
|
||
)
|
||
|
||
paired = sites[sites["is_paired"]]
|
||
unpaired = sites[~sites["is_paired"]]
|
||
|
||
ax.scatter(
|
||
paired["lon"],
|
||
paired["lat"],
|
||
s=18,
|
||
color=fig1_colors["paired"],
|
||
alpha=0.9,
|
||
label=f"Paired sites (n={len(paired)})",
|
||
zorder=2,
|
||
)
|
||
if len(unpaired) > 0:
|
||
ax.scatter(
|
||
unpaired["lon"],
|
||
unpaired["lat"],
|
||
s=22,
|
||
color=fig1_colors["unpaired"],
|
||
alpha=0.9,
|
||
marker="x",
|
||
label=f"Unpaired sites (n={len(unpaired)})",
|
||
zorder=3,
|
||
)
|
||
|
||
ax.set_title(f"Fig.1 Pairing map (sites + selected n={len(pairs)} links)")
|
||
ax.set_xlabel("Longitude")
|
||
ax.set_ylabel("Latitude")
|
||
ax.grid(True, alpha=0.55)
|
||
ax.legend(loc="best", frameon=True)
|
||
|
||
out = paths.figures_dir / "fig1_pairing_map.png"
|
||
fig.tight_layout()
|
||
fig.savefig(out, facecolor=FIG_BG)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def export_fig1_points_js(paths: Paths) -> Path:
|
||
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites").copy()
|
||
pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs").copy()
|
||
|
||
sites["site_id"] = sites["site_id"].astype(int)
|
||
sites["k"] = sites["k"].astype(int)
|
||
|
||
paired_ids = set(pairs["site_i_id"].astype(int)).union(
|
||
set(pairs["site_j_id"].astype(int))
|
||
)
|
||
|
||
points = []
|
||
for _, r in sites.iterrows():
|
||
points.append(
|
||
{
|
||
"site_id": int(r["site_id"]),
|
||
"site_name": str(r["site_name"]),
|
||
"lat": float(r["lat"]),
|
||
"lng": float(r["lon"]),
|
||
"mu": float(r["mu"]),
|
||
"sigma": float(r["sigma"]),
|
||
"k": int(r["k"]),
|
||
"is_paired": int(r["site_id"]) in paired_ids,
|
||
}
|
||
)
|
||
|
||
links = []
|
||
for _, r in pairs.iterrows():
|
||
links.append(
|
||
{
|
||
"site_i_id": int(r["site_i_id"]),
|
||
"site_j_id": int(r["site_j_id"]),
|
||
"site_i_name": str(r["site_i_name"]),
|
||
"site_j_name": str(r["site_j_name"]),
|
||
"distance": float(r["distance"]),
|
||
}
|
||
)
|
||
|
||
out = BASE_DIR / "fig1_points.js"
|
||
payload = (
|
||
"// Auto-generated from `task3/01_distance.xlsx` (sites) + `task3/02_pairing.xlsx` (selected_pairs)\n"
|
||
"// Usage: open `task3/fig1_carto.html` in a browser (same directory must contain this file).\n"
|
||
f"window.FIG1_POINTS = {json.dumps(points, ensure_ascii=False, separators=(',', ':'))};\n"
|
||
f"window.FIG1_LINKS = {json.dumps(links, ensure_ascii=False, separators=(',', ':'))};\n"
|
||
)
|
||
out.write_text(payload, encoding="utf-8")
|
||
return out
|
||
|
||
|
||
def fig2_allocation_scatter(paths: Paths) -> Path:
|
||
_apply_morandi_style()
|
||
|
||
df = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")
|
||
|
||
q_ratio = df["q_ratio"].to_numpy(dtype=float)
|
||
mu_share = (df["mu_i"] / (df["mu_i"] + df["mu_j"])).to_numpy(dtype=float)
|
||
sigma_share = (df["sigma_i"] / (df["sigma_i"] + df["sigma_j"])).to_numpy(dtype=float)
|
||
sigma_j_share = 1.0 - sigma_share
|
||
|
||
fig, axes = plt.subplots(1, 3, figsize=(12, 3.8), dpi=200, sharey=True)
|
||
|
||
panels = [
|
||
("$q^*/Q$ vs $\\mu_i/(\\mu_i+\\mu_j)$", mu_share),
|
||
("$q^*/Q$ vs $\\sigma_i/(\\sigma_i+\\sigma_j)$", sigma_share),
|
||
("$q^*/Q$ vs $\\sigma_j/(\\sigma_i+\\sigma_j)$", sigma_j_share),
|
||
]
|
||
|
||
for ax, (title, x) in zip(axes, panels):
|
||
ax.scatter(
|
||
x,
|
||
q_ratio,
|
||
s=22,
|
||
alpha=0.8,
|
||
color=MORANDI["muted_blue"],
|
||
edgecolor="none",
|
||
)
|
||
if np.isfinite(x).all() and np.isfinite(q_ratio).all() and len(x) >= 2:
|
||
coef = np.polyfit(x, q_ratio, 1)
|
||
xx = np.linspace(float(np.min(x)), float(np.max(x)), 100)
|
||
yy = coef[0] * xx + coef[1]
|
||
ax.plot(xx, yy, color=MORANDI["terracotta"], linewidth=2.0, alpha=0.95)
|
||
ax.set_title(title)
|
||
ax.set_xlabel("x")
|
||
ax.grid(True, alpha=0.55)
|
||
|
||
axes[0].set_ylabel("$q^*/Q$")
|
||
fig.suptitle(f"Fig.2 Allocation strategy scatter (n={len(df)} pairs)", y=1.02)
|
||
fig.tight_layout()
|
||
|
||
out = paths.figures_dir / "fig2_allocation_scatter.png"
|
||
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig4_calendar_heatmap(paths: Paths) -> Path:
|
||
_apply_morandi_style()
|
||
|
||
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")[["site_id", "mu"]]
|
||
sites["site_id"] = sites["site_id"].astype(int)
|
||
mu_by_id = dict(zip(sites["site_id"], sites["mu"]))
|
||
|
||
alloc = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")[
|
||
["site_i_id", "site_j_id", "E_total"]
|
||
].copy()
|
||
alloc["site_i_id"] = alloc["site_i_id"].astype(int)
|
||
alloc["site_j_id"] = alloc["site_j_id"].astype(int)
|
||
pair_E = {_pair_key(i, j): float(e) for i, j, e in alloc.to_numpy()}
|
||
|
||
cal = pd.read_excel(paths.calendar_xlsx, sheet_name="calendar")
|
||
|
||
expected = np.full((365, 2), np.nan, dtype=float)
|
||
|
||
for idx, row in cal.iterrows():
|
||
day = int(row["day"])
|
||
for slot in (1, 2):
|
||
typ = str(row[f"slot_{slot}_type"]).strip().lower()
|
||
i = row[f"slot_{slot}_site_i"]
|
||
j = row.get(f"slot_{slot}_site_j", np.nan)
|
||
|
||
if pd.isna(i):
|
||
continue
|
||
i_id = int(i)
|
||
if typ == "single":
|
||
expected[day - 1, slot - 1] = float(mu_by_id.get(i_id, np.nan))
|
||
elif typ == "dual":
|
||
if pd.isna(j):
|
||
continue
|
||
j_id = int(j)
|
||
expected[day - 1, slot - 1] = pair_E.get(_pair_key(i_id, j_id), np.nan)
|
||
|
||
fig, ax = plt.subplots(figsize=(12, 2.6), dpi=200)
|
||
cmap = mcolors.LinearSegmentedColormap.from_list(
|
||
"morandi",
|
||
[
|
||
FIG_BG,
|
||
"#E7E1D7",
|
||
MORANDI["sage"],
|
||
MORANDI["muted_blue"],
|
||
"#5F7478",
|
||
],
|
||
)
|
||
im = ax.imshow(expected.T, aspect="auto", cmap=cmap)
|
||
ax.set_yticks([0, 1], labels=["Slot 1", "Slot 2"])
|
||
ax.set_xlabel("Day of year")
|
||
ax.set_title("Fig.4 Calendar heatmap (expected service per slot)")
|
||
|
||
month_days = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
|
||
month_starts = np.cumsum([0] + month_days[:-1]) # 0-based
|
||
month_labels = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
|
||
ax.set_xticks(month_starts, labels=month_labels)
|
||
|
||
cbar = fig.colorbar(im, ax=ax, fraction=0.03, pad=0.02)
|
||
cbar.set_label("Expected service")
|
||
|
||
fig.tight_layout()
|
||
out = paths.figures_dir / "fig4_calendar_heatmap.png"
|
||
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig5_risk_distribution(paths: Paths) -> Path:
|
||
_apply_morandi_style()
|
||
|
||
risk = pd.read_excel(paths.evaluate_xlsx, sheet_name="pair_risk").copy()
|
||
p = risk["shortfall_prob_either"].to_numpy(dtype=float)
|
||
p = p[np.isfinite(p)]
|
||
|
||
fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200)
|
||
|
||
ax = axes[0]
|
||
ax.hist(p, bins=10, color=MORANDI["sage"], alpha=0.9, edgecolor=FIG_BG)
|
||
ax.axvline(
|
||
float(np.mean(p)),
|
||
color=MORANDI["terracotta"],
|
||
linewidth=2.2,
|
||
label=f"mean={np.mean(p):.3f}",
|
||
)
|
||
ax.set_title("Histogram")
|
||
ax.set_xlabel("Shortfall probability (either site)")
|
||
ax.set_ylabel("Count")
|
||
ax.grid(True, alpha=0.55)
|
||
ax.legend(loc="best", frameon=True)
|
||
|
||
ax = axes[1]
|
||
p_sorted = np.sort(p)[::-1]
|
||
ax.plot(
|
||
np.arange(1, len(p_sorted) + 1),
|
||
p_sorted,
|
||
marker="o",
|
||
markersize=3.2,
|
||
linewidth=1.6,
|
||
color=MORANDI["muted_blue_dark"],
|
||
markerfacecolor=MORANDI["muted_blue"],
|
||
markeredgecolor=FIG_BG,
|
||
markeredgewidth=0.6,
|
||
)
|
||
ax.set_title("Sorted by risk (descending)")
|
||
ax.set_xlabel("Pair rank")
|
||
ax.set_ylabel("Shortfall probability")
|
||
ax.grid(True, alpha=0.55)
|
||
|
||
fig.suptitle(f"Fig.5 Risk distribution across n={len(p)} pairs", y=1.02)
|
||
fig.tight_layout()
|
||
out = paths.figures_dir / "fig5_risk_distribution.png"
|
||
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig6_fairness_satisfaction(paths: Paths) -> Path:
|
||
_apply_morandi_style()
|
||
|
||
df = pd.read_excel(paths.evaluate_xlsx, sheet_name="site_satisfaction").copy()
|
||
r = df["satisfaction_rate_r"].to_numpy(dtype=float)
|
||
r = r[np.isfinite(r)]
|
||
r_sorted = np.sort(r)
|
||
|
||
# Lorenz curve (site share vs satisfaction share)
|
||
n = len(r_sorted)
|
||
if n == 0 or np.sum(r_sorted) <= 0:
|
||
x = np.array([0.0, 1.0])
|
||
y = np.array([0.0, 1.0])
|
||
gini = 0.0
|
||
else:
|
||
cum = np.cumsum(r_sorted)
|
||
x = np.concatenate([[0.0], np.arange(1, n + 1) / n])
|
||
y = np.concatenate([[0.0], cum / cum[-1]])
|
||
# Gini via Lorenz area
|
||
gini = 1.0 - 2.0 * float(np.trapezoid(y, x))
|
||
|
||
fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200)
|
||
|
||
ax = axes[0]
|
||
ax.hist(r, bins=12, color=MORANDI["muted_blue"], alpha=0.9, edgecolor=FIG_BG)
|
||
ax.axvline(float(np.mean(r)), color=MORANDI["terracotta"], linewidth=2.2, label=f"mean={np.mean(r):.2f}")
|
||
ax.set_title("Satisfaction rate distribution")
|
||
ax.set_xlabel("Satisfaction rate $r_i$")
|
||
ax.set_ylabel("Count")
|
||
ax.grid(True, alpha=0.55)
|
||
ax.legend(loc="best", frameon=True)
|
||
|
||
ax = axes[1]
|
||
ax.plot(x, y, color=MORANDI["sage"], linewidth=2.2, label="Lorenz curve")
|
||
ax.plot([0, 1], [0, 1], color=MORANDI["warm_gray"], linestyle="--", linewidth=1.4, label="Equality line")
|
||
ax.set_title(f"Lorenz curve (Gini={gini:.3f})")
|
||
ax.set_xlabel("Cumulative share of sites")
|
||
ax.set_ylabel("Cumulative share of satisfaction")
|
||
ax.set_xlim(0, 1)
|
||
ax.set_ylim(0, 1)
|
||
ax.grid(True, alpha=0.55)
|
||
ax.legend(loc="lower right", frameon=True)
|
||
|
||
fig.suptitle(f"Fig.6 Fairness diagnostics (n={len(r)} sites)", y=1.02)
|
||
fig.tight_layout()
|
||
out = paths.figures_dir / "fig6_fairness_satisfaction.png"
|
||
fig.savefig(out, bbox_inches="tight", facecolor=FIG_BG)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def main() -> None:
|
||
paths = _default_paths()
|
||
for p in [
|
||
paths.distance_xlsx,
|
||
paths.pairing_xlsx,
|
||
paths.allocation_xlsx,
|
||
paths.calendar_xlsx,
|
||
paths.evaluate_xlsx,
|
||
]:
|
||
_require_file(p)
|
||
paths.figures_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
print("=" * 60)
|
||
print("Task 3 - Step 8: Visualization")
|
||
print("=" * 60)
|
||
|
||
out1 = fig1_pairing_map(paths)
|
||
print(f"Saved: {out1.relative_to(BASE_DIR)}")
|
||
out1_js = export_fig1_points_js(paths)
|
||
print(f"Saved: {out1_js.relative_to(BASE_DIR)}")
|
||
out2 = fig2_allocation_scatter(paths)
|
||
print(f"Saved: {out2.relative_to(BASE_DIR)}")
|
||
out6 = fig6_fairness_satisfaction(paths)
|
||
print(f"Saved: {out6.relative_to(BASE_DIR)}")
|
||
out4 = fig4_calendar_heatmap(paths)
|
||
print(f"Saved: {out4.relative_to(BASE_DIR)}")
|
||
out5 = fig5_risk_distribution(paths)
|
||
print(f"Saved: {out5.relative_to(BASE_DIR)}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|