273 lines
8.6 KiB
Python
273 lines
8.6 KiB
Python
"""
|
||
Step 08: 敏感性分析
|
||
|
||
输入: 01_clean.xlsx
|
||
输出: 08_sensitivity.xlsx
|
||
|
||
功能:
|
||
1. 参数扫描: C (有效容量), p_trunc阈值, C_bar (质量阈值)
|
||
2. 计算不同参数下的指标 (E1, E2, F1, F2)
|
||
3. 生成Pareto前沿分析
|
||
4. 识别稳健的参数区间
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from scipy.stats import norm
|
||
from pathlib import Path
|
||
from itertools import product
|
||
|
||
# 路径配置
|
||
CLEAN_PATH = Path(__file__).parent / "01_clean.xlsx"
|
||
OUTPUT_PATH = Path(__file__).parent / "08_sensitivity.xlsx"
|
||
|
||
# 基准参数
|
||
BASE_C = 400
|
||
BASE_P_THRESH = 0.02
|
||
BASE_C_BAR = 250
|
||
N_TOTAL = 730
|
||
|
||
# 敏感性扫描范围
|
||
C_RANGE = [350, 375, 400, 425, 450]
|
||
P_THRESH_RANGE = [0.01, 0.02, 0.05, 0.10]
|
||
C_BAR_RANGE = [200, 225, 250, 275, 300]
|
||
|
||
|
||
def truncation_correction(mu, sigma, C, p_thresh):
|
||
"""截断回归修正"""
|
||
if sigma <= 0 or np.isnan(sigma):
|
||
return mu, 0.0, False
|
||
|
||
z = (C - mu) / sigma
|
||
p_trunc = 1 - norm.cdf(z)
|
||
|
||
if p_trunc < p_thresh:
|
||
return mu, p_trunc, False
|
||
else:
|
||
mu_tilde = mu * (1 + 0.4 * p_trunc)
|
||
return mu_tilde, p_trunc, True
|
||
|
||
|
||
def hamilton_allocation(total, weights):
|
||
"""Hamilton最大余数法"""
|
||
n = len(weights)
|
||
w_sum = sum(weights)
|
||
quotas = [total * w / w_sum for w in weights]
|
||
floors = [int(q) for q in quotas]
|
||
remainders = [q - f for q, f in zip(quotas, floors)]
|
||
leftover = total - sum(floors)
|
||
indices = sorted(range(n), key=lambda i: -remainders[i])
|
||
for i in indices[:leftover]:
|
||
floors[i] += 1
|
||
return floors
|
||
|
||
|
||
def gini_coefficient(values):
|
||
"""计算基尼系数"""
|
||
values = np.array(values)
|
||
n = len(values)
|
||
if n == 0 or values.sum() == 0:
|
||
return 0.0
|
||
sorted_values = np.sort(values)
|
||
cumsum = np.cumsum(sorted_values)
|
||
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
|
||
|
||
|
||
def quality_discount(mu, c_bar):
|
||
"""质量折扣因子"""
|
||
return min(1.0, c_bar / mu) if mu > 0 else 1.0
|
||
|
||
|
||
def run_model(df, C, p_thresh, c_bar):
|
||
"""运行完整模型并返回指标"""
|
||
n_sites = len(df)
|
||
|
||
# 1. 截断修正
|
||
mu_tilde_list = []
|
||
n_corrected = 0
|
||
for _, row in df.iterrows():
|
||
mu_tilde, p_trunc, is_corrected = truncation_correction(row['mu'], row['sigma'], C, p_thresh)
|
||
mu_tilde_list.append(mu_tilde)
|
||
if is_corrected:
|
||
n_corrected += 1
|
||
|
||
# 2. 频次分配
|
||
k_extra = hamilton_allocation(N_TOTAL - n_sites, mu_tilde_list)
|
||
k_list = [1 + ke for ke in k_extra]
|
||
|
||
# 3. 计算指标
|
||
mu_arr = df['mu'].values
|
||
mu_tilde_arr = np.array(mu_tilde_list)
|
||
k_arr = np.array(k_list)
|
||
|
||
# E1: 总服务量
|
||
E1 = np.sum(k_arr * mu_arr)
|
||
|
||
# E2: 质量加权服务量
|
||
q_arr = np.array([quality_discount(mu, c_bar) for mu in mu_arr])
|
||
E2 = np.sum(k_arr * mu_arr * q_arr)
|
||
|
||
# 满足率
|
||
r_arr = k_arr * mu_arr / mu_tilde_arr
|
||
|
||
# F1: Gini系数
|
||
F1 = gini_coefficient(r_arr)
|
||
|
||
# F2: 最小满足率
|
||
F2 = np.min(r_arr)
|
||
|
||
# F3: 满足率CV
|
||
F3 = np.std(r_arr) / np.mean(r_arr)
|
||
|
||
return {
|
||
'C': C,
|
||
'p_thresh': p_thresh,
|
||
'c_bar': c_bar,
|
||
'n_corrected': n_corrected,
|
||
'E1': E1,
|
||
'E2': E2,
|
||
'F1_gini': F1,
|
||
'F2_min_r': F2,
|
||
'F3_cv_r': F3,
|
||
'k_min': min(k_list),
|
||
'k_max': max(k_list)
|
||
}
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Step 08: 敏感性分析")
|
||
print("=" * 60)
|
||
|
||
# 1. 读取数据
|
||
print(f"\n[1] 读取输入数据: {CLEAN_PATH}")
|
||
df = pd.read_excel(CLEAN_PATH)
|
||
print(f" 读取 {len(df)} 站点数据")
|
||
|
||
# 2. 基准模型
|
||
print(f"\n[2] 基准模型 (C={BASE_C}, p_thresh={BASE_P_THRESH}, c_bar={BASE_C_BAR})")
|
||
base_result = run_model(df, BASE_C, BASE_P_THRESH, BASE_C_BAR)
|
||
print(f" E1 = {base_result['E1']:.1f}")
|
||
print(f" E2 = {base_result['E2']:.1f}")
|
||
print(f" F1 (Gini) = {base_result['F1_gini']:.4f}")
|
||
print(f" F2 (min r) = {base_result['F2_min_r']:.2f}")
|
||
print(f" 修正站点数 = {base_result['n_corrected']}")
|
||
|
||
# 3. 单参数敏感性: C
|
||
print(f"\n[3] 单参数敏感性分析: C (有效容量)")
|
||
print(f" {'C':<8} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
||
print(f" {'-'*60}")
|
||
|
||
results_C = []
|
||
for C in C_RANGE:
|
||
result = run_model(df, C, BASE_P_THRESH, BASE_C_BAR)
|
||
results_C.append(result)
|
||
marker = " *" if C == BASE_C else ""
|
||
print(f" {C:<8} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
||
|
||
# 4. 单参数敏感性: p_thresh
|
||
print(f"\n[4] 单参数敏感性分析: p_thresh (截断阈值)")
|
||
print(f" {'p_thresh':<10} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
||
print(f" {'-'*62}")
|
||
|
||
results_p = []
|
||
for p_thresh in P_THRESH_RANGE:
|
||
result = run_model(df, BASE_C, p_thresh, BASE_C_BAR)
|
||
results_p.append(result)
|
||
marker = " *" if p_thresh == BASE_P_THRESH else ""
|
||
print(f" {p_thresh:<10} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
||
|
||
# 5. 单参数敏感性: c_bar
|
||
print(f"\n[5] 单参数敏感性分析: c_bar (质量阈值)")
|
||
print(f" {'c_bar':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
|
||
print(f" {'-'*54}")
|
||
|
||
results_cbar = []
|
||
for c_bar in C_BAR_RANGE:
|
||
result = run_model(df, BASE_C, BASE_P_THRESH, c_bar)
|
||
results_cbar.append(result)
|
||
marker = " *" if c_bar == BASE_C_BAR else ""
|
||
print(f" {c_bar:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
|
||
|
||
# 6. 多参数组合扫描 (C × p_thresh)
|
||
print(f"\n[6] 多参数组合扫描 (C × p_thresh)")
|
||
|
||
results_combo = []
|
||
for C, p_thresh in product(C_RANGE, P_THRESH_RANGE):
|
||
result = run_model(df, C, p_thresh, BASE_C_BAR)
|
||
results_combo.append(result)
|
||
|
||
# 找出Pareto最优点
|
||
df_combo = pd.DataFrame(results_combo)
|
||
|
||
print(f" 共 {len(results_combo)} 种组合")
|
||
print(f" E1 范围: [{df_combo['E1'].min():.1f}, {df_combo['E1'].max():.1f}]")
|
||
print(f" F1 范围: [{df_combo['F1_gini'].min():.4f}, {df_combo['F1_gini'].max():.4f}]")
|
||
|
||
# 7. 稳健性分析
|
||
print(f"\n[7] 稳健性分析")
|
||
|
||
# 计算指标相对于基准的变化率
|
||
E1_base = base_result['E1']
|
||
E2_base = base_result['E2']
|
||
F1_base = base_result['F1_gini']
|
||
|
||
# C的影响
|
||
E1_range_C = max(r['E1'] for r in results_C) - min(r['E1'] for r in results_C)
|
||
E1_pct_C = E1_range_C / E1_base * 100
|
||
|
||
# p_thresh的影响
|
||
E1_range_p = max(r['E1'] for r in results_p) - min(r['E1'] for r in results_p)
|
||
E1_pct_p = E1_range_p / E1_base * 100
|
||
|
||
print(f" C 变化 [{min(C_RANGE)}, {max(C_RANGE)}] 时:")
|
||
print(f" - E1 变化范围: {E1_range_C:.1f} ({E1_pct_C:.2f}%)")
|
||
|
||
print(f" p_thresh 变化 [{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}] 时:")
|
||
print(f" - E1 变化范围: {E1_range_p:.1f} ({E1_pct_p:.2f}%)")
|
||
|
||
if E1_pct_C < 5 and E1_pct_p < 5:
|
||
print(f" 结论: 模型对参数变化不敏感,结果稳健")
|
||
else:
|
||
print(f" 结论: 模型对某些参数敏感,需谨慎选择")
|
||
|
||
# 8. 保存输出
|
||
print(f"\n[8] 保存输出: {OUTPUT_PATH}")
|
||
|
||
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
|
||
# Sheet 1: C敏感性
|
||
pd.DataFrame(results_C).to_excel(writer, sheet_name='sensitivity_C', index=False)
|
||
|
||
# Sheet 2: p_thresh敏感性
|
||
pd.DataFrame(results_p).to_excel(writer, sheet_name='sensitivity_p_thresh', index=False)
|
||
|
||
# Sheet 3: c_bar敏感性
|
||
pd.DataFrame(results_cbar).to_excel(writer, sheet_name='sensitivity_c_bar', index=False)
|
||
|
||
# Sheet 4: 组合扫描
|
||
df_combo.to_excel(writer, sheet_name='combo_scan', index=False)
|
||
|
||
# Sheet 5: 基准结果
|
||
pd.DataFrame([base_result]).to_excel(writer, sheet_name='baseline', index=False)
|
||
|
||
# Sheet 6: 稳健性汇总
|
||
robustness = pd.DataFrame([
|
||
{'parameter': 'C', 'range': f'[{min(C_RANGE)}, {max(C_RANGE)}]',
|
||
'E1_variation': E1_range_C, 'E1_variation_pct': E1_pct_C},
|
||
{'parameter': 'p_thresh', 'range': f'[{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}]',
|
||
'E1_variation': E1_range_p, 'E1_variation_pct': E1_pct_p},
|
||
])
|
||
robustness.to_excel(writer, sheet_name='robustness', index=False)
|
||
|
||
print(f" 已保存6个工作表")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Step 08 完成")
|
||
print("=" * 60)
|
||
|
||
return df_combo
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|