2026-01-19 10:39:37 +08:00
|
|
|
|
"""
|
|
|
|
|
|
Step 08: 敏感性分析
|
|
|
|
|
|
|
|
|
|
|
|
输入: 01_clean.xlsx
|
|
|
|
|
|
输出: 08_sensitivity.xlsx
|
|
|
|
|
|
|
|
|
|
|
|
功能:
|
|
|
|
|
|
1. 参数扫描: C (有效容量), p_trunc阈值, C_bar (质量阈值)
|
|
|
|
|
|
2. 计算不同参数下的指标 (E1, E2, F1, F2)
|
|
|
|
|
|
3. 生成Pareto前沿分析
|
|
|
|
|
|
4. 识别稳健的参数区间
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
from scipy.stats import norm
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
|
from itertools import product
|
|
|
|
|
|
|
|
|
|
|
|
# 路径配置
|
|
|
|
|
|
CLEAN_PATH = Path(__file__).parent / "01_clean.xlsx"
|
|
|
|
|
|
OUTPUT_PATH = Path(__file__).parent / "08_sensitivity.xlsx"
|
|
|
|
|
|
|
|
|
|
|
|
# 基准参数
|
2026-01-19 19:43:57 +08:00
|
|
|
|
BASE_C = 350
|
|
|
|
|
|
BASE_P_THRESH = 0.10
|
2026-01-19 10:39:37 +08:00
|
|
|
|
BASE_C_BAR = 250
|
|
|
|
|
|
N_TOTAL = 730
|
|
|
|
|
|
|
|
|
|
|
|
# 敏感性扫描范围
|
|
|
|
|
|
C_RANGE = [350, 375, 400, 425, 450]
|
|
|
|
|
|
P_THRESH_RANGE = [0.01, 0.02, 0.05, 0.10]
|
|
|
|
|
|
C_BAR_RANGE = [200, 225, 250, 275, 300]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def truncation_correction(mu, sigma, C, p_thresh):
|
|
|
|
|
|
"""截断回归修正"""
|
|
|
|
|
|
if sigma <= 0 or np.isnan(sigma):
|
|
|
|
|
|
return mu, 0.0, False
|
|
|
|
|
|
|
|
|
|
|
|
z = (C - mu) / sigma
|
|
|
|
|
|
p_trunc = 1 - norm.cdf(z)
|
|
|
|
|
|
|
|
|
|
|
|
if p_trunc < p_thresh:
|
|
|
|
|
|
return mu, p_trunc, False
|
|
|
|
|
|
else:
|
|
|
|
|
|
mu_tilde = mu * (1 + 0.4 * p_trunc)
|
|
|
|
|
|
return mu_tilde, p_trunc, True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hamilton_allocation(total, weights):
|
|
|
|
|
|
"""Hamilton最大余数法"""
|
|
|
|
|
|
n = len(weights)
|
|
|
|
|
|
w_sum = sum(weights)
|
|
|
|
|
|
quotas = [total * w / w_sum for w in weights]
|
|
|
|
|
|
floors = [int(q) for q in quotas]
|
|
|
|
|
|
remainders = [q - f for q, f in zip(quotas, floors)]
|
|
|
|
|
|
leftover = total - sum(floors)
|
|
|
|
|
|
indices = sorted(range(n), key=lambda i: -remainders[i])
|
|
|
|
|
|
for i in indices[:leftover]:
|
|
|
|
|
|
floors[i] += 1
|
|
|
|
|
|
return floors
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def gini_coefficient(values):
|
|
|
|
|
|
"""计算基尼系数"""
|
|
|
|
|
|
values = np.array(values)
|
|
|
|
|
|
n = len(values)
|
|
|
|
|
|
if n == 0 or values.sum() == 0:
|
|
|
|
|
|
return 0.0
|
|
|
|
|
|
sorted_values = np.sort(values)
|
|
|
|
|
|
cumsum = np.cumsum(sorted_values)
|
|
|
|
|
|
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def quality_discount(mu, c_bar):
|
|
|
|
|
|
"""质量折扣因子"""
|
|
|
|
|
|
return min(1.0, c_bar / mu) if mu > 0 else 1.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_model(df, C, p_thresh, c_bar):
|
|
|
|
|
|
"""运行完整模型并返回指标"""
|
|
|
|
|
|
n_sites = len(df)
|
|
|
|
|
|
|
|
|
|
|
|
# 1. 截断修正
|
|
|
|
|
|
mu_tilde_list = []
|
|
|
|
|
|
n_corrected = 0
|
|
|
|
|
|
for _, row in df.iterrows():
|
|
|
|
|
|
mu_tilde, p_trunc, is_corrected = truncation_correction(row['mu'], row['sigma'], C, p_thresh)
|
|
|
|
|
|
mu_tilde_list.append(mu_tilde)
|
|
|
|
|
|
if is_corrected:
|
|
|
|
|
|
n_corrected += 1
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 频次分配
|
|
|
|
|
|
k_extra = hamilton_allocation(N_TOTAL - n_sites, mu_tilde_list)
|
|
|
|
|
|
k_list = [1 + ke for ke in k_extra]
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 计算指标
|
|
|
|
|
|
mu_arr = df['mu'].values
|
|
|
|
|
|
mu_tilde_arr = np.array(mu_tilde_list)
|
|
|
|
|
|
k_arr = np.array(k_list)
|
|
|
|
|
|
|
|
|
|
|
|
# E1: 总服务量
|
|
|
|
|
|
E1 = np.sum(k_arr * mu_arr)
|
|
|
|
|
|
|
|
|
|
|
|
# E2: 质量加权服务量
|
|
|
|
|
|
q_arr = np.array([quality_discount(mu, c_bar) for mu in mu_arr])
|
|
|
|
|
|
E2 = np.sum(k_arr * mu_arr * q_arr)
|
|
|
|
|
|
|
|
|
|
|
|
# 满足率
|
|
|
|
|
|
r_arr = k_arr * mu_arr / mu_tilde_arr
|
|
|
|
|
|
|
|
|
|
|
|
# F1: Gini系数
|
|
|
|
|
|
F1 = gini_coefficient(r_arr)
|
|
|
|
|
|
|
|
|
|
|
|
# F2: 最小满足率
|
|
|
|
|
|
F2 = np.min(r_arr)
|
|
|
|
|
|
|
|
|
|
|
|
# F3: 满足率CV
|
|
|
|
|
|
F3 = np.std(r_arr) / np.mean(r_arr)
|
|
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
|
'C': C,
|
|
|
|
|
|
'p_thresh': p_thresh,
|
|
|
|
|
|
'c_bar': c_bar,
|
|
|
|
|
|
'n_corrected': n_corrected,
|
|
|
|
|
|
'E1': E1,
|
|
|
|
|
|
'E2': E2,
|
|
|
|
|
|
'F1_gini': F1,
|
|
|
|
|
|
'F2_min_r': F2,
|
|
|
|
|
|
'F3_cv_r': F3,
|
|
|
|
|
|
'k_min': min(k_list),
|
|
|
|
|
|
'k_max': max(k_list)
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
|
|
|
print("=" * 60)
|
|
|
|
|
|
print("Step 08: 敏感性分析")
|
|
|
|
|
|
print("=" * 60)
|
|
|
|
|
|
|
|
|
|
|
|
# 1. 读取数据
|
|
|
|
|
|
print(f"\n[1] 读取输入数据: {CLEAN_PATH}")
|
|
|
|
|
|
df = pd.read_excel(CLEAN_PATH)
|
|
|
|
|
|
print(f" 读取 {len(df)} 站点数据")
|
|
|
|
|
|
|
|
|
|
|
|
# 2. 基准模型
|
|
|
|
|
|
print(f"\n[2] 基准模型 (C={BASE_C}, p_thresh={BASE_P_THRESH}, c_bar={BASE_C_BAR})")
|
|
|
|
|
|
base_result = run_model(df, BASE_C, BASE_P_THRESH, BASE_C_BAR)
|
|
|
|
|
|
print(f" E1 = {base_result['E1']:.1f}")
|
|
|
|
|
|
print(f" E2 = {base_result['E2']:.1f}")
|
|
|
|
|
|
print(f" F1 (Gini) = {base_result['F1_gini']:.4f}")
|
|
|
|
|
|
print(f" F2 (min r) = {base_result['F2_min_r']:.2f}")
|
|
|
|
|
|
print(f" 修正站点数 = {base_result['n_corrected']}")
|
|
|
|
|
|
|
|
|
|
|
|
# 3. 单参数敏感性: C
|
|
|
|
|
|
print(f"\n[3] 单参数敏感性分析: C (有效容量)")
|
|
|
|
|
|
print(f" {'C':<8} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
|
|
|
|
|
print(f" {'-'*60}")
|
|
|
|
|
|
|
|
|
|
|
|
results_C = []
|
|
|
|
|
|
for C in C_RANGE:
|
|
|
|
|
|
result = run_model(df, C, BASE_P_THRESH, BASE_C_BAR)
|
|
|
|
|
|
results_C.append(result)
|
|
|
|
|
|
marker = " *" if C == BASE_C else ""
|
|
|
|
|
|
print(f" {C:<8} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
|
|
|
|
|
|
|
|
|
|
|
# 4. 单参数敏感性: p_thresh
|
|
|
|
|
|
print(f"\n[4] 单参数敏感性分析: p_thresh (截断阈值)")
|
|
|
|
|
|
print(f" {'p_thresh':<10} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
|
|
|
|
|
print(f" {'-'*62}")
|
|
|
|
|
|
|
|
|
|
|
|
results_p = []
|
|
|
|
|
|
for p_thresh in P_THRESH_RANGE:
|
|
|
|
|
|
result = run_model(df, BASE_C, p_thresh, BASE_C_BAR)
|
|
|
|
|
|
results_p.append(result)
|
|
|
|
|
|
marker = " *" if p_thresh == BASE_P_THRESH else ""
|
|
|
|
|
|
print(f" {p_thresh:<10} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
|
|
|
|
|
|
|
|
|
|
|
# 5. 单参数敏感性: c_bar
|
|
|
|
|
|
print(f"\n[5] 单参数敏感性分析: c_bar (质量阈值)")
|
|
|
|
|
|
print(f" {'c_bar':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
|
|
|
|
|
print(f" {'-'*54}")
|
|
|
|
|
|
|
|
|
|
|
|
results_cbar = []
|
|
|
|
|
|
for c_bar in C_BAR_RANGE:
|
|
|
|
|
|
result = run_model(df, BASE_C, BASE_P_THRESH, c_bar)
|
|
|
|
|
|
results_cbar.append(result)
|
|
|
|
|
|
marker = " *" if c_bar == BASE_C_BAR else ""
|
|
|
|
|
|
print(f" {c_bar:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
|
|
|
|
|
|
|
|
|
|
|
# 6. 多参数组合扫描 (C × p_thresh)
|
|
|
|
|
|
print(f"\n[6] 多参数组合扫描 (C × p_thresh)")
|
|
|
|
|
|
|
|
|
|
|
|
results_combo = []
|
|
|
|
|
|
for C, p_thresh in product(C_RANGE, P_THRESH_RANGE):
|
|
|
|
|
|
result = run_model(df, C, p_thresh, BASE_C_BAR)
|
|
|
|
|
|
results_combo.append(result)
|
|
|
|
|
|
|
|
|
|
|
|
# 找出Pareto最优点
|
|
|
|
|
|
df_combo = pd.DataFrame(results_combo)
|
|
|
|
|
|
|
|
|
|
|
|
print(f" 共 {len(results_combo)} 种组合")
|
|
|
|
|
|
print(f" E1 范围: [{df_combo['E1'].min():.1f}, {df_combo['E1'].max():.1f}]")
|
|
|
|
|
|
print(f" F1 范围: [{df_combo['F1_gini'].min():.4f}, {df_combo['F1_gini'].max():.4f}]")
|
|
|
|
|
|
|
|
|
|
|
|
# 7. 稳健性分析
|
|
|
|
|
|
print(f"\n[7] 稳健性分析")
|
|
|
|
|
|
|
|
|
|
|
|
# 计算指标相对于基准的变化率
|
|
|
|
|
|
E1_base = base_result['E1']
|
|
|
|
|
|
E2_base = base_result['E2']
|
|
|
|
|
|
F1_base = base_result['F1_gini']
|
|
|
|
|
|
|
|
|
|
|
|
# C的影响
|
|
|
|
|
|
E1_range_C = max(r['E1'] for r in results_C) - min(r['E1'] for r in results_C)
|
|
|
|
|
|
E1_pct_C = E1_range_C / E1_base * 100
|
|
|
|
|
|
|
|
|
|
|
|
# p_thresh的影响
|
|
|
|
|
|
E1_range_p = max(r['E1'] for r in results_p) - min(r['E1'] for r in results_p)
|
|
|
|
|
|
E1_pct_p = E1_range_p / E1_base * 100
|
|
|
|
|
|
|
|
|
|
|
|
print(f" C 变化 [{min(C_RANGE)}, {max(C_RANGE)}] 时:")
|
|
|
|
|
|
print(f" - E1 变化范围: {E1_range_C:.1f} ({E1_pct_C:.2f}%)")
|
|
|
|
|
|
|
|
|
|
|
|
print(f" p_thresh 变化 [{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}] 时:")
|
|
|
|
|
|
print(f" - E1 变化范围: {E1_range_p:.1f} ({E1_pct_p:.2f}%)")
|
|
|
|
|
|
|
|
|
|
|
|
if E1_pct_C < 5 and E1_pct_p < 5:
|
|
|
|
|
|
print(f" 结论: 模型对参数变化不敏感,结果稳健")
|
|
|
|
|
|
else:
|
|
|
|
|
|
print(f" 结论: 模型对某些参数敏感,需谨慎选择")
|
|
|
|
|
|
|
|
|
|
|
|
# 8. 保存输出
|
|
|
|
|
|
print(f"\n[8] 保存输出: {OUTPUT_PATH}")
|
|
|
|
|
|
|
|
|
|
|
|
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
|
|
|
|
|
|
# Sheet 1: C敏感性
|
|
|
|
|
|
pd.DataFrame(results_C).to_excel(writer, sheet_name='sensitivity_C', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Sheet 2: p_thresh敏感性
|
|
|
|
|
|
pd.DataFrame(results_p).to_excel(writer, sheet_name='sensitivity_p_thresh', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Sheet 3: c_bar敏感性
|
|
|
|
|
|
pd.DataFrame(results_cbar).to_excel(writer, sheet_name='sensitivity_c_bar', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Sheet 4: 组合扫描
|
|
|
|
|
|
df_combo.to_excel(writer, sheet_name='combo_scan', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Sheet 5: 基准结果
|
|
|
|
|
|
pd.DataFrame([base_result]).to_excel(writer, sheet_name='baseline', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Sheet 6: 稳健性汇总
|
|
|
|
|
|
robustness = pd.DataFrame([
|
|
|
|
|
|
{'parameter': 'C', 'range': f'[{min(C_RANGE)}, {max(C_RANGE)}]',
|
|
|
|
|
|
'E1_variation': E1_range_C, 'E1_variation_pct': E1_pct_C},
|
|
|
|
|
|
{'parameter': 'p_thresh', 'range': f'[{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}]',
|
|
|
|
|
|
'E1_variation': E1_range_p, 'E1_variation_pct': E1_pct_p},
|
|
|
|
|
|
])
|
|
|
|
|
|
robustness.to_excel(writer, sheet_name='robustness', index=False)
|
|
|
|
|
|
|
|
|
|
|
|
print(f" 已保存6个工作表")
|
|
|
|
|
|
|
|
|
|
|
|
print("\n" + "=" * 60)
|
|
|
|
|
|
print("Step 08 完成")
|
|
|
|
|
|
print("=" * 60)
|
|
|
|
|
|
|
|
|
|
|
|
return df_combo
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
|
main()
|