""" Task 3 - Step 7: 敏感性分析 (Refined) ===================================== 分析以下参数对模型输出的影响 (高分辨率扫描): 1. 合并比例 r_merge: 0.1 ~ 0.9 (步长 0.01) 2. 距离阈值 l_max: 10 ~ 100 miles (步长 1) 3. 容量上限 μ_sum_max: 350 ~ 550 (步长 5) 4. CV阈值: 0.1 ~ 1.0 (步长 0.01) 输出: - 07_sensitivity.xlsx (详细数据) - figures/fig3_sensitivity.png (敏感性曲线图) """ import pandas as pd import numpy as np from scipy import stats import warnings import os import tempfile from pathlib import Path warnings.filterwarnings('ignore') # Matplotlib cache dirs(避免 home 目录不可写导致的警告) _mpl_config_dir = Path(tempfile.gettempdir()) / "mm_task3_mplconfig" _xdg_cache_dir = Path(tempfile.gettempdir()) / "mm_task3_xdg_cache" _mpl_config_dir.mkdir(parents=True, exist_ok=True) _xdg_cache_dir.mkdir(parents=True, exist_ok=True) os.environ.setdefault("MPLCONFIGDIR", str(_mpl_config_dir)) os.environ.setdefault("XDG_CACHE_HOME", str(_xdg_cache_dir)) import matplotlib.pyplot as plt from cycler import cycler # 设置绘图风格(低饱和度 Morandi 色系) MORANDI = { "grid": "#D7D1C7", "text": "#3F3F3F", "muted_blue": "#8FA3A7", "muted_blue_dark": "#6E858A", "sage": "#AEB8A6", "terracotta": "#B07A6A", "warm_gray": "#8C857A", } FIG_BG = "#FFFFFF" FIG_GRID = "#DADDE1" plt.rcParams.update( { "figure.facecolor": FIG_BG, "axes.facecolor": FIG_BG, "savefig.facecolor": FIG_BG, "axes.edgecolor": FIG_GRID, "axes.labelcolor": MORANDI["text"], "xtick.color": MORANDI["text"], "ytick.color": MORANDI["text"], "text.color": MORANDI["text"], "grid.color": FIG_GRID, "grid.alpha": 0.55, "axes.grid": True, "axes.prop_cycle": cycler( "color", [ MORANDI["muted_blue"], MORANDI["sage"], MORANDI["terracotta"], MORANDI["warm_gray"], ], ), "legend.frameon": True, "legend.framealpha": 0.9, "legend.facecolor": FIG_BG, "legend.edgecolor": FIG_GRID, "font.sans-serif": ["Arial Unicode MS", "SimHei", "DejaVu Sans"], "axes.unicode_minus": False, } ) # ============================================ # 基础参数和函数 (保持不变) # ============================================ Q = 400 QUALITY_THRESHOLD = 250 
# NOTE(review): SHORTFALL_THRESHOLD is defined but never referenced below —
# shortfall_probability hard-codes its own 0.8 default. Consider wiring the
# constant through so the two cannot drift apart.
SHORTFALL_THRESHOLD = 0.8


def quality_factor(mu_total):
    """Quality discount factor: 1.0 for mean demand <= QUALITY_THRESHOLD,
    decaying as QUALITY_THRESHOLD / mu_total above it."""
    return min(1.0, QUALITY_THRESHOLD / mu_total) if mu_total > 0 else 1.0


def expected_service(q, mu, sigma):
    """E[min(D, q)] for demand D ~ Normal(mu, sigma), allocation q.

    Uses the closed form mu*Phi(z) - sigma*phi(z) + q*(1 - Phi(z)),
    with z = (q - mu) / sigma.
    """
    if sigma == 0:
        # Deterministic demand: served amount is simply min(demand, allocation).
        return min(mu, q)
    z = (q - mu) / sigma
    return mu * stats.norm.cdf(z) - sigma * stats.norm.pdf(z) + q * (1 - stats.norm.cdf(z))


