Files
mcm-mfp/task3/07_sensitivity.py

466 lines
16 KiB
Python
Raw Permalink Normal View History

2026-01-19 12:59:03 +08:00
"""
2026-01-19 14:15:51 +08:00
Task 3 - Step 7: 敏感性分析 (Refined)
=====================================
分析以下参数对模型输出的影响 (高分辨率扫描):
1. 合并比例 r_merge: 0.1 ~ 0.9 (步长 0.01)
2. 距离阈值 l_max: 10 ~ 100 miles (步长 1)
3. 容量上限 μ_sum_max: 350 ~ 550 (步长 5)
4. CV阈值: 0.1 ~ 1.0 (步长 0.01)
输出:
- 07_sensitivity.xlsx (详细数据)
- figures/fig3_sensitivity.png (敏感性曲线图)
2026-01-19 12:59:03 +08:00
"""
import pandas as pd
import numpy as np
from scipy import stats
import warnings
2026-01-19 14:15:51 +08:00
import os
2026-01-19 19:38:38 +08:00
import tempfile
2026-01-19 14:15:51 +08:00
from pathlib import Path
2026-01-19 12:59:03 +08:00
warnings.filterwarnings('ignore')
2026-01-19 19:38:38 +08:00
# Matplotlib cache dirs避免 home 目录不可写导致的警告)
_mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig"
_xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache"
_mpl_config_dir.mkdir(parents=True, exist_ok=True)
_xdg_cache_dir.mkdir(parents=True, exist_ok=True)
os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir))
os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir))
import matplotlib.pyplot as plt
from cycler import cycler
# 设置绘图风格(低饱和度 Morandi 色系)
MORANDI = {
"grid": "#D7D1C7",
"text": "#3F3F3F",
"muted_blue": "#8FA3A7",
"muted_blue_dark": "#6E858A",
"sage": "#AEB8A6",
"terracotta": "#B07A6A",
"warm_gray": "#8C857A",
}
2026-01-20 01:55:46 +08:00
FIG_BG = "#FFFFFF"
FIG_GRID = "#DADDE1"
2026-01-19 19:38:38 +08:00
plt.rcParams.update(
{
2026-01-20 01:55:46 +08:00
"figure.facecolor": FIG_BG,
"axes.facecolor": FIG_BG,
"savefig.facecolor": FIG_BG,
"axes.edgecolor": FIG_GRID,
2026-01-19 19:38:38 +08:00
"axes.labelcolor": MORANDI["text"],
"xtick.color": MORANDI["text"],
"ytick.color": MORANDI["text"],
"text.color": MORANDI["text"],
2026-01-20 01:55:46 +08:00
"grid.color": FIG_GRID,
2026-01-19 19:38:38 +08:00
"grid.alpha": 0.55,
"axes.grid": True,
"axes.prop_cycle": cycler(
"color",
[
MORANDI["muted_blue"],
MORANDI["sage"],
MORANDI["terracotta"],
MORANDI["warm_gray"],
],
),
"legend.frameon": True,
"legend.framealpha": 0.9,
2026-01-20 01:55:46 +08:00
"legend.facecolor": FIG_BG,
"legend.edgecolor": FIG_GRID,
2026-01-19 19:38:38 +08:00
"font.sans-serif": ["Arial Unicode MS", "SimHei", "DejaVu Sans"],
"axes.unicode_minus": False,
}
)
2026-01-19 14:15:51 +08:00
2026-01-19 12:59:03 +08:00
# ============================================
2026-01-19 14:15:51 +08:00
# 基础参数和函数 (保持不变)
2026-01-19 12:59:03 +08:00
# ============================================
Q = 400
QUALITY_THRESHOLD = 250
SHORTFALL_THRESHOLD = 0.8
def quality_factor(mu_total):
return min(1.0, QUALITY_THRESHOLD / mu_total) if mu_total > 0 else 1.0
def expected_service(q, mu, sigma):
if sigma == 0:
return min(mu, q)
z = (q - mu) / sigma
return mu * stats.norm.cdf(z) - sigma * stats.norm.pdf(z) + q * (1 - stats.norm.cdf(z))
def gini_coefficient(values):
values = np.array(values)
values = values[~np.isnan(values)]
if len(values) == 0:
return 0
values = np.sort(values)
n = len(values)
cumsum = np.cumsum(values)
return (2 * np.sum((np.arange(1, n + 1) * values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1]) if cumsum[-1] > 0 else 0
def shortfall_probability(q, mu, sigma, threshold=0.8):
if sigma == 0:
return 0 if q >= mu * threshold else 1
critical_demand = q / threshold
return 1 - stats.norm.cdf((critical_demand - mu) / sigma)
def optimal_allocation(mu_i, sigma_i, mu_j, sigma_j, Q=400):
if sigma_i + sigma_j == 0:
return mu_i
return (sigma_j * mu_i + sigma_i * (Q - mu_j)) / (sigma_i + sigma_j)
def calc_distance(lat1, lon1, lat2, lon2):
lat_avg = (lat1 + lat2) / 2
lat_avg_rad = np.radians(lat_avg)
delta_lat = lat1 - lat2
delta_lon = lon1 - lon2
return 69.0 * np.sqrt(delta_lat**2 + (np.cos(lat_avg_rad) * delta_lon)**2)
2026-01-19 14:15:51 +08:00
def compute_distance_matrix(sites_df: pd.DataFrame) -> np.ndarray:
lat = sites_df['lat'].to_numpy(dtype=float)
lon = sites_df['lon'].to_numpy(dtype=float)
lat_avg = (lat[:, None] + lat[None, :]) / 2.0
lat_avg_rad = np.radians(lat_avg)
delta_lat = lat[:, None] - lat[None, :]
delta_lon = lon[:, None] - lon[None, :]
dist = 69.0 * np.sqrt(delta_lat**2 + (np.cos(lat_avg_rad) * delta_lon)**2)
np.fill_diagonal(dist, 0.0)
return dist
2026-01-19 12:59:03 +08:00
# ============================================
# 完整流水线函数
# ============================================
2026-01-19 14:15:51 +08:00
def run_pipeline(sites_df, distance_matrix, l_max=50, mu_sum_max=450, cv_max=0.5, merge_ratio=0.5):
2026-01-19 12:59:03 +08:00
"""
运行完整的Task 3流水线返回评估指标
"""
sites = sites_df.copy()
sites['cv'] = sites['sigma'] / sites['mu']
n = len(sites)
# Step 2: 配对筛选
candidates = []
2026-01-19 14:15:51 +08:00
2026-01-19 12:59:03 +08:00
for i in range(n):
for j in range(i + 1, n):
site_i = sites.iloc[i]
site_j = sites.iloc[j]
2026-01-19 14:15:51 +08:00
dist = float(distance_matrix[i, j])
2026-01-19 12:59:03 +08:00
if dist > l_max:
continue
mu_sum = site_i['mu'] + site_j['mu']
if mu_sum > mu_sum_max:
continue
if site_i['cv'] > cv_max or site_j['cv'] > cv_max:
continue
sigma_sq_sum = site_i['sigma']**2 + site_j['sigma']**2
value = (1.0 * mu_sum / Q - 0.3 * dist / l_max - 0.5 * sigma_sq_sum / mu_sum**2)
candidates.append({
'idx_i': i, 'idx_j': j,
'site_i_id': site_i['site_id'], 'site_j_id': site_j['site_id'],
'distance': dist,
'mu_i': site_i['mu'], 'mu_j': site_j['mu'],
'sigma_i': site_i['sigma'], 'sigma_j': site_j['sigma'],
'k_i': site_i['k'], 'k_j': site_j['k'],
'mu_tilde_i': site_i['mu_tilde'], 'mu_tilde_j': site_j['mu_tilde'],
'value': value
})
if len(candidates) == 0:
E1 = (sites['k'] * sites['mu']).sum()
E2 = sum(sites['k'] * sites['mu'].apply(quality_factor) * sites['mu'])
rates = [row['k'] * row['mu'] / row['mu_tilde'] for _, row in sites.iterrows()]
F1 = gini_coefficient(rates)
F2 = min(rates)
return {
'num_pairs': 0, 'num_dual_visits': 0,
'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': 0, 'RS': 0
}
# 贪心配对选择
df_cand = pd.DataFrame(candidates).sort_values('value', ascending=False)
selected = []
used = set()
for _, row in df_cand.iterrows():
if row['idx_i'] not in used and row['idx_j'] not in used:
selected.append(row.to_dict())
used.add(row['idx_i'])
used.add(row['idx_j'])
# Step 3: 计算最优分配
for pair in selected:
q_star = optimal_allocation(pair['mu_i'], pair['sigma_i'],
pair['mu_j'], pair['sigma_j'], Q)
pair['q_final'] = q_star
pair['E_Si'] = expected_service(q_star, pair['mu_i'], pair['sigma_i'])
pair['E_Sj'] = expected_service(Q - q_star, pair['mu_j'], pair['sigma_j'])
pair['E_total'] = pair['E_Si'] + pair['E_Sj']
# Step 4: 重分配访问次数
sites['k_single'] = sites['k'].copy()
pair_k = {}
for pair in selected:
k_i, k_j = pair['k_i'], pair['k_j']
k_ij = int(min(k_i, k_j) * merge_ratio)
if k_ij >= min(k_i, k_j):
k_ij = min(k_i, k_j) - 1
if k_ij < 1:
k_ij = 0
idx_i, idx_j = pair['idx_i'], pair['idx_j']
sites.loc[idx_i, 'k_single'] = k_i - k_ij
sites.loc[idx_j, 'k_single'] = k_j - k_ij
pair_k[(pair['site_i_id'], pair['site_j_id'])] = k_ij
# 计算释放槽位并重分配
total_single = sites['k_single'].sum()
total_dual = sum(pair_k.values())
delta_N = 730 - (total_single + total_dual)
if delta_N > 0:
total_demand = sites['mu_tilde'].sum()
sites['k_extra'] = (delta_N * sites['mu_tilde'] / total_demand).apply(np.floor).astype(int)
remainder = delta_N - sites['k_extra'].sum()
if remainder > 0:
fractional = delta_N * sites['mu_tilde'] / total_demand - sites['k_extra']
top_idx = fractional.nlargest(int(remainder)).index
sites.loc[top_idx, 'k_extra'] += 1
sites['k_single_final'] = sites['k_single'] + sites['k_extra']
else:
sites['k_single_final'] = sites['k_single']
# Step 5: 计算指标
E1 = (sites['k_single_final'] * sites['mu']).sum()
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
E1 += k_ij * pair['E_total']
E2 = sum(sites['k_single_final'] * sites['mu'].apply(quality_factor) * sites['mu'])
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
mu_sum = pair['mu_i'] + pair['mu_j']
q_factor = quality_factor(mu_sum)
E2 += k_ij * q_factor * pair['E_total']
site_satisfaction = {}
for idx, row in sites.iterrows():
r = row['k_single_final'] * row['mu'] / row['mu_tilde'] if row['mu_tilde'] > 0 else 0
site_satisfaction[row['site_id']] = r
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
r_i = k_ij * pair['E_Si'] / pair['mu_tilde_i'] if pair['mu_tilde_i'] > 0 else 0
r_j = k_ij * pair['E_Sj'] / pair['mu_tilde_j'] if pair['mu_tilde_j'] > 0 else 0
site_satisfaction[pair['site_i_id']] += r_i
site_satisfaction[pair['site_j_id']] += r_j
rates = list(site_satisfaction.values())
F1 = gini_coefficient(rates)
F2 = min(rates) if rates else 0
shortfall_probs = []
for pair in selected:
q = pair['q_final']
p_i = shortfall_probability(q, pair['mu_i'], pair['sigma_i'])
p_j = shortfall_probability(Q - q, pair['mu_j'], pair['sigma_j'])
shortfall_probs.append(1 - (1 - p_i) * (1 - p_j))
R1 = np.mean(shortfall_probs) if shortfall_probs else 0
RS = total_dual / 730
return {
'num_pairs': len(selected),
'num_dual_visits': total_dual,
'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': R1, 'RS': RS
}
# ============================================
# 主程序
# ============================================
print("=" * 60)
2026-01-19 14:15:51 +08:00
print("Task 3 - Step 7: 敏感性分析 (High Res)")
2026-01-19 12:59:03 +08:00
print("=" * 60)
# 读取基础数据
2026-01-19 14:15:51 +08:00
BASE_DIR = Path(__file__).resolve().parent
SITES_PATH = BASE_DIR.parent / 'task1' / '03_allocate.xlsx'
OUTPUT_XLSX = BASE_DIR / '07_sensitivity.xlsx'
OUTPUT_FIG = BASE_DIR / 'figures' / 'fig3_sensitivity.png'
OUTPUT_FIG.parent.mkdir(parents=True, exist_ok=True)
sites_df = pd.read_excel(SITES_PATH)
print(f"站点数据: {len(sites_df)}")
# 预计算距离矩阵(高分辨率扫描时显著加速)
distance_matrix = compute_distance_matrix(sites_df)
2026-01-19 12:59:03 +08:00
# 基准参数
BASE_L_MAX = 50
BASE_MU_SUM_MAX = 450
BASE_CV_MAX = 0.5
BASE_MERGE_RATIO = 0.5
# 计算基准结果
2026-01-19 14:15:51 +08:00
base_result = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
print(f"基准结果: E1={base_result['E1']:.0f}, R1={base_result['R1']:.4f}")
# 定义扫描参数
params_config = {
'merge_ratio': np.round(np.arange(0.10, 0.90 + 1e-9, 0.01), 2),
'l_max': np.arange(10, 100 + 1e-9, 1, dtype=float),
'mu_sum_max': np.arange(350, 550 + 1e-9, 5, dtype=float),
'cv_max': np.round(np.arange(0.10, 1.00 + 1e-9, 0.01), 2),
}
2026-01-19 12:59:03 +08:00
all_results = []
2026-01-19 14:15:51 +08:00
# 执行扫描
print("\n开始参数扫描...")
# 1. Merge Ratio
print("Scanning Merge Ratio...")
for val in params_config['merge_ratio']:
res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, val)
res.update({'param': 'merge_ratio', 'value': val})
all_results.append(res)
# 2. Distance Threshold
print("Scanning Distance Threshold...")
for val in params_config['l_max']:
res = run_pipeline(sites_df, distance_matrix, val, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
res.update({'param': 'l_max', 'value': val})
all_results.append(res)
# 3. Capacity Cap
print("Scanning Capacity Cap...")
for val in params_config['mu_sum_max']:
res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, val, BASE_CV_MAX, BASE_MERGE_RATIO)
res.update({'param': 'mu_sum_max', 'value': val})
all_results.append(res)
# 4. CV Threshold
print("Scanning CV Threshold...")
for val in params_config['cv_max']:
res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, val, BASE_MERGE_RATIO)
res.update({'param': 'cv_max', 'value': val})
all_results.append(res)
# 转换为DataFrame
df_results = pd.DataFrame(all_results)
2026-01-19 12:59:03 +08:00
# ============================================
2026-01-19 14:15:51 +08:00
# 绘图
2026-01-19 12:59:03 +08:00
# ============================================
2026-01-19 14:15:51 +08:00
print("\n绘制敏感性曲线...")
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Task 3 Sensitivity Analysis', fontsize=16, fontweight='bold')
# 参数对应的中文标签和单位
param_labels = {
'merge_ratio': ('Merge Ratio', 'Ratio'),
'l_max': ('Distance Threshold (l_max)', 'Miles'),
'mu_sum_max': ('Capacity Cap (μ_sum)', 'lbs (proxy)'),
'cv_max': ('CV Threshold', 'CV')
}
# 绘图循环
params_list = ['merge_ratio', 'l_max', 'mu_sum_max', 'cv_max']
for i, param in enumerate(params_list):
ax = axes[i // 2, i % 2]
data = df_results[df_results['param'] == param].sort_values('value')
# 绘制 E1/E2 - 左轴
2026-01-19 19:38:38 +08:00
line1, = ax.plot(
data['value'],
data['E1'],
linestyle='-',
color=MORANDI["muted_blue_dark"],
linewidth=1.8,
label='Expected Service (E1)',
)
line2, = ax.plot(
data['value'],
data['E2'],
linestyle='-',
color=MORANDI["sage"],
linewidth=1.8,
alpha=0.95,
label='Quality-weighted (E2)',
)
2026-01-19 14:15:51 +08:00
ax.set_xlabel(param_labels[param][0])
ax.set_ylabel('Service (E1/E2)')
# 绘制 R1 (风险) - 右轴
ax2 = ax.twinx()
2026-01-19 19:38:38 +08:00
line3, = ax2.plot(
data['value'],
data['R1'],
linestyle='--',
color=MORANDI["terracotta"],
linewidth=1.8,
label='Shortfall Risk (R1)',
)
ax2.set_ylabel('Risk Probability (R1)', color=MORANDI["terracotta"])
ax2.tick_params(axis='y', labelcolor=MORANDI["terracotta"])
2026-01-19 14:15:51 +08:00
# 添加基准线
if param == 'merge_ratio':
base_val = BASE_MERGE_RATIO
elif param == 'l_max':
base_val = BASE_L_MAX
elif param == 'mu_sum_max':
base_val = BASE_MU_SUM_MAX
elif param == 'cv_max':
base_val = BASE_CV_MAX
2026-01-19 19:38:38 +08:00
ax.axvline(x=base_val, color=MORANDI["warm_gray"], linestyle=':', alpha=0.65)
2026-01-19 14:15:51 +08:00
# 图例
lines = [line1, line2, line3]
labels = [l.get_label() for l in lines]
ax.legend(lines, labels, loc='best', frameon=True)
ax.set_title(f'Effect of {param_labels[param][0]}')
2026-01-19 19:38:38 +08:00
ax.grid(True, alpha=0.55)
2026-01-19 14:15:51 +08:00
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
2026-01-20 01:55:46 +08:00
plt.savefig(OUTPUT_FIG, dpi=300, facecolor=FIG_BG)
2026-01-19 14:15:51 +08:00
print(f"图表已保存至: {OUTPUT_FIG}")
2026-01-19 12:59:03 +08:00
# ============================================
2026-01-19 14:15:51 +08:00
# 保存详细数据
2026-01-19 12:59:03 +08:00
# ============================================
2026-01-19 14:15:51 +08:00
with pd.ExcelWriter(OUTPUT_XLSX, engine='openpyxl') as writer:
df_results.to_excel(writer, sheet_name='detailed_data', index=False)
# 摘要统计
summary = []
for param in params_list:
sub = df_results[df_results['param'] == param]
summary.append({
'Parameter': param,
'E1_Range': f"{sub['E1'].min():.0f} - {sub['E1'].max():.0f}",
'E1_Var%': (sub['E1'].max() - sub['E1'].min()) / base_result['E1'] * 100,
'E2_Range': f"{sub['E2'].min():.0f} - {sub['E2'].max():.0f}",
'E2_Var%': (sub['E2'].max() - sub['E2'].min()) / base_result['E2'] * 100,
'R1_Range': f"{sub['R1'].min():.4f} - {sub['R1'].max():.4f}"
2026-01-19 12:59:03 +08:00
})
2026-01-19 14:15:51 +08:00
pd.DataFrame(summary).to_excel(writer, sheet_name='summary', index=False)
print(f"数据已保存至: {OUTPUT_XLSX}")
print("完成!")