Files
mcm-mfp/task3/07_sensitivity.py
2026-01-19 19:38:38 +08:00

464 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task 3 - Step 7: 敏感性分析 (Refined)
=====================================
分析以下参数对模型输出的影响 (高分辨率扫描):
1. 合并比例 r_merge: 0.1 ~ 0.9 (步长 0.01)
2. 距离阈值 l_max: 10 ~ 100 miles (步长 1)
3. 容量上限 μ_sum_max: 350 ~ 550 (步长 5)
4. CV阈值: 0.1 ~ 1.0 (步长 0.01)
输出:
- 07_sensitivity.xlsx (详细数据)
- figures/fig3_sensitivity.png (敏感性曲线图)
"""
import pandas as pd
import numpy as np
from scipy import stats
import warnings
import os
import tempfile
from pathlib import Path
warnings.filterwarnings('ignore')
# Redirect Matplotlib/XDG cache dirs to temp locations (avoids warnings when the
# home directory is not writable). Must happen BEFORE importing matplotlib.
_mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig"
_xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache"
_mpl_config_dir.mkdir(parents=True, exist_ok=True)
_xdg_cache_dir.mkdir(parents=True, exist_ok=True)
# setdefault: respect cache locations already provided by the environment.
os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir))
os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir))
import matplotlib.pyplot as plt
from cycler import cycler
# Plot styling: low-saturation "Morandi" colour palette.
MORANDI = {
    "bg": "#F5F2ED",
    "grid": "#D7D1C7",
    "text": "#3F3F3F",
    "muted_blue": "#8FA3A7",
    "muted_blue_dark": "#6E858A",
    "sage": "#AEB8A6",
    "terracotta": "#B07A6A",
    "warm_gray": "#8C857A",
}
# Apply the palette globally so every figure produced by this script matches.
plt.rcParams.update(
    {
        "figure.facecolor": MORANDI["bg"],
        "axes.facecolor": MORANDI["bg"],
        "savefig.facecolor": MORANDI["bg"],
        "axes.edgecolor": MORANDI["grid"],
        "axes.labelcolor": MORANDI["text"],
        "xtick.color": MORANDI["text"],
        "ytick.color": MORANDI["text"],
        "text.color": MORANDI["text"],
        "grid.color": MORANDI["grid"],
        "grid.alpha": 0.55,
        "axes.grid": True,
        "axes.prop_cycle": cycler(
            "color",
            [
                MORANDI["muted_blue"],
                MORANDI["sage"],
                MORANDI["terracotta"],
                MORANDI["warm_gray"],
            ],
        ),
        "legend.frameon": True,
        "legend.framealpha": 0.9,
        "legend.facecolor": MORANDI["bg"],
        "legend.edgecolor": MORANDI["grid"],
        # Font fallbacks that can render CJK labels; ASCII hyphen for minus
        # signs avoids missing-glyph boxes with CJK fonts.
        "font.sans-serif": ["Arial Unicode MS", "SimHei", "DejaVu Sans"],
        "axes.unicode_minus": False,
    }
)
# ============================================
# Base parameters and helper functions (unchanged from earlier steps)
# ============================================
Q = 400  # Total supply per dual visit, split between the two paired sites
QUALITY_THRESHOLD = 250  # Above this total demand, quality_factor decays as 250/mu
SHORTFALL_THRESHOLD = 0.8  # NOTE(review): unused here; shortfall_probability carries its own default 0.8
def quality_factor(mu_total):
    """Quality discount: 1.0 while total demand is within QUALITY_THRESHOLD, then THRESHOLD/mu."""
    if mu_total <= 0:
        return 1.0
    return min(1.0, QUALITY_THRESHOLD / mu_total)
def expected_service(q, mu, sigma):
    """Expected amount served, E[min(D, q)], for demand D ~ Normal(mu, sigma).

    Falls back to min(mu, q) for deterministic demand (sigma == 0).
    """
    if sigma == 0:
        # Deterministic demand: serve the smaller of demand and allocation.
        return min(mu, q)
    z = (q - mu) / sigma
    cdf_z = stats.norm.cdf(z)
    # Closed form for a truncated normal mean: mu*Phi(z) - sigma*phi(z) + q*(1 - Phi(z)).
    return mu * cdf_z - sigma * stats.norm.pdf(z) + q * (1 - cdf_z)
def gini_coefficient(values):
    """Gini coefficient of the given rates; NaNs are dropped, 0 for empty or zero-sum input."""
    arr = np.array(values)
    arr = arr[~np.isnan(arr)]
    n = len(arr)
    if n == 0:
        return 0
    arr = np.sort(arr)
    csum = np.cumsum(arr)
    total = csum[-1]
    if total <= 0:
        # Degenerate case: all-zero (or non-positive-sum) rates carry no inequality signal.
        return 0
    # Standard rank-weighted formula: G = (2*sum(i*x_i) - (n+1)*sum(x)) / (n*sum(x)).
    rank_weighted = np.sum(np.arange(1, n + 1) * arr)
    return (2 * rank_weighted - (n + 1) * total) / (n * total)
def shortfall_probability(q, mu, sigma, threshold=0.8):
    """Probability that allocation q covers less than `threshold` of demand D ~ N(mu, sigma)."""
    if sigma == 0:
        # Deterministic demand mu: shortfall happens exactly when q < mu * threshold.
        return 1 if q < mu * threshold else 0
    # Shortfall  <=>  demand exceeds q / threshold.
    z = (q / threshold - mu) / sigma
    return 1 - stats.norm.cdf(z)
def optimal_allocation(mu_i, sigma_i, mu_j, sigma_j, Q=400):
    """Split total supply Q between two sites, weighting surpluses by demand volatility.

    Returns q_i, the amount allocated to site i; site j implicitly receives Q - q_i.
    When both sigmas are zero, site i simply gets its mean demand.
    """
    denom = sigma_i + sigma_j
    if denom == 0:
        return mu_i
    return (sigma_j * mu_i + sigma_i * (Q - mu_j)) / denom
def calc_distance(lat1, lon1, lat2, lon2):
    """Approximate distance in miles between two lat/lon points.

    Equirectangular approximation: 69 miles per degree of latitude, with the
    longitude difference scaled by cos(mean latitude).
    """
    mean_lat_rad = np.radians((lat1 + lat2) / 2)
    dlat = lat1 - lat2
    dlon_scaled = np.cos(mean_lat_rad) * (lon1 - lon2)
    return 69.0 * np.sqrt(dlat * dlat + dlon_scaled * dlon_scaled)
def compute_distance_matrix(sites_df: pd.DataFrame) -> np.ndarray:
    """Pairwise equirectangular distances (miles) between all sites.

    Vectorized over the full n x n grid; the diagonal is forced to exactly 0.
    Expects `lat` and `lon` columns in degrees.
    """
    lat = sites_df['lat'].to_numpy(dtype=float)
    lon = sites_df['lon'].to_numpy(dtype=float)
    # Outer differences / means give every (i, j) combination at once.
    dlat = lat[:, None] - lat[None, :]
    dlon = lon[:, None] - lon[None, :]
    mean_lat_rad = np.radians((lat[:, None] + lat[None, :]) / 2.0)
    dist = 69.0 * np.sqrt(dlat**2 + (np.cos(mean_lat_rad) * dlon)**2)
    np.fill_diagonal(dist, 0.0)
    return dist
# ============================================
# Full pipeline function
# ============================================
def run_pipeline(sites_df, distance_matrix, l_max=50, mu_sum_max=450, cv_max=0.5, merge_ratio=0.5):
    """
    Run the full Task 3 pipeline and return its evaluation metrics.

    Parameters
    ----------
    sites_df : DataFrame with columns site_id, mu, sigma, k, mu_tilde (plus lat/lon
        used upstream). Row order must align with `distance_matrix`.
    distance_matrix : precomputed pairwise distances in miles (n x n ndarray).
    l_max : maximum distance (miles) for two sites to be paired.
    mu_sum_max : cap on the combined mean demand of a pair.
    cv_max : maximum coefficient of variation allowed for either site of a pair.
    merge_ratio : fraction of min(k_i, k_j) single visits converted to dual visits.

    Returns
    -------
    dict with keys: num_pairs, num_dual_visits, E1 (expected service), E2
    (quality-weighted service), F1 (Gini of satisfaction rates), F2 (minimum
    satisfaction rate), R1 (mean pair shortfall probability), RS (dual-visit share
    of the 730-visit budget).
    """
    sites = sites_df.copy()
    sites['cv'] = sites['sigma'] / sites['mu']
    n = len(sites)
    # Step 2: candidate pair screening
    candidates = []
    for i in range(n):
        for j in range(i + 1, n):
            site_i = sites.iloc[i]
            site_j = sites.iloc[j]
            dist = float(distance_matrix[i, j])
            if dist > l_max:
                continue
            mu_sum = site_i['mu'] + site_j['mu']
            if mu_sum > mu_sum_max:
                continue
            if site_i['cv'] > cv_max or site_j['cv'] > cv_max:
                continue
            sigma_sq_sum = site_i['sigma']**2 + site_j['sigma']**2
            # Pair score: reward combined demand, penalize distance and relative variance.
            value = (1.0 * mu_sum / Q - 0.3 * dist / l_max - 0.5 * sigma_sq_sum / mu_sum**2)
            candidates.append({
                'idx_i': i, 'idx_j': j,
                'site_i_id': site_i['site_id'], 'site_j_id': site_j['site_id'],
                'distance': dist,
                'mu_i': site_i['mu'], 'mu_j': site_j['mu'],
                'sigma_i': site_i['sigma'], 'sigma_j': site_j['sigma'],
                'k_i': site_i['k'], 'k_j': site_j['k'],
                'mu_tilde_i': site_i['mu_tilde'], 'mu_tilde_j': site_j['mu_tilde'],
                'value': value
            })
    if len(candidates) == 0:
        # No feasible pairs: score the unmerged baseline schedule and return early.
        E1 = (sites['k'] * sites['mu']).sum()
        E2 = sum(sites['k'] * sites['mu'].apply(quality_factor) * sites['mu'])
        rates = [row['k'] * row['mu'] / row['mu_tilde'] for _, row in sites.iterrows()]
        F1 = gini_coefficient(rates)
        F2 = min(rates)
        return {
            'num_pairs': 0, 'num_dual_visits': 0,
            'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': 0, 'RS': 0
        }
    # Greedy pair selection: best score first, each site used in at most one pair.
    df_cand = pd.DataFrame(candidates).sort_values('value', ascending=False)
    selected = []
    used = set()
    for _, row in df_cand.iterrows():
        if row['idx_i'] not in used and row['idx_j'] not in used:
            selected.append(row.to_dict())
            used.add(row['idx_i'])
            used.add(row['idx_j'])
    # Step 3: optimal split of Q within each selected pair
    for pair in selected:
        q_star = optimal_allocation(pair['mu_i'], pair['sigma_i'],
                                    pair['mu_j'], pair['sigma_j'], Q)
        pair['q_final'] = q_star
        pair['E_Si'] = expected_service(q_star, pair['mu_i'], pair['sigma_i'])
        pair['E_Sj'] = expected_service(Q - q_star, pair['mu_j'], pair['sigma_j'])
        pair['E_total'] = pair['E_Si'] + pair['E_Sj']
    # Step 4: reallocate visit counts (convert some single visits into dual visits)
    sites['k_single'] = sites['k'].copy()
    pair_k = {}
    for pair in selected:
        k_i, k_j = pair['k_i'], pair['k_j']
        k_ij = int(min(k_i, k_j) * merge_ratio)
        # Keep at least one single visit for the smaller site; drop tiny merges.
        if k_ij >= min(k_i, k_j):
            k_ij = min(k_i, k_j) - 1
        if k_ij < 1:
            k_ij = 0
        idx_i, idx_j = pair['idx_i'], pair['idx_j']
        sites.loc[idx_i, 'k_single'] = k_i - k_ij
        sites.loc[idx_j, 'k_single'] = k_j - k_ij
        pair_k[(pair['site_i_id'], pair['site_j_id'])] = k_ij
    # Redistribute freed slots proportionally to latent demand (total budget: 730 visits).
    total_single = sites['k_single'].sum()
    total_dual = sum(pair_k.values())
    delta_N = 730 - (total_single + total_dual)
    if delta_N > 0:
        total_demand = sites['mu_tilde'].sum()
        sites['k_extra'] = (delta_N * sites['mu_tilde'] / total_demand).apply(np.floor).astype(int)
        remainder = delta_N - sites['k_extra'].sum()
        if remainder > 0:
            # Largest-remainder rounding: hand leftover slots to the biggest fractions.
            fractional = delta_N * sites['mu_tilde'] / total_demand - sites['k_extra']
            top_idx = fractional.nlargest(int(remainder)).index
            sites.loc[top_idx, 'k_extra'] += 1
        sites['k_single_final'] = sites['k_single'] + sites['k_extra']
    else:
        sites['k_single_final'] = sites['k_single']
    # Step 5: evaluation metrics
    # E1: expected total service = single visits * mean demand + dual visits * pair service.
    E1 = (sites['k_single_final'] * sites['mu']).sum()
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        E1 += k_ij * pair['E_total']
    # E2: same, discounted by the quality factor of each delivery's total demand.
    E2 = sum(sites['k_single_final'] * sites['mu'].apply(quality_factor) * sites['mu'])
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        mu_sum = pair['mu_i'] + pair['mu_j']
        q_factor = quality_factor(mu_sum)
        E2 += k_ij * q_factor * pair['E_total']
    # Per-site satisfaction rate = delivered / latent demand (single + dual contributions).
    site_satisfaction = {}
    for idx, row in sites.iterrows():
        r = row['k_single_final'] * row['mu'] / row['mu_tilde'] if row['mu_tilde'] > 0 else 0
        site_satisfaction[row['site_id']] = r
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        r_i = k_ij * pair['E_Si'] / pair['mu_tilde_i'] if pair['mu_tilde_i'] > 0 else 0
        r_j = k_ij * pair['E_Sj'] / pair['mu_tilde_j'] if pair['mu_tilde_j'] > 0 else 0
        site_satisfaction[pair['site_i_id']] += r_i
        site_satisfaction[pair['site_j_id']] += r_j
    rates = list(site_satisfaction.values())
    F1 = gini_coefficient(rates)
    F2 = min(rates) if rates else 0
    # R1: mean probability that at least one site of a pair suffers a shortfall.
    shortfall_probs = []
    for pair in selected:
        q = pair['q_final']
        p_i = shortfall_probability(q, pair['mu_i'], pair['sigma_i'])
        p_j = shortfall_probability(Q - q, pair['mu_j'], pair['sigma_j'])
        shortfall_probs.append(1 - (1 - p_i) * (1 - p_j))
    R1 = np.mean(shortfall_probs) if shortfall_probs else 0
    RS = total_dual / 730
    return {
        'num_pairs': len(selected),
        'num_dual_visits': total_dual,
        'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': R1, 'RS': RS
    }
# ============================================
# Main program
# ============================================
print("=" * 60)
print("Task 3 - Step 7: 敏感性分析 (High Res)")
print("=" * 60)
# Load the site data produced by Task 1; outputs land next to this script.
BASE_DIR = Path(__file__).resolve().parent
SITES_PATH = BASE_DIR.parent / 'task1' / '03_allocate.xlsx'
OUTPUT_XLSX = BASE_DIR / '07_sensitivity.xlsx'
OUTPUT_FIG = BASE_DIR / 'figures' / 'fig3_sensitivity.png'
OUTPUT_FIG.parent.mkdir(parents=True, exist_ok=True)
sites_df = pd.read_excel(SITES_PATH)
print(f"站点数据: {len(sites_df)}")
# Precompute the distance matrix once (big speed-up for high-resolution sweeps).
distance_matrix = compute_distance_matrix(sites_df)
# Baseline parameter values (held fixed while one parameter is swept).
BASE_L_MAX = 50
BASE_MU_SUM_MAX = 450
BASE_CV_MAX = 0.5
BASE_MERGE_RATIO = 0.5
# Baseline run, used to normalize ranges in the summary sheet.
base_result = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
print(f"基准结果: E1={base_result['E1']:.0f}, R1={base_result['R1']:.4f}")
# Sweep grids; the +1e-9 guards against np.arange excluding the endpoint
# through floating-point error.
params_config = {
    'merge_ratio': np.round(np.arange(0.10, 0.90 + 1e-9, 0.01), 2),
    'l_max': np.arange(10, 100 + 1e-9, 1, dtype=float),
    'mu_sum_max': np.arange(350, 550 + 1e-9, 5, dtype=float),
    'cv_max': np.round(np.arange(0.10, 1.00 + 1e-9, 0.01), 2),
}
all_results = []
# One-at-a-time sensitivity sweeps: vary a single parameter, hold the rest at baseline.
print("\n开始参数扫描...")
# 1. Merge ratio
print("Scanning Merge Ratio...")
for val in params_config['merge_ratio']:
    res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, val)
    res.update({'param': 'merge_ratio', 'value': val})
    all_results.append(res)
# 2. Distance threshold
print("Scanning Distance Threshold...")
for val in params_config['l_max']:
    res = run_pipeline(sites_df, distance_matrix, val, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
    res.update({'param': 'l_max', 'value': val})
    all_results.append(res)
# 3. Capacity cap
print("Scanning Capacity Cap...")
for val in params_config['mu_sum_max']:
    res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, val, BASE_CV_MAX, BASE_MERGE_RATIO)
    res.update({'param': 'mu_sum_max', 'value': val})
    all_results.append(res)
# 4. CV threshold
print("Scanning CV Threshold...")
for val in params_config['cv_max']:
    res = run_pipeline(sites_df, distance_matrix, BASE_L_MAX, BASE_MU_SUM_MAX, val, BASE_MERGE_RATIO)
    res.update({'param': 'cv_max', 'value': val})
    all_results.append(res)
# Collect everything into one long-format DataFrame (one row per swept value).
df_results = pd.DataFrame(all_results)
# ============================================
# Plotting
# ============================================
print("\n绘制敏感性曲线...")
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Task 3 Sensitivity Analysis', fontsize=16, fontweight='bold')
# Display label and unit for each swept parameter.
param_labels = {
    'merge_ratio': ('Merge Ratio', 'Ratio'),
    'l_max': ('Distance Threshold (l_max)', 'Miles'),
    'mu_sum_max': ('Capacity Cap (μ_sum)', 'lbs (proxy)'),
    'cv_max': ('CV Threshold', 'CV')
}
# One subplot per parameter, laid out 2 x 2.
params_list = ['merge_ratio', 'l_max', 'mu_sum_max', 'cv_max']
for i, param in enumerate(params_list):
    ax = axes[i // 2, i % 2]
    data = df_results[df_results['param'] == param].sort_values('value')
    # E1/E2 service curves on the left axis.
    line1, = ax.plot(
        data['value'],
        data['E1'],
        linestyle='-',
        color=MORANDI["muted_blue_dark"],
        linewidth=1.8,
        label='Expected Service (E1)',
    )
    line2, = ax.plot(
        data['value'],
        data['E2'],
        linestyle='-',
        color=MORANDI["sage"],
        linewidth=1.8,
        alpha=0.95,
        label='Quality-weighted (E2)',
    )
    ax.set_xlabel(param_labels[param][0])
    ax.set_ylabel('Service (E1/E2)')
    # R1 (shortfall risk) on a twin right axis, dashed to distinguish it.
    ax2 = ax.twinx()
    line3, = ax2.plot(
        data['value'],
        data['R1'],
        linestyle='--',
        color=MORANDI["terracotta"],
        linewidth=1.8,
        label='Shortfall Risk (R1)',
    )
    ax2.set_ylabel('Risk Probability (R1)', color=MORANDI["terracotta"])
    ax2.tick_params(axis='y', labelcolor=MORANDI["terracotta"])
    # Vertical marker at the baseline value of the swept parameter.
    if param == 'merge_ratio':
        base_val = BASE_MERGE_RATIO
    elif param == 'l_max':
        base_val = BASE_L_MAX
    elif param == 'mu_sum_max':
        base_val = BASE_MU_SUM_MAX
    elif param == 'cv_max':
        base_val = BASE_CV_MAX
    ax.axvline(x=base_val, color=MORANDI["warm_gray"], linestyle=':', alpha=0.65)
    # Single combined legend covering both y-axes.
    lines = [line1, line2, line3]
    labels = [l.get_label() for l in lines]
    ax.legend(lines, labels, loc='best', frameon=True)
    ax.set_title(f'Effect of {param_labels[param][0]}')
    ax.grid(True, alpha=0.55)
plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig(OUTPUT_FIG, dpi=300)
print(f"图表已保存至: {OUTPUT_FIG}")
# ============================================
# Persist detailed data plus a per-parameter summary
# ============================================
with pd.ExcelWriter(OUTPUT_XLSX, engine='openpyxl') as writer:
    df_results.to_excel(writer, sheet_name='detailed_data', index=False)
    # Summary: spread of each metric per parameter, variation normalized by the baseline.
    summary_rows = []
    for param_name in params_list:
        subset = df_results[df_results['param'] == param_name]
        e1_lo, e1_hi = subset['E1'].min(), subset['E1'].max()
        e2_lo, e2_hi = subset['E2'].min(), subset['E2'].max()
        summary_rows.append({
            'Parameter': param_name,
            'E1_Range': f"{e1_lo:.0f} - {e1_hi:.0f}",
            'E1_Var%': (e1_hi - e1_lo) / base_result['E1'] * 100,
            'E2_Range': f"{e2_lo:.0f} - {e2_hi:.0f}",
            'E2_Var%': (e2_hi - e2_lo) / base_result['E2'] * 100,
            'R1_Range': f"{subset['R1'].min():.4f} - {subset['R1'].max():.4f}"
        })
    pd.DataFrame(summary_rows).to_excel(writer, sheet_name='summary', index=False)
print(f"数据已保存至: {OUTPUT_XLSX}")
print("完成!")