295 lines
9.2 KiB
Python
295 lines
9.2 KiB
Python
"""
|
||
Task 3 - Step 8: 可视化(Fig.1/2/4/5)
|
||
=====================================
|
||
|
||
输入:
|
||
- 01_distance.xlsx (sites: 经纬度、mu/sigma)
|
||
- 02_pairing.xlsx (selected_pairs: 34对配对连线)
|
||
- 03_allocation.xlsx (allocation: q*, E_total)
|
||
- 05_calendar.xlsx (calendar: 365天×2槽位排程)
|
||
- 06_evaluate.xlsx (pair_risk: 缺口概率分布)
|
||
|
||
输出:
|
||
- figures/fig1_pairing_map.png
|
||
- figures/fig2_allocation_scatter.png
|
||
- figures/fig4_calendar_heatmap.png
|
||
- figures/fig5_risk_distribution.png
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
|
||
import os
|
||
import tempfile
|
||
|
||
import numpy as np
|
||
import pandas as pd
|
||
|
||
_mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig"
|
||
_xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache"
|
||
_mpl_config_dir.mkdir(parents=True, exist_ok=True)
|
||
_xdg_cache_dir.mkdir(parents=True, exist_ok=True)
|
||
os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir))
|
||
os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir))
|
||
|
||
import matplotlib
|
||
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
|
||
|
||
BASE_DIR = Path(__file__).resolve().parent
|
||
|
||
|
||
@dataclass(frozen=True)
|
||
class Paths:
|
||
distance_xlsx: Path
|
||
pairing_xlsx: Path
|
||
allocation_xlsx: Path
|
||
calendar_xlsx: Path
|
||
evaluate_xlsx: Path
|
||
figures_dir: Path
|
||
|
||
|
||
def _default_paths() -> Paths:
|
||
return Paths(
|
||
distance_xlsx=BASE_DIR / "01_distance.xlsx",
|
||
pairing_xlsx=BASE_DIR / "02_pairing.xlsx",
|
||
allocation_xlsx=BASE_DIR / "03_allocation.xlsx",
|
||
calendar_xlsx=BASE_DIR / "05_calendar.xlsx",
|
||
evaluate_xlsx=BASE_DIR / "06_evaluate.xlsx",
|
||
figures_dir=BASE_DIR / "figures",
|
||
)
|
||
|
||
|
||
def _require_file(path: Path) -> None:
|
||
if not path.exists():
|
||
raise FileNotFoundError(
|
||
f"Missing required file: {path}. Run the earlier steps first (01~06)."
|
||
)
|
||
|
||
|
||
def _pair_key(a: int, b: int) -> tuple[int, int]:
|
||
return (a, b) if a <= b else (b, a)
|
||
|
||
|
||
def fig1_pairing_map(paths: Paths) -> Path:
|
||
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")
|
||
pairs = pd.read_excel(paths.pairing_xlsx, sheet_name="selected_pairs")
|
||
|
||
sites = sites.copy()
|
||
sites["site_id"] = sites["site_id"].astype(int)
|
||
|
||
paired_ids = set(pairs["site_i_id"].astype(int)).union(
|
||
set(pairs["site_j_id"].astype(int))
|
||
)
|
||
sites["is_paired"] = sites["site_id"].isin(paired_ids)
|
||
|
||
fig, ax = plt.subplots(figsize=(8.5, 6.5), dpi=200)
|
||
|
||
for _, row in pairs.iterrows():
|
||
i = int(row["site_i_id"])
|
||
j = int(row["site_j_id"])
|
||
si = sites.loc[sites["site_id"] == i].iloc[0]
|
||
sj = sites.loc[sites["site_id"] == j].iloc[0]
|
||
ax.plot(
|
||
[si["lon"], sj["lon"]],
|
||
[si["lat"], sj["lat"]],
|
||
color="#2c7fb8",
|
||
alpha=0.35,
|
||
linewidth=1.0,
|
||
zorder=1,
|
||
)
|
||
|
||
paired = sites[sites["is_paired"]]
|
||
unpaired = sites[~sites["is_paired"]]
|
||
|
||
ax.scatter(
|
||
paired["lon"],
|
||
paired["lat"],
|
||
s=18,
|
||
color="#d95f0e",
|
||
alpha=0.85,
|
||
label=f"Paired sites (n={len(paired)})",
|
||
zorder=2,
|
||
)
|
||
if len(unpaired) > 0:
|
||
ax.scatter(
|
||
unpaired["lon"],
|
||
unpaired["lat"],
|
||
s=22,
|
||
color="#636363",
|
||
alpha=0.9,
|
||
marker="x",
|
||
label=f"Unpaired sites (n={len(unpaired)})",
|
||
zorder=3,
|
||
)
|
||
|
||
ax.set_title("Fig.1 Pairing map (sites + selected 34 links)")
|
||
ax.set_xlabel("Longitude")
|
||
ax.set_ylabel("Latitude")
|
||
ax.grid(True, alpha=0.2)
|
||
ax.legend(loc="best", frameon=True)
|
||
|
||
out = paths.figures_dir / "fig1_pairing_map.png"
|
||
fig.tight_layout()
|
||
fig.savefig(out)
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig2_allocation_scatter(paths: Paths) -> Path:
|
||
df = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")
|
||
|
||
q_ratio = df["q_ratio"].to_numpy(dtype=float)
|
||
mu_share = (df["mu_i"] / (df["mu_i"] + df["mu_j"])).to_numpy(dtype=float)
|
||
sigma_share = (df["sigma_i"] / (df["sigma_i"] + df["sigma_j"])).to_numpy(dtype=float)
|
||
sigma_j_share = 1.0 - sigma_share
|
||
|
||
fig, axes = plt.subplots(1, 3, figsize=(12, 3.8), dpi=200, sharey=True)
|
||
|
||
panels = [
|
||
("$q^*/Q$ vs $\\mu_i/(\\mu_i+\\mu_j)$", mu_share),
|
||
("$q^*/Q$ vs $\\sigma_i/(\\sigma_i+\\sigma_j)$", sigma_share),
|
||
("$q^*/Q$ vs $\\sigma_j/(\\sigma_i+\\sigma_j)$", sigma_j_share),
|
||
]
|
||
|
||
for ax, (title, x) in zip(axes, panels):
|
||
ax.scatter(x, q_ratio, s=22, alpha=0.75, color="#3182bd", edgecolor="none")
|
||
if np.isfinite(x).all() and np.isfinite(q_ratio).all() and len(x) >= 2:
|
||
coef = np.polyfit(x, q_ratio, 1)
|
||
xx = np.linspace(float(np.min(x)), float(np.max(x)), 100)
|
||
yy = coef[0] * xx + coef[1]
|
||
ax.plot(xx, yy, color="#de2d26", linewidth=2.0, alpha=0.9)
|
||
ax.set_title(title)
|
||
ax.set_xlabel("x")
|
||
ax.grid(True, alpha=0.2)
|
||
|
||
axes[0].set_ylabel("$q^*/Q$")
|
||
fig.suptitle("Fig.2 Allocation strategy scatter (34 pairs)", y=1.02)
|
||
fig.tight_layout()
|
||
|
||
out = paths.figures_dir / "fig2_allocation_scatter.png"
|
||
fig.savefig(out, bbox_inches="tight")
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig4_calendar_heatmap(paths: Paths) -> Path:
|
||
sites = pd.read_excel(paths.distance_xlsx, sheet_name="sites")[["site_id", "mu"]]
|
||
sites["site_id"] = sites["site_id"].astype(int)
|
||
mu_by_id = dict(zip(sites["site_id"], sites["mu"]))
|
||
|
||
alloc = pd.read_excel(paths.allocation_xlsx, sheet_name="allocation")[
|
||
["site_i_id", "site_j_id", "E_total"]
|
||
].copy()
|
||
alloc["site_i_id"] = alloc["site_i_id"].astype(int)
|
||
alloc["site_j_id"] = alloc["site_j_id"].astype(int)
|
||
pair_E = {_pair_key(i, j): float(e) for i, j, e in alloc.to_numpy()}
|
||
|
||
cal = pd.read_excel(paths.calendar_xlsx, sheet_name="calendar")
|
||
|
||
expected = np.full((365, 2), np.nan, dtype=float)
|
||
|
||
for idx, row in cal.iterrows():
|
||
day = int(row["day"])
|
||
for slot in (1, 2):
|
||
typ = str(row[f"slot_{slot}_type"]).strip().lower()
|
||
i = row[f"slot_{slot}_site_i"]
|
||
j = row.get(f"slot_{slot}_site_j", np.nan)
|
||
|
||
if pd.isna(i):
|
||
continue
|
||
i_id = int(i)
|
||
if typ == "single":
|
||
expected[day - 1, slot - 1] = float(mu_by_id.get(i_id, np.nan))
|
||
elif typ == "dual":
|
||
if pd.isna(j):
|
||
continue
|
||
j_id = int(j)
|
||
expected[day - 1, slot - 1] = pair_E.get(_pair_key(i_id, j_id), np.nan)
|
||
|
||
fig, ax = plt.subplots(figsize=(12, 2.6), dpi=200)
|
||
im = ax.imshow(expected.T, aspect="auto", cmap="viridis")
|
||
ax.set_yticks([0, 1], labels=["Slot 1", "Slot 2"])
|
||
ax.set_xlabel("Day of year")
|
||
ax.set_title("Fig.4 Calendar heatmap (expected service per slot)")
|
||
|
||
month_days = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
|
||
month_starts = np.cumsum([0] + month_days[:-1]) # 0-based
|
||
month_labels = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
|
||
ax.set_xticks(month_starts, labels=month_labels)
|
||
|
||
cbar = fig.colorbar(im, ax=ax, fraction=0.03, pad=0.02)
|
||
cbar.set_label("Expected service")
|
||
|
||
fig.tight_layout()
|
||
out = paths.figures_dir / "fig4_calendar_heatmap.png"
|
||
fig.savefig(out, bbox_inches="tight")
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def fig5_risk_distribution(paths: Paths) -> Path:
|
||
risk = pd.read_excel(paths.evaluate_xlsx, sheet_name="pair_risk").copy()
|
||
p = risk["shortfall_prob_either"].to_numpy(dtype=float)
|
||
p = p[np.isfinite(p)]
|
||
|
||
fig, axes = plt.subplots(1, 2, figsize=(11, 3.8), dpi=200)
|
||
|
||
ax = axes[0]
|
||
ax.hist(p, bins=10, color="#3182bd", alpha=0.85, edgecolor="white")
|
||
ax.axvline(float(np.mean(p)), color="#de2d26", linewidth=2.0, label=f"mean={np.mean(p):.3f}")
|
||
ax.set_title("Histogram")
|
||
ax.set_xlabel("Shortfall probability (either site)")
|
||
ax.set_ylabel("Count")
|
||
ax.grid(True, alpha=0.2)
|
||
ax.legend(loc="best", frameon=True)
|
||
|
||
ax = axes[1]
|
||
p_sorted = np.sort(p)[::-1]
|
||
ax.plot(np.arange(1, len(p_sorted) + 1), p_sorted, marker="o", markersize=3.5, linewidth=1.5)
|
||
ax.set_title("Sorted by risk (descending)")
|
||
ax.set_xlabel("Pair rank")
|
||
ax.set_ylabel("Shortfall probability")
|
||
ax.grid(True, alpha=0.2)
|
||
|
||
fig.suptitle("Fig.5 Risk distribution across 34 pairs", y=1.02)
|
||
fig.tight_layout()
|
||
out = paths.figures_dir / "fig5_risk_distribution.png"
|
||
fig.savefig(out, bbox_inches="tight")
|
||
plt.close(fig)
|
||
return out
|
||
|
||
|
||
def main() -> None:
|
||
paths = _default_paths()
|
||
for p in [
|
||
paths.distance_xlsx,
|
||
paths.pairing_xlsx,
|
||
paths.allocation_xlsx,
|
||
paths.calendar_xlsx,
|
||
paths.evaluate_xlsx,
|
||
]:
|
||
_require_file(p)
|
||
paths.figures_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
print("=" * 60)
|
||
print("Task 3 - Step 8: Visualization")
|
||
print("=" * 60)
|
||
|
||
out1 = fig1_pairing_map(paths)
|
||
print(f"Saved: {out1.relative_to(BASE_DIR)}")
|
||
out2 = fig2_allocation_scatter(paths)
|
||
print(f"Saved: {out2.relative_to(BASE_DIR)}")
|
||
out4 = fig4_calendar_heatmap(paths)
|
||
print(f"Saved: {out4.relative_to(BASE_DIR)}")
|
||
out5 = fig5_risk_distribution(paths)
|
||
print(f"Saved: {out5.relative_to(BASE_DIR)}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|