This commit is contained in:
2026-01-19 10:39:37 +08:00
parent eae9dabf0e
commit 03d353a86a
15 changed files with 1263 additions and 76 deletions

272
task1/08_sensitivity.py Normal file
View File

@@ -0,0 +1,272 @@
"""
Step 08: 敏感性分析
输入: 01_clean.xlsx
输出: 08_sensitivity.xlsx
功能:
1. 参数扫描: C (有效容量), p_trunc阈值, C_bar (质量阈值)
2. 计算不同参数下的指标 (E1, E2, F1, F2)
3. 生成Pareto前沿分析
4. 识别稳健的参数区间
"""
import pandas as pd
import numpy as np
from scipy.stats import norm
from pathlib import Path
from itertools import product
# 路径配置
CLEAN_PATH = Path(__file__).parent / "01_clean.xlsx"
OUTPUT_PATH = Path(__file__).parent / "08_sensitivity.xlsx"
# 基准参数
BASE_C = 400
BASE_P_THRESH = 0.02
BASE_C_BAR = 250
N_TOTAL = 730
# 敏感性扫描范围
C_RANGE = [350, 375, 400, 425, 450]
P_THRESH_RANGE = [0.01, 0.02, 0.05, 0.10]
C_BAR_RANGE = [200, 225, 250, 275, 300]
def truncation_correction(mu, sigma, C, p_thresh):
"""截断回归修正"""
if sigma <= 0 or np.isnan(sigma):
return mu, 0.0, False
z = (C - mu) / sigma
p_trunc = 1 - norm.cdf(z)
if p_trunc < p_thresh:
return mu, p_trunc, False
else:
mu_tilde = mu * (1 + 0.4 * p_trunc)
return mu_tilde, p_trunc, True
def hamilton_allocation(total, weights):
"""Hamilton最大余数法"""
n = len(weights)
w_sum = sum(weights)
quotas = [total * w / w_sum for w in weights]
floors = [int(q) for q in quotas]
remainders = [q - f for q, f in zip(quotas, floors)]
leftover = total - sum(floors)
indices = sorted(range(n), key=lambda i: -remainders[i])
for i in indices[:leftover]:
floors[i] += 1
return floors
def gini_coefficient(values):
"""计算基尼系数"""
values = np.array(values)
n = len(values)
if n == 0 or values.sum() == 0:
return 0.0
sorted_values = np.sort(values)
cumsum = np.cumsum(sorted_values)
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
def quality_discount(mu, c_bar):
"""质量折扣因子"""
return min(1.0, c_bar / mu) if mu > 0 else 1.0
def run_model(df, C, p_thresh, c_bar):
"""运行完整模型并返回指标"""
n_sites = len(df)
# 1. 截断修正
mu_tilde_list = []
n_corrected = 0
for _, row in df.iterrows():
mu_tilde, p_trunc, is_corrected = truncation_correction(row['mu'], row['sigma'], C, p_thresh)
mu_tilde_list.append(mu_tilde)
if is_corrected:
n_corrected += 1
# 2. 频次分配
k_extra = hamilton_allocation(N_TOTAL - n_sites, mu_tilde_list)
k_list = [1 + ke for ke in k_extra]
# 3. 计算指标
mu_arr = df['mu'].values
mu_tilde_arr = np.array(mu_tilde_list)
k_arr = np.array(k_list)
# E1: 总服务量
E1 = np.sum(k_arr * mu_arr)
# E2: 质量加权服务量
q_arr = np.array([quality_discount(mu, c_bar) for mu in mu_arr])
E2 = np.sum(k_arr * mu_arr * q_arr)
# 满足率
r_arr = k_arr * mu_arr / mu_tilde_arr
# F1: Gini系数
F1 = gini_coefficient(r_arr)
# F2: 最小满足率
F2 = np.min(r_arr)
# F3: 满足率CV
F3 = np.std(r_arr) / np.mean(r_arr)
return {
'C': C,
'p_thresh': p_thresh,
'c_bar': c_bar,
'n_corrected': n_corrected,
'E1': E1,
'E2': E2,
'F1_gini': F1,
'F2_min_r': F2,
'F3_cv_r': F3,
'k_min': min(k_list),
'k_max': max(k_list)
}
def main():
print("=" * 60)
print("Step 08: 敏感性分析")
print("=" * 60)
# 1. 读取数据
print(f"\n[1] 读取输入数据: {CLEAN_PATH}")
df = pd.read_excel(CLEAN_PATH)
print(f" 读取 {len(df)} 站点数据")
# 2. 基准模型
print(f"\n[2] 基准模型 (C={BASE_C}, p_thresh={BASE_P_THRESH}, c_bar={BASE_C_BAR})")
base_result = run_model(df, BASE_C, BASE_P_THRESH, BASE_C_BAR)
print(f" E1 = {base_result['E1']:.1f}")
print(f" E2 = {base_result['E2']:.1f}")
print(f" F1 (Gini) = {base_result['F1_gini']:.4f}")
print(f" F2 (min r) = {base_result['F2_min_r']:.2f}")
print(f" 修正站点数 = {base_result['n_corrected']}")
# 3. 单参数敏感性: C
print(f"\n[3] 单参数敏感性分析: C (有效容量)")
print(f" {'C':<8} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
print(f" {'-'*60}")
results_C = []
for C in C_RANGE:
result = run_model(df, C, BASE_P_THRESH, BASE_C_BAR)
results_C.append(result)
marker = " *" if C == BASE_C else ""
print(f" {C:<8} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
# 4. 单参数敏感性: p_thresh
print(f"\n[4] 单参数敏感性分析: p_thresh (截断阈值)")
print(f" {'p_thresh':<10} {'n_corr':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
print(f" {'-'*62}")
results_p = []
for p_thresh in P_THRESH_RANGE:
result = run_model(df, BASE_C, p_thresh, BASE_C_BAR)
results_p.append(result)
marker = " *" if p_thresh == BASE_P_THRESH else ""
print(f" {p_thresh:<10} {result['n_corrected']:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
# 5. 单参数敏感性: c_bar
print(f"\n[5] 单参数敏感性分析: c_bar (质量阈值)")
print(f" {'c_bar':<8} {'E1':<12} {'E2':<12} {'F1':<10} {'F2':<10}")
print(f" {'-'*54}")
results_cbar = []
for c_bar in C_BAR_RANGE:
result = run_model(df, BASE_C, BASE_P_THRESH, c_bar)
results_cbar.append(result)
marker = " *" if c_bar == BASE_C_BAR else ""
print(f" {c_bar:<8} {result['E1']:<12.1f} {result['E2']:<12.1f} {result['F1_gini']:<10.4f} {result['F2_min_r']:<10.2f}{marker}")
# 6. 多参数组合扫描 (C × p_thresh)
print(f"\n[6] 多参数组合扫描 (C × p_thresh)")
results_combo = []
for C, p_thresh in product(C_RANGE, P_THRESH_RANGE):
result = run_model(df, C, p_thresh, BASE_C_BAR)
results_combo.append(result)
# 找出Pareto最优点
df_combo = pd.DataFrame(results_combo)
print(f"{len(results_combo)} 种组合")
print(f" E1 范围: [{df_combo['E1'].min():.1f}, {df_combo['E1'].max():.1f}]")
print(f" F1 范围: [{df_combo['F1_gini'].min():.4f}, {df_combo['F1_gini'].max():.4f}]")
# 7. 稳健性分析
print(f"\n[7] 稳健性分析")
# 计算指标相对于基准的变化率
E1_base = base_result['E1']
E2_base = base_result['E2']
F1_base = base_result['F1_gini']
# C的影响
E1_range_C = max(r['E1'] for r in results_C) - min(r['E1'] for r in results_C)
E1_pct_C = E1_range_C / E1_base * 100
# p_thresh的影响
E1_range_p = max(r['E1'] for r in results_p) - min(r['E1'] for r in results_p)
E1_pct_p = E1_range_p / E1_base * 100
print(f" C 变化 [{min(C_RANGE)}, {max(C_RANGE)}] 时:")
print(f" - E1 变化范围: {E1_range_C:.1f} ({E1_pct_C:.2f}%)")
print(f" p_thresh 变化 [{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}] 时:")
print(f" - E1 变化范围: {E1_range_p:.1f} ({E1_pct_p:.2f}%)")
if E1_pct_C < 5 and E1_pct_p < 5:
print(f" 结论: 模型对参数变化不敏感,结果稳健")
else:
print(f" 结论: 模型对某些参数敏感,需谨慎选择")
# 8. 保存输出
print(f"\n[8] 保存输出: {OUTPUT_PATH}")
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
# Sheet 1: C敏感性
pd.DataFrame(results_C).to_excel(writer, sheet_name='sensitivity_C', index=False)
# Sheet 2: p_thresh敏感性
pd.DataFrame(results_p).to_excel(writer, sheet_name='sensitivity_p_thresh', index=False)
# Sheet 3: c_bar敏感性
pd.DataFrame(results_cbar).to_excel(writer, sheet_name='sensitivity_c_bar', index=False)
# Sheet 4: 组合扫描
df_combo.to_excel(writer, sheet_name='combo_scan', index=False)
# Sheet 5: 基准结果
pd.DataFrame([base_result]).to_excel(writer, sheet_name='baseline', index=False)
# Sheet 6: 稳健性汇总
robustness = pd.DataFrame([
{'parameter': 'C', 'range': f'[{min(C_RANGE)}, {max(C_RANGE)}]',
'E1_variation': E1_range_C, 'E1_variation_pct': E1_pct_C},
{'parameter': 'p_thresh', 'range': f'[{min(P_THRESH_RANGE)}, {max(P_THRESH_RANGE)}]',
'E1_variation': E1_range_p, 'E1_variation_pct': E1_pct_p},
])
robustness.to_excel(writer, sheet_name='robustness', index=False)
print(f" 已保存6个工作表")
print("\n" + "=" * 60)
print("Step 08 完成")
print("=" * 60)
return df_combo
if __name__ == "__main__":
main()