def gini_coefficient(values):
    """Gini coefficient of a collection of rates (NaNs dropped).

    Returns 0 for empty input or a non-positive total (degenerate case).
    """
    values = np.array(values)
    values = values[~np.isnan(values)]
    if len(values) == 0:
        return 0
    values = np.sort(values)
    n = len(values)
    cumsum = np.cumsum(values)
    # Standard sorted-index formulation:
    # (2 * sum(i * x_i) - (n + 1) * sum(x)) / (n * sum(x)), i = 1..n.
    return (2 * np.sum((np.arange(1, n + 1) * values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1]) if cumsum[-1] > 0 else 0


def shortfall_probability(q, mu, sigma, threshold=0.8):
    """P(allocation q covers less than `threshold` of demand D ~ N(mu, sigma)).

    Shortfall is the event q < threshold * D, i.e. D > q / threshold.
    NOTE(review): the default duplicates SHORTFALL_THRESHOLD instead of
    referencing it.
    """
    if sigma == 0:
        # Deterministic demand: shortfall is all-or-nothing.
        return 0 if q >= mu * threshold else 1
    critical_demand = q / threshold
    return 1 - stats.norm.cdf((critical_demand - mu) / sigma)


def optimal_allocation(mu_i, sigma_i, mu_j, sigma_j, Q=400):
    """Split shared capacity Q between sites i and j.

    Closed-form split equalizing the standardized slack (q - mu) / sigma of
    the two sites. NOTE(review): the result is not clamped to [0, Q]; extreme
    mu/sigma combinations could produce an out-of-range allocation — confirm
    input ranges upstream.
    """
    if sigma_i + sigma_j == 0:
        # Both deterministic: give site i exactly its mean demand.
        return mu_i
    return (sigma_j * mu_i + sigma_i * (Q - mu_j)) / (sigma_i + sigma_j)


def calc_distance(lat1, lon1, lat2, lon2):
    """Approximate distance in miles between two lat/lon points
    (equirectangular approximation, 69 miles per degree)."""
    lat_avg = (lat1 + lat2) / 2
    lat_avg_rad = np.radians(lat_avg)
    delta_lat = lat1 - lat2
    delta_lon = lon1 - lon2
    return 69.0 * np.sqrt(delta_lat**2 + (np.cos(lat_avg_rad) * delta_lon)**2)


def compute_distance_matrix(sites_df: pd.DataFrame) -> np.ndarray:
    """Vectorized pairwise version of calc_distance over all sites.

    Returns an (n, n) array of distances in miles with an exact-zero diagonal.
    """
    lat = sites_df['lat'].to_numpy(dtype=float)
    lon = sites_df['lon'].to_numpy(dtype=float)
    lat_avg = (lat[:, None] + lat[None, :]) / 2.0
    lat_avg_rad = np.radians(lat_avg)
    delta_lat = lat[:, None] - lat[None, :]
    delta_lon = lon[:, None] - lon[None, :]
    dist = 69.0 * np.sqrt(delta_lat**2 + (np.cos(lat_avg_rad) * delta_lon)**2)
    # Zero the diagonal explicitly to remove floating-point noise.
    np.fill_diagonal(dist, 0.0)
    return dist


# ============================================
# Full pipeline function
# ============================================
def run_pipeline(sites_df, distance_matrix, l_max=50, mu_sum_max=450, cv_max=0.5, merge_ratio=0.5):
    """Run the full Task 3 pipeline and return evaluation metrics.

    Parameters
    ----------
    sites_df : DataFrame — expected columns: site_id, lat, lon, mu, sigma, k,
        mu_tilde (schema inferred from usage here; confirm against
        task1/03_allocate.xlsx). Assumes a default RangeIndex: positional
        indices from iloc are later reused as .loc labels.
    distance_matrix : (n, n) ndarray of pairwise distances in miles.
    l_max : maximum pairing distance (miles).
    mu_sum_max : cap on combined mean demand of a candidate pair.
    cv_max : maximum coefficient of variation for either pair member.
    merge_ratio : fraction of min(k_i, k_j) converted into merged dual visits.

    Returns
    -------
    dict with keys: num_pairs, num_dual_visits, E1, E2, F1, F2, R1, RS.
    """
    sites = sites_df.copy()
    sites['cv'] = sites['sigma'] / sites['mu']
    n = len(sites)
    # Step 2: candidate pair screening — keep pairs passing the distance,
    # combined-capacity and CV filters, scored by a weighted value function.
    candidates = []
    for i in range(n):
        for j in range(i + 1, n):
            site_i = sites.iloc[i]
            site_j = sites.iloc[j]
            dist = float(distance_matrix[i, j])
            if dist > l_max:
                continue
            mu_sum = site_i['mu'] + site_j['mu']
            if mu_sum > mu_sum_max:
                continue
            if site_i['cv'] > cv_max or site_j['cv'] > cv_max:
                continue
            sigma_sq_sum = site_i['sigma']**2 + site_j['sigma']**2
            # Score: reward combined demand, penalize distance and relative
            # variance (weights 1.0 / 0.3 / 0.5 appear to be tuned constants).
            value = (1.0 * mu_sum / Q
                     - 0.3 * dist / l_max
                     - 0.5 * sigma_sq_sum / mu_sum**2)
            candidates.append({
                'idx_i': i, 'idx_j': j,
                'site_i_id': site_i['site_id'], 'site_j_id': site_j['site_id'],
                'distance': dist,
                'mu_i': site_i['mu'], 'mu_j': site_j['mu'],
                'sigma_i': site_i['sigma'], 'sigma_j': site_j['sigma'],
                'k_i': site_i['k'], 'k_j': site_j['k'],
                'mu_tilde_i': site_i['mu_tilde'], 'mu_tilde_j': site_j['mu_tilde'],
                'value': value
            })
    if len(candidates) == 0:
        # No feasible pairs: evaluate the unmerged (single-visit) plan as-is.
        E1 = (sites['k'] * sites['mu']).sum()
        E2 = sum(sites['k'] * sites['mu'].apply(quality_factor) * sites['mu'])
        rates = [row['k'] * row['mu'] / row['mu_tilde'] for _, row in sites.iterrows()]
        F1 = gini_coefficient(rates)
        F2 = min(rates)
        return {
            'num_pairs': 0, 'num_dual_visits': 0,
            'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2,
            'R1': 0, 'RS': 0
        }
    # Greedy pair selection: take pairs in descending score order, each site
    # used at most once.
    df_cand = pd.DataFrame(candidates).sort_values('value', ascending=False)
    selected = []
    used = set()
    for _, row in df_cand.iterrows():
        if row['idx_i'] not in used and row['idx_j'] not in used:
            selected.append(row.to_dict())
            used.add(row['idx_i'])
            used.add(row['idx_j'])
    # Step 3: optimal capacity split and expected service for each pair.
    for pair in selected:
        q_star = optimal_allocation(pair['mu_i'], pair['sigma_i'],
                                    pair['mu_j'], pair['sigma_j'], Q)
        pair['q_final'] = q_star
        pair['E_Si'] = expected_service(q_star, pair['mu_i'], pair['sigma_i'])
        pair['E_Sj'] = expected_service(Q - q_star, pair['mu_j'], pair['sigma_j'])
        pair['E_total'] = pair['E_Si'] + pair['E_Sj']
    # Step 4: reallocate visit counts — convert a merge_ratio share of the
    # smaller member's visits into shared dual visits.
    sites['k_single'] = sites['k'].copy()
    pair_k = {}
    for pair in selected:
        k_i, k_j = pair['k_i'], pair['k_j']
        k_ij = int(min(k_i, k_j) * merge_ratio)
        if k_ij >= min(k_i, k_j):
            # Keep at least one dedicated single visit for the smaller site
            # (only reachable when merge_ratio >= 1).
            k_ij = min(k_i, k_j) - 1
        if k_ij < 1:
            k_ij = 0
        idx_i, idx_j = pair['idx_i'], pair['idx_j']
        # NOTE: idx_i/idx_j are positional — valid as .loc labels only because
        # sites keeps the default RangeIndex.
        sites.loc[idx_i, 'k_single'] = k_i - k_ij
        sites.loc[idx_j, 'k_single'] = k_j - k_ij
        pair_k[(pair['site_i_id'], pair['site_j_id'])] = k_ij
    # Redistribute freed visit slots proportionally to residual demand.
    total_single = sites['k_single'].sum()
    total_dual = sum(pair_k.values())
    # NOTE(review): 730 is the hard-coded total visit-slot budget (presumably
    # 2 visits/day * 365 days) — also reused for RS below; confirm and
    # consider naming it.
    delta_N = 730 - (total_single + total_dual)
    if delta_N > 0:
        total_demand = sites['mu_tilde'].sum()
        # Floor the proportional shares, then give the leftover slots to the
        # sites with the largest fractional remainders (largest-remainder rule).
        sites['k_extra'] = (delta_N * sites['mu_tilde'] / total_demand).apply(np.floor).astype(int)
        remainder = delta_N - sites['k_extra'].sum()
        if remainder > 0:
            fractional = delta_N * sites['mu_tilde'] / total_demand - sites['k_extra']
            top_idx = fractional.nlargest(int(remainder)).index
            sites.loc[top_idx, 'k_extra'] += 1
        sites['k_single_final'] = sites['k_single'] + sites['k_extra']
    else:
        sites['k_single_final'] = sites['k_single']
    # Step 5: compute evaluation metrics.
    # E1: total expected service (single visits + dual visits).
    E1 = (sites['k_single_final'] * sites['mu']).sum()
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        E1 += k_ij * pair['E_total']
    # E2: quality-weighted expected service (pairs discounted by combined mu).
    E2 = sum(sites['k_single_final'] * sites['mu'].apply(quality_factor) * sites['mu'])
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        mu_sum = pair['mu_i'] + pair['mu_j']
        q_factor = quality_factor(mu_sum)
        E2 += k_ij * q_factor * pair['E_total']
    # Per-site demand-satisfaction rates: single-visit share plus each site's
    # share of its pair's dual-visit service.
    site_satisfaction = {}
    for idx, row in sites.iterrows():
        r = row['k_single_final'] * row['mu'] / row['mu_tilde'] if row['mu_tilde'] > 0 else 0
        site_satisfaction[row['site_id']] = r
    for pair in selected:
        k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
        r_i = k_ij * pair['E_Si'] / pair['mu_tilde_i'] if pair['mu_tilde_i'] > 0 else 0
        r_j = k_ij * pair['E_Sj'] / pair['mu_tilde_j'] if pair['mu_tilde_j'] > 0 else 0
        site_satisfaction[pair['site_i_id']] += r_i
        site_satisfaction[pair['site_j_id']] += r_j
    rates = list(site_satisfaction.values())
    # F1: inequity (Gini) of satisfaction rates; F2: worst-off site's rate.
    F1 = gini_coefficient(rates)
    F2 = min(rates) if rates else 0
    # R1: mean probability that at least one member of a pair falls short.
    shortfall_probs = []
    for pair in selected:
        q = pair['q_final']
        p_i = shortfall_probability(q, pair['mu_i'], pair['sigma_i'])
        p_j = shortfall_probability(Q - q, pair['mu_j'], pair['sigma_j'])
        shortfall_probs.append(1 - (1 - p_i) * (1 - p_j))
    R1 = np.mean(shortfall_probs) if shortfall_probs else 0
    # RS: share of the total slot budget spent on dual visits.
    RS = total_dual / 730
    return {
        'num_pairs': len(selected), 'num_dual_visits': total_dual,
        'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2,
        'R1': R1, 'RS': RS
    }


# ============================================
# Main program
# ============================================
print("=" * 60)
print("Task 3 - Step 7: 敏感性分析 (High Res)")
print("=" * 60)

# Load base data produced by Task 1.
BASE_DIR = Path(__file__).resolve().parent
SITES_PATH = BASE_DIR.parent / 'task1' / '03_allocate.xlsx'
OUTPUT_XLSX = BASE_DIR / '07_sensitivity.xlsx'
OUTPUT_FIG = BASE_DIR / 'figures' / 'fig3_sensitivity.png'
OUTPUT_FIG.parent.mkdir(parents=True, exist_ok=True)
sites_df = pd.read_excel(SITES_PATH)
print(f"站点数据: {len(sites_df)} 个")

# Precompute the distance matrix once — significant speedup for the
# high-resolution sweeps below.
distance_matrix = compute_distance_matrix(sites_df)

# Baseline parameters.
BASE_L_MAX = 50
BASE_MU_SUM_MAX = 450
BASE_CV_MAX = 0.5
BASE_MERGE_RATIO = 0.5

# Baseline result (reference point for the variation percentages).
base_result = run_pipeline(sites_df, distance_matrix,
                           BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
print(f"基准结果: E1={base_result['E1']:.0f}, R1={base_result['R1']:.4f}")

# Sweep grids (rounding avoids float-accumulation artifacts in the labels).
params_config = {
    'merge_ratio': np.round(np.arange(0.10, 0.90 + 1e-9, 0.01), 2),
    'l_max': np.arange(10, 100 + 1e-9, 1, dtype=float),
    'mu_sum_max': np.arange(350, 550 + 1e-9, 5, dtype=float),
    'cv_max': np.round(np.arange(0.10, 1.00 + 1e-9, 0.01), 2),
}
all_results = []

# Run the one-at-a-time sweeps.
print("\n开始参数扫描...")
# 1. Merge Ratio
print("Scanning Merge Ratio...")
for val in params_config['merge_ratio']:
    res = run_pipeline(sites_df, distance_matrix,
                       BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, val)
    res.update({'param': 'merge_ratio', 'value': val})
    all_results.append(res)
# --- Remaining one-at-a-time sweeps: distance threshold, capacity cap, CV ---
# Each sweep varies exactly one pipeline argument while holding the other
# three at their baseline values.
_sweep_plan = [
    ('l_max', 'Scanning Distance Threshold...'),
    ('mu_sum_max', 'Scanning Capacity Cap...'),
    ('cv_max', 'Scanning CV Threshold...'),
]
for _param_name, _banner in _sweep_plan:
    print(_banner)
    for _value in params_config[_param_name]:
        _kwargs = {
            'l_max': BASE_L_MAX,
            'mu_sum_max': BASE_MU_SUM_MAX,
            'cv_max': BASE_CV_MAX,
            'merge_ratio': BASE_MERGE_RATIO,
        }
        _kwargs[_param_name] = _value
        _metrics = run_pipeline(sites_df, distance_matrix, **_kwargs)
        _metrics.update({'param': _param_name, 'value': _value})
        all_results.append(_metrics)

# Collect every sweep point into one long-format frame.
df_results = pd.DataFrame(all_results)

# ============================================
# Plotting
# ============================================
print("\n绘制敏感性曲线...")
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Task 3 Sensitivity Analysis', fontsize=16, fontweight='bold')

# Axis label and unit for each swept parameter.
param_labels = {
    'merge_ratio': ('Merge Ratio', 'Ratio'),
    'l_max': ('Distance Threshold (l_max)', 'Miles'),
    'mu_sum_max': ('Capacity Cap (μ_sum)', 'lbs (proxy)'),
    'cv_max': ('CV Threshold', 'CV')
}

params_list = ['merge_ratio', 'l_max', 'mu_sum_max', 'cv_max']
# Baseline value marked by a vertical reference line in each panel.
_baselines = {
    'merge_ratio': BASE_MERGE_RATIO,
    'l_max': BASE_L_MAX,
    'mu_sum_max': BASE_MU_SUM_MAX,
    'cv_max': BASE_CV_MAX,
}
for panel_idx, param in enumerate(params_list):
    ax = axes[panel_idx // 2, panel_idx % 2]
    curve = df_results[df_results['param'] == param].sort_values('value')

    # Service metrics (E1/E2) on the left axis.
    e1_line, = ax.plot(
        curve['value'], curve['E1'],
        linestyle='-', color=MORANDI["muted_blue_dark"], linewidth=1.8,
        label='Expected Service (E1)',
    )
    e2_line, = ax.plot(
        curve['value'], curve['E2'],
        linestyle='-', color=MORANDI["sage"], linewidth=1.8, alpha=0.95,
        label='Quality-weighted (E2)',
    )
    ax.set_xlabel(param_labels[param][0])
    ax.set_ylabel('Service (E1/E2)')

    # Shortfall risk (R1) on a twin right axis.
    risk_axis = ax.twinx()
    r1_line, = risk_axis.plot(
        curve['value'], curve['R1'],
        linestyle='--', color=MORANDI["terracotta"], linewidth=1.8,
        label='Shortfall Risk (R1)',
    )
    risk_axis.set_ylabel('Risk Probability (R1)', color=MORANDI["terracotta"])
    risk_axis.tick_params(axis='y', labelcolor=MORANDI["terracotta"])

    # Baseline reference line.
    ax.axvline(x=_baselines[param], color=MORANDI["warm_gray"], linestyle=':', alpha=0.65)

    # One combined legend covering both y-axes.
    handles = [e1_line, e2_line, r1_line]
    ax.legend(handles, [h.get_label() for h in handles], loc='best', frameon=True)
    ax.set_title(f'Effect of {param_labels[param][0]}')
    ax.grid(True, alpha=0.55)

plt.tight_layout(rect=[0, 0.03, 1, 0.95])
plt.savefig(OUTPUT_FIG, dpi=300, facecolor=FIG_BG)
print(f"图表已保存至: {OUTPUT_FIG}")

# ============================================
# Save detailed data
# ============================================
with pd.ExcelWriter(OUTPUT_XLSX, engine='openpyxl') as writer:
    df_results.to_excel(writer, sheet_name='detailed_data', index=False)
    # Per-parameter summary: metric ranges plus variation relative to baseline.
    summary_rows = []
    for param in params_list:
        sub = df_results[df_results['param'] == param]
        summary_rows.append({
            'Parameter': param,
            'E1_Range': f"{sub['E1'].min():.0f} - {sub['E1'].max():.0f}",
            'E1_Var%': (sub['E1'].max() - sub['E1'].min()) / base_result['E1'] * 100,
            'E2_Range': f"{sub['E2'].min():.0f} - {sub['E2'].max():.0f}",
            'E2_Var%': (sub['E2'].max() - sub['E2'].min()) / base_result['E2'] * 100,
            'R1_Range': f"{sub['R1'].min():.4f} - {sub['R1'].max():.4f}"
        })
    pd.DataFrame(summary_rows).to_excel(writer, sheet_name='summary', index=False)

print(f"数据已保存至: {OUTPUT_XLSX}")
print("完成!")