Files
mcm-mfp/task3/07_sensitivity.py
2026-01-19 12:59:03 +08:00

401 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task 3 - Step 7: 敏感性分析
============================
分析以下参数对模型输出的影响:
1. 合并比例 r_merge: [1/3, 1/2, 2/3]
2. 距离阈值 l_max: [30, 40, 50, 60, 70]
3. 容量上限 μ_sum_max: [400, 425, 450, 475, 500]
4. CV阈值: [0.3, 0.4, 0.5, 0.6]
输出: 07_sensitivity.xlsx (各参数对E1', E2', F1', R1的影响)
"""
import pandas as pd
import numpy as np
from scipy import stats
import warnings
warnings.filterwarnings('ignore')
# ============================================
# 基础参数和函数
# ============================================
Q = 400
QUALITY_THRESHOLD = 250
SHORTFALL_THRESHOLD = 0.8
def quality_factor(mu_total):
return min(1.0, QUALITY_THRESHOLD / mu_total) if mu_total > 0 else 1.0
def expected_service(q, mu, sigma):
if sigma == 0:
return min(mu, q)
z = (q - mu) / sigma
return mu * stats.norm.cdf(z) - sigma * stats.norm.pdf(z) + q * (1 - stats.norm.cdf(z))
def gini_coefficient(values):
values = np.array(values)
values = values[~np.isnan(values)]
if len(values) == 0:
return 0
values = np.sort(values)
n = len(values)
cumsum = np.cumsum(values)
return (2 * np.sum((np.arange(1, n + 1) * values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1]) if cumsum[-1] > 0 else 0
def shortfall_probability(q, mu, sigma, threshold=0.8):
if sigma == 0:
return 0 if q >= mu * threshold else 1
critical_demand = q / threshold
return 1 - stats.norm.cdf((critical_demand - mu) / sigma)
def optimal_allocation(mu_i, sigma_i, mu_j, sigma_j, Q=400):
if sigma_i + sigma_j == 0:
return mu_i
return (sigma_j * mu_i + sigma_i * (Q - mu_j)) / (sigma_i + sigma_j)
def calc_distance(lat1, lon1, lat2, lon2):
lat_avg = (lat1 + lat2) / 2
lat_avg_rad = np.radians(lat_avg)
delta_lat = lat1 - lat2
delta_lon = lon1 - lon2
return 69.0 * np.sqrt(delta_lat**2 + (np.cos(lat_avg_rad) * delta_lon)**2)
# ============================================
# 完整流水线函数
# ============================================
def run_pipeline(sites_df, l_max=50, mu_sum_max=450, cv_max=0.5, merge_ratio=0.5):
"""
运行完整的Task 3流水线返回评估指标
"""
sites = sites_df.copy()
sites['cv'] = sites['sigma'] / sites['mu']
n = len(sites)
# Step 1: 计算距离矩阵
distance_matrix = np.zeros((n, n))
for i in range(n):
for j in range(n):
if i != j:
distance_matrix[i, j] = calc_distance(
sites.iloc[i]['lat'], sites.iloc[i]['lon'],
sites.iloc[j]['lat'], sites.iloc[j]['lon']
)
# Step 2: 配对筛选
candidates = []
for i in range(n):
for j in range(i + 1, n):
site_i = sites.iloc[i]
site_j = sites.iloc[j]
dist = distance_matrix[i, j]
if dist > l_max:
continue
mu_sum = site_i['mu'] + site_j['mu']
if mu_sum > mu_sum_max:
continue
if site_i['cv'] > cv_max or site_j['cv'] > cv_max:
continue
sigma_sq_sum = site_i['sigma']**2 + site_j['sigma']**2
value = (1.0 * mu_sum / Q - 0.3 * dist / l_max - 0.5 * sigma_sq_sum / mu_sum**2)
candidates.append({
'idx_i': i, 'idx_j': j,
'site_i_id': site_i['site_id'], 'site_j_id': site_j['site_id'],
'distance': dist,
'mu_i': site_i['mu'], 'mu_j': site_j['mu'],
'sigma_i': site_i['sigma'], 'sigma_j': site_j['sigma'],
'k_i': site_i['k'], 'k_j': site_j['k'],
'mu_tilde_i': site_i['mu_tilde'], 'mu_tilde_j': site_j['mu_tilde'],
'value': value
})
if len(candidates) == 0:
# 无可行配对返回Task 1的指标
E1 = (sites['k'] * sites['mu']).sum()
E2 = sum(sites['k'] * sites['mu'].apply(quality_factor) * sites['mu'])
rates = [row['k'] * row['mu'] / row['mu_tilde'] for _, row in sites.iterrows()]
F1 = gini_coefficient(rates)
F2 = min(rates)
return {
'num_pairs': 0, 'num_dual_visits': 0,
'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': 0, 'RS': 0
}
# 贪心配对选择
df_cand = pd.DataFrame(candidates).sort_values('value', ascending=False)
selected = []
used = set()
for _, row in df_cand.iterrows():
if row['idx_i'] not in used and row['idx_j'] not in used:
selected.append(row.to_dict())
used.add(row['idx_i'])
used.add(row['idx_j'])
# Step 3: 计算最优分配
for pair in selected:
q_star = optimal_allocation(pair['mu_i'], pair['sigma_i'],
pair['mu_j'], pair['sigma_j'], Q)
pair['q_final'] = q_star
pair['E_Si'] = expected_service(q_star, pair['mu_i'], pair['sigma_i'])
pair['E_Sj'] = expected_service(Q - q_star, pair['mu_j'], pair['sigma_j'])
pair['E_total'] = pair['E_Si'] + pair['E_Sj']
# Step 4: 重分配访问次数
sites['k_single'] = sites['k'].copy()
sites['k_dual'] = 0
pair_k = {}
for pair in selected:
k_i, k_j = pair['k_i'], pair['k_j']
k_ij = int(min(k_i, k_j) * merge_ratio)
if k_ij >= min(k_i, k_j):
k_ij = min(k_i, k_j) - 1
if k_ij < 1:
k_ij = 0
idx_i, idx_j = pair['idx_i'], pair['idx_j']
sites.loc[idx_i, 'k_single'] = k_i - k_ij
sites.loc[idx_i, 'k_dual'] = k_ij
sites.loc[idx_j, 'k_single'] = k_j - k_ij
sites.loc[idx_j, 'k_dual'] = k_ij
pair_k[(pair['site_i_id'], pair['site_j_id'])] = k_ij
# 计算释放槽位并重分配
total_single = sites['k_single'].sum()
total_dual = sum(pair_k.values())
delta_N = 730 - (total_single + total_dual)
if delta_N > 0:
total_demand = sites['mu_tilde'].sum()
sites['k_extra'] = (delta_N * sites['mu_tilde'] / total_demand).apply(np.floor).astype(int)
remainder = delta_N - sites['k_extra'].sum()
if remainder > 0:
fractional = delta_N * sites['mu_tilde'] / total_demand - sites['k_extra']
top_idx = fractional.nlargest(int(remainder)).index
sites.loc[top_idx, 'k_extra'] += 1
sites['k_single_final'] = sites['k_single'] + sites['k_extra']
else:
sites['k_single_final'] = sites['k_single']
# Step 5: 计算指标
# E1'
E1 = (sites['k_single_final'] * sites['mu']).sum()
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
E1 += k_ij * pair['E_total']
# E2'
E2 = sum(sites['k_single_final'] * sites['mu'].apply(quality_factor) * sites['mu'])
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
mu_sum = pair['mu_i'] + pair['mu_j']
q_factor = quality_factor(mu_sum)
E2 += k_ij * q_factor * pair['E_total']
# 满足率
site_satisfaction = {}
for idx, row in sites.iterrows():
r = row['k_single_final'] * row['mu'] / row['mu_tilde'] if row['mu_tilde'] > 0 else 0
site_satisfaction[row['site_id']] = r
for pair in selected:
k_ij = pair_k.get((pair['site_i_id'], pair['site_j_id']), 0)
r_i = k_ij * pair['E_Si'] / pair['mu_tilde_i'] if pair['mu_tilde_i'] > 0 else 0
r_j = k_ij * pair['E_Sj'] / pair['mu_tilde_j'] if pair['mu_tilde_j'] > 0 else 0
site_satisfaction[pair['site_i_id']] += r_i
site_satisfaction[pair['site_j_id']] += r_j
rates = list(site_satisfaction.values())
F1 = gini_coefficient(rates)
F2 = min(rates) if rates else 0
# R1: 缺口风险
shortfall_probs = []
for pair in selected:
q = pair['q_final']
p_i = shortfall_probability(q, pair['mu_i'], pair['sigma_i'])
p_j = shortfall_probability(Q - q, pair['mu_j'], pair['sigma_j'])
shortfall_probs.append(1 - (1 - p_i) * (1 - p_j))
R1 = np.mean(shortfall_probs) if shortfall_probs else 0
# RS: 资源节省率
RS = total_dual / 730
return {
'num_pairs': len(selected),
'num_dual_visits': total_dual,
'E1': E1, 'E2': E2, 'F1': F1, 'F2': F2, 'R1': R1, 'RS': RS
}
# ============================================
# 主程序
# ============================================
print("=" * 60)
print("Task 3 - Step 7: 敏感性分析")
print("=" * 60)
# 读取基础数据
sites_df = pd.read_excel('../task1/03_allocate.xlsx')
print(f"\n读取站点数据: {len(sites_df)} 个站点")
# 基准参数
BASE_L_MAX = 50
BASE_MU_SUM_MAX = 450
BASE_CV_MAX = 0.5
BASE_MERGE_RATIO = 0.5
# 计算基准结果
print(f"\n计算基准结果...")
base_result = run_pipeline(sites_df, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
print(f"基准: E1={base_result['E1']:.0f}, E2={base_result['E2']:.0f}, F1={base_result['F1']:.4f}, R1={base_result['R1']:.4f}")
# ============================================
# 敏感性分析
# ============================================
all_results = []
# 1. 合并比例敏感性
print(f"\n" + "-" * 40)
print("1. 合并比例敏感性 (merge_ratio)")
print("-" * 40)
merge_ratios = [1/3, 0.5, 2/3]
for mr in merge_ratios:
result = run_pipeline(sites_df, BASE_L_MAX, BASE_MU_SUM_MAX, BASE_CV_MAX, mr)
result['param'] = 'merge_ratio'
result['param_value'] = mr
all_results.append(result)
print(f" merge_ratio={mr:.3f}: pairs={result['num_pairs']}, dual={result['num_dual_visits']}, "
f"E1={result['E1']:.0f}, E2={result['E2']:.0f}, F1={result['F1']:.4f}, R1={result['R1']:.4f}")
# 2. 距离阈值敏感性
print(f"\n" + "-" * 40)
print("2. 距离阈值敏感性 (l_max)")
print("-" * 40)
l_max_values = [30, 40, 50, 60, 70]
for lm in l_max_values:
result = run_pipeline(sites_df, lm, BASE_MU_SUM_MAX, BASE_CV_MAX, BASE_MERGE_RATIO)
result['param'] = 'l_max'
result['param_value'] = lm
all_results.append(result)
print(f" l_max={lm}: pairs={result['num_pairs']}, dual={result['num_dual_visits']}, "
f"E1={result['E1']:.0f}, E2={result['E2']:.0f}, F1={result['F1']:.4f}, R1={result['R1']:.4f}")
# 3. 容量上限敏感性
print(f"\n" + "-" * 40)
print("3. 容量上限敏感性 (mu_sum_max)")
print("-" * 40)
mu_sum_values = [400, 425, 450, 475, 500]
for ms in mu_sum_values:
result = run_pipeline(sites_df, BASE_L_MAX, ms, BASE_CV_MAX, BASE_MERGE_RATIO)
result['param'] = 'mu_sum_max'
result['param_value'] = ms
all_results.append(result)
print(f" mu_sum_max={ms}: pairs={result['num_pairs']}, dual={result['num_dual_visits']}, "
f"E1={result['E1']:.0f}, E2={result['E2']:.0f}, F1={result['F1']:.4f}, R1={result['R1']:.4f}")
# 4. CV阈值敏感性
print(f"\n" + "-" * 40)
print("4. CV阈值敏感性 (cv_max)")
print("-" * 40)
cv_max_values = [0.3, 0.4, 0.5, 0.6]
for cv in cv_max_values:
result = run_pipeline(sites_df, BASE_L_MAX, BASE_MU_SUM_MAX, cv, BASE_MERGE_RATIO)
result['param'] = 'cv_max'
result['param_value'] = cv
all_results.append(result)
print(f" cv_max={cv}: pairs={result['num_pairs']}, dual={result['num_dual_visits']}, "
f"E1={result['E1']:.0f}, E2={result['E2']:.0f}, F1={result['F1']:.4f}, R1={result['R1']:.4f}")
# ============================================
# 汇总分析
# ============================================
df_results = pd.DataFrame(all_results)
print(f"\n" + "=" * 60)
print("敏感性分析汇总")
print("=" * 60)
# 计算各参数的影响范围
for param in ['merge_ratio', 'l_max', 'mu_sum_max', 'cv_max']:
subset = df_results[df_results['param'] == param]
print(f"\n{param}:")
print(f" E1 变化范围: [{subset['E1'].min():.0f}, {subset['E1'].max():.0f}], "
f"变化幅度: {(subset['E1'].max() - subset['E1'].min()) / base_result['E1'] * 100:.2f}%")
print(f" E2 变化范围: [{subset['E2'].min():.0f}, {subset['E2'].max():.0f}], "
f"变化幅度: {(subset['E2'].max() - subset['E2'].min()) / base_result['E2'] * 100:.2f}%")
print(f" F1 变化范围: [{subset['F1'].min():.4f}, {subset['F1'].max():.4f}]")
print(f" R1 变化范围: [{subset['R1'].min():.4f}, {subset['R1'].max():.4f}]")
# ============================================
# 保存结果
# ============================================
OUTPUT_FILE = '07_sensitivity.xlsx'
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# Sheet 1: 所有结果
df_results.to_excel(writer, sheet_name='all_results', index=False)
# Sheet 2: 合并比例敏感性
df_merge = df_results[df_results['param'] == 'merge_ratio'].copy()
df_merge.to_excel(writer, sheet_name='merge_ratio', index=False)
# Sheet 3: 距离阈值敏感性
df_lmax = df_results[df_results['param'] == 'l_max'].copy()
df_lmax.to_excel(writer, sheet_name='l_max', index=False)
# Sheet 4: 容量上限敏感性
df_musum = df_results[df_results['param'] == 'mu_sum_max'].copy()
df_musum.to_excel(writer, sheet_name='mu_sum_max', index=False)
# Sheet 5: CV阈值敏感性
df_cv = df_results[df_results['param'] == 'cv_max'].copy()
df_cv.to_excel(writer, sheet_name='cv_max', index=False)
# Sheet 6: 基准结果
base_df = pd.DataFrame([{
'param': 'baseline',
'l_max': BASE_L_MAX,
'mu_sum_max': BASE_MU_SUM_MAX,
'cv_max': BASE_CV_MAX,
'merge_ratio': BASE_MERGE_RATIO,
**base_result
}])
base_df.to_excel(writer, sheet_name='baseline', index=False)
# Sheet 7: 汇总统计
summary_rows = []
for param in ['merge_ratio', 'l_max', 'mu_sum_max', 'cv_max']:
subset = df_results[df_results['param'] == param]
summary_rows.append({
'param': param,
'E1_min': subset['E1'].min(),
'E1_max': subset['E1'].max(),
'E1_range_pct': (subset['E1'].max() - subset['E1'].min()) / base_result['E1'] * 100,
'E2_min': subset['E2'].min(),
'E2_max': subset['E2'].max(),
'E2_range_pct': (subset['E2'].max() - subset['E2'].min()) / base_result['E2'] * 100,
'F1_min': subset['F1'].min(),
'F1_max': subset['F1'].max(),
'R1_min': subset['R1'].min(),
'R1_max': subset['R1'].max()
})
df_summary = pd.DataFrame(summary_rows)
df_summary.to_excel(writer, sheet_name='summary', index=False)
print(f"\n结果已保存至: {OUTPUT_FILE}")
print(" - Sheet 'all_results': 所有结果")
print(" - Sheet 'merge_ratio': 合并比例敏感性")
print(" - Sheet 'l_max': 距离阈值敏感性")
print(" - Sheet 'mu_sum_max': 容量上限敏感性")
print(" - Sheet 'cv_max': CV阈值敏感性")
print(" - Sheet 'baseline': 基准结果")
print(" - Sheet 'summary': 汇总统计")
print("\n" + "=" * 60)