""" Step 04: 评估指标计算 输入: 03_allocate.xlsx 输出: 04_metrics.xlsx 功能: 1. 计算有效性指标 (E1, E2) 2. 计算公平性指标 (F1, F2, F3) 3. 与基线方案对比 4. 输出综合评估报告 指标定义: - E1: 原始总服务量 = Σ k_i × μ_i - E2: 质量加权服务量 = Σ k_i × μ_i × q(μ_i), q(μ) = min(1, 250/μ) - F1: 满足率基尼系数 Gini(r) - F2: 最差站点满足率 min(r) - F3: 满足率变异系数 CV(r) """ import pandas as pd import numpy as np from pathlib import Path # 路径配置 INPUT_PATH = Path(__file__).parent / "03_allocate.xlsx" OUTPUT_PATH = Path(__file__).parent / "04_metrics.xlsx" # 评估参数 C_BAR = 250 # 典型服务量 (质量折扣阈值) def quality_discount(mu: float, c_bar: float = C_BAR) -> float: """ 质量折扣因子: 当 μ > c_bar 时,服务质量打折 q(μ) = min(1, c_bar / μ) """ return min(1.0, c_bar / mu) if mu > 0 else 1.0 def gini_coefficient(values: list) -> float: """ 计算基尼系数 Gini = Σ_i Σ_j |x_i - x_j| / (2n Σ_i x_i) """ values = np.array(values) n = len(values) if n == 0 or values.sum() == 0: return 0.0 # 排序后计算 sorted_values = np.sort(values) cumsum = np.cumsum(sorted_values) return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1]) def compute_metrics(df: pd.DataFrame, method_name: str) -> dict: """计算单个方案的所有指标""" # 有效性指标 E1 = (df['k'] * df['mu']).sum() E2 = (df['k'] * df['mu'] * df['mu'].apply(quality_discount)).sum() # 满足率 r = df['k'] * df['mu'] / df['mu_tilde'] # 公平性指标 F1 = gini_coefficient(r.tolist()) F2 = r.min() F3 = r.std() / r.mean() return { 'method': method_name, 'E1_total_service': E1, 'E2_quality_weighted': E2, 'F1_gini': F1, 'F2_min_satisfaction': F2, 'F3_cv_satisfaction': F3 } def compute_baseline_uniform(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame: """基线1: 均匀分配""" df_baseline = df.copy() k_uniform = n_total // len(df) remainder = n_total - k_uniform * len(df) df_baseline['k'] = k_uniform # 将余数分配给前 remainder 个站点 df_baseline.iloc[:remainder, df_baseline.columns.get_loc('k')] += 1 return df_baseline def compute_baseline_2019_scaled(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame: """基线2: 2019年访问次数等比例缩放""" df_baseline = df.copy() total_2019 = df['visits_2019'].sum() # 按比例缩放 k_float = df['visits_2019'] * n_total / total_2019 k_floor = k_float.astype(int) remainder = n_total - k_floor.sum() # 按余数分配 remainders = k_float - k_floor indices = remainders.nlargest(remainder).index k_floor.loc[indices] += 1 df_baseline['k'] = k_floor return df_baseline def compute_baseline_mu_raw(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame: """基线3: 按原始μ比例分配 (无截断修正)""" df_baseline = df.copy() # 覆盖约束 k_base = 1 extra = n_total - len(df) * k_base # 按μ比例分配额外次数 weights = df['mu'].tolist() w_sum = sum(weights) quotas = [extra * w / w_sum for w in weights] floors = [int(q) for q in quotas] remainders = [q - f for q, f in zip(quotas, floors)] leftover = extra - sum(floors) indices = sorted(range(len(weights)), key=lambda i: -remainders[i]) for i in indices[:leftover]: floors[i] += 1 df_baseline['k'] = [k_base + f for f in floors] return df_baseline def main(): print("=" * 60) print("Step 04: 评估指标计算") print("=" * 60) # 1. 读取分配结果 print(f"\n[1] 读取输入: {INPUT_PATH}") df = pd.read_excel(INPUT_PATH) print(f" 读取 {len(df)} 条记录") # 2. 计算推荐方案指标 print(f"\n[2] 计算推荐方案指标 (按μ̃比例分配)") metrics_recommended = compute_metrics(df, 'Recommended (μ̃ proportional)') for key, value in metrics_recommended.items(): if key != 'method': print(f" {key}: {value:.4f}") # 3. 计算基线方案指标 print(f"\n[3] 计算基线方案指标") # 基线1: 均匀分配 df_b1 = compute_baseline_uniform(df) metrics_b1 = compute_metrics(df_b1, 'Baseline 1: Uniform') print(f"\n 基线1 (均匀分配, k={df_b1['k'].iloc[0]}~{df_b1['k'].max()}):") for key, value in metrics_b1.items(): if key != 'method': print(f" {key}: {value:.4f}") # 基线2: 2019年缩放 df_b2 = compute_baseline_2019_scaled(df) metrics_b2 = compute_metrics(df_b2, 'Baseline 2: 2019 Scaled') print(f"\n 基线2 (2019年缩放):") for key, value in metrics_b2.items(): if key != 'method': print(f" {key}: {value:.4f}") # 基线3: 按原始μ比例 df_b3 = compute_baseline_mu_raw(df) metrics_b3 = compute_metrics(df_b3, 'Baseline 3: μ proportional (no correction)') print(f"\n 基线3 (按原始μ比例, 无截断修正):") for key, value in metrics_b3.items(): if key != 'method': print(f" {key}: {value:.4f}") # 4. 汇总对比 print(f"\n[4] 方案对比汇总") all_metrics = [metrics_recommended, metrics_b1, metrics_b2, metrics_b3] df_metrics = pd.DataFrame(all_metrics) # 计算相对于推荐方案的变化 for col in ['E1_total_service', 'E2_quality_weighted']: base_val = metrics_recommended[col] df_metrics[f'{col}_pct'] = (df_metrics[col] / base_val - 1) * 100 print("\n " + "-" * 90) print(f" {'方案':<45} {'E1':>12} {'E2':>12} {'F1(Gini)':>10} {'F2(min r)':>10} {'F3(CV)':>10}") print(" " + "-" * 90) for _, row in df_metrics.iterrows(): print(f" {row['method']:<45} {row['E1_total_service']:>12.1f} {row['E2_quality_weighted']:>12.1f} {row['F1_gini']:>10.4f} {row['F2_min_satisfaction']:>10.2f} {row['F3_cv_satisfaction']:>10.4f}") print(" " + "-" * 90) # 5. 推荐方案优势分析 print(f"\n[5] 推荐方案优势分析") print(f" 相对于均匀分配 (基线1):") print(f" - E1 提升: {(metrics_recommended['E1_total_service']/metrics_b1['E1_total_service']-1)*100:+.2f}%") print(f" - E2 提升: {(metrics_recommended['E2_quality_weighted']/metrics_b1['E2_quality_weighted']-1)*100:+.2f}%") print(f"\n 相对于2019缩放 (基线2):") print(f" - E1 变化: {(metrics_recommended['E1_total_service']/metrics_b2['E1_total_service']-1)*100:+.2f}%") print(f" - E2 变化: {(metrics_recommended['E2_quality_weighted']/metrics_b2['E2_quality_weighted']-1)*100:+.2f}%") print(f" - F1(Gini)变化: {(metrics_recommended['F1_gini']-metrics_b2['F1_gini']):+.4f}") # 6. 保存输出 print(f"\n[6] 保存输出: {OUTPUT_PATH}") with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer: # Sheet 1: 指标汇总 df_metrics.to_excel(writer, sheet_name='metrics_summary', index=False) # Sheet 2: 站点级详情 (推荐方案) df_detail = df[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']].copy() df_detail['quality_factor'] = df['mu'].apply(quality_discount) df_detail['service_quality_weighted'] = df_detail['k'] * df_detail['mu'] * df_detail['quality_factor'] df_detail.to_excel(writer, sheet_name='site_details', index=False) # Sheet 3: 参数记录 params = pd.DataFrame([ {'parameter': 'C_BAR (quality threshold)', 'value': C_BAR}, {'parameter': 'N_TOTAL', 'value': 730}, {'parameter': 'n_sites', 'value': len(df)}, ]) params.to_excel(writer, sheet_name='parameters', index=False) print(f" 已保存3个工作表: metrics_summary, site_details, parameters") print("\n" + "=" * 60) print("Step 04 完成") print("=" * 60) return df_metrics if __name__ == "__main__": main()