Files
mcm-mfp/task1/04_evaluate.py

234 lines
8.0 KiB
Python
Raw Normal View History

2026-01-19 10:14:46 +08:00
"""
Step 04: 评估指标计算
输入: 03_allocate.xlsx
输出: 04_metrics.xlsx
功能:
1. 计算有效性指标 (E1, E2)
2. 计算公平性指标 (F1, F2, F3)
3. 与基线方案对比
4. 输出综合评估报告
指标定义:
- E1: 原始总服务量 = Σ k_i × μ_i
- E2: 质量加权服务量 = Σ k_i × μ_i × q(μ_i), q(μ) = min(1, 250/μ)
- F1: 满足率基尼系数 Gini(r)
- F2: 最差站点满足率 min(r)
- F3: 满足率变异系数 CV(r)
"""
import pandas as pd
import numpy as np
from pathlib import Path
# 路径配置
INPUT_PATH = Path(__file__).parent / "03_allocate.xlsx"
OUTPUT_PATH = Path(__file__).parent / "04_metrics.xlsx"
# 评估参数
C_BAR = 250 # 典型服务量 (质量折扣阈值)
def quality_discount(mu: float, c_bar: float = C_BAR) -> float:
"""
质量折扣因子: μ > c_bar 服务质量打折
q(μ) = min(1, c_bar / μ)
"""
return min(1.0, c_bar / mu) if mu > 0 else 1.0
def gini_coefficient(values: list) -> float:
"""
计算基尼系数
Gini = Σ_i Σ_j |x_i - x_j| / (2n Σ_i x_i)
"""
values = np.array(values)
n = len(values)
if n == 0 or values.sum() == 0:
return 0.0
# 排序后计算
sorted_values = np.sort(values)
cumsum = np.cumsum(sorted_values)
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
def compute_metrics(df: pd.DataFrame, method_name: str) -> dict:
"""计算单个方案的所有指标"""
# 有效性指标
E1 = (df['k'] * df['mu']).sum()
E2 = (df['k'] * df['mu'] * df['mu'].apply(quality_discount)).sum()
# 满足率
r = df['k'] * df['mu'] / df['mu_tilde']
# 公平性指标
F1 = gini_coefficient(r.tolist())
F2 = r.min()
F3 = r.std() / r.mean()
return {
'method': method_name,
'E1_total_service': E1,
'E2_quality_weighted': E2,
'F1_gini': F1,
'F2_min_satisfaction': F2,
'F3_cv_satisfaction': F3
}
def compute_baseline_uniform(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线1: 均匀分配"""
df_baseline = df.copy()
k_uniform = n_total // len(df)
remainder = n_total - k_uniform * len(df)
df_baseline['k'] = k_uniform
# 将余数分配给前 remainder 个站点
df_baseline.iloc[:remainder, df_baseline.columns.get_loc('k')] += 1
return df_baseline
def compute_baseline_2019_scaled(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线2: 2019年访问次数等比例缩放"""
df_baseline = df.copy()
total_2019 = df['visits_2019'].sum()
# 按比例缩放
k_float = df['visits_2019'] * n_total / total_2019
k_floor = k_float.astype(int)
remainder = n_total - k_floor.sum()
# 按余数分配
remainders = k_float - k_floor
indices = remainders.nlargest(remainder).index
k_floor.loc[indices] += 1
df_baseline['k'] = k_floor
return df_baseline
def compute_baseline_mu_raw(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线3: 按原始μ比例分配 (无截断修正)"""
df_baseline = df.copy()
# 覆盖约束
k_base = 1
extra = n_total - len(df) * k_base
# 按μ比例分配额外次数
weights = df['mu'].tolist()
w_sum = sum(weights)
quotas = [extra * w / w_sum for w in weights]
floors = [int(q) for q in quotas]
remainders = [q - f for q, f in zip(quotas, floors)]
leftover = extra - sum(floors)
indices = sorted(range(len(weights)), key=lambda i: -remainders[i])
for i in indices[:leftover]:
floors[i] += 1
df_baseline['k'] = [k_base + f for f in floors]
return df_baseline
def main():
print("=" * 60)
print("Step 04: 评估指标计算")
print("=" * 60)
# 1. 读取分配结果
print(f"\n[1] 读取输入: {INPUT_PATH}")
df = pd.read_excel(INPUT_PATH)
print(f" 读取 {len(df)} 条记录")
# 2. 计算推荐方案指标
print(f"\n[2] 计算推荐方案指标 (按μ̃比例分配)")
metrics_recommended = compute_metrics(df, 'Recommended (μ̃ proportional)')
for key, value in metrics_recommended.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 3. 计算基线方案指标
print(f"\n[3] 计算基线方案指标")
# 基线1: 均匀分配
df_b1 = compute_baseline_uniform(df)
metrics_b1 = compute_metrics(df_b1, 'Baseline 1: Uniform')
print(f"\n 基线1 (均匀分配, k={df_b1['k'].iloc[0]}~{df_b1['k'].max()}):")
for key, value in metrics_b1.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 基线2: 2019年缩放
df_b2 = compute_baseline_2019_scaled(df)
metrics_b2 = compute_metrics(df_b2, 'Baseline 2: 2019 Scaled')
print(f"\n 基线2 (2019年缩放):")
for key, value in metrics_b2.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 基线3: 按原始μ比例
df_b3 = compute_baseline_mu_raw(df)
metrics_b3 = compute_metrics(df_b3, 'Baseline 3: μ proportional (no correction)')
print(f"\n 基线3 (按原始μ比例, 无截断修正):")
for key, value in metrics_b3.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 4. 汇总对比
print(f"\n[4] 方案对比汇总")
all_metrics = [metrics_recommended, metrics_b1, metrics_b2, metrics_b3]
df_metrics = pd.DataFrame(all_metrics)
# 计算相对于推荐方案的变化
for col in ['E1_total_service', 'E2_quality_weighted']:
base_val = metrics_recommended[col]
df_metrics[f'{col}_pct'] = (df_metrics[col] / base_val - 1) * 100
print("\n " + "-" * 90)
print(f" {'方案':<45} {'E1':>12} {'E2':>12} {'F1(Gini)':>10} {'F2(min r)':>10} {'F3(CV)':>10}")
print(" " + "-" * 90)
for _, row in df_metrics.iterrows():
print(f" {row['method']:<45} {row['E1_total_service']:>12.1f} {row['E2_quality_weighted']:>12.1f} {row['F1_gini']:>10.4f} {row['F2_min_satisfaction']:>10.2f} {row['F3_cv_satisfaction']:>10.4f}")
print(" " + "-" * 90)
# 5. 推荐方案优势分析
print(f"\n[5] 推荐方案优势分析")
print(f" 相对于均匀分配 (基线1):")
print(f" - E1 提升: {(metrics_recommended['E1_total_service']/metrics_b1['E1_total_service']-1)*100:+.2f}%")
print(f" - E2 提升: {(metrics_recommended['E2_quality_weighted']/metrics_b1['E2_quality_weighted']-1)*100:+.2f}%")
print(f"\n 相对于2019缩放 (基线2):")
print(f" - E1 变化: {(metrics_recommended['E1_total_service']/metrics_b2['E1_total_service']-1)*100:+.2f}%")
print(f" - E2 变化: {(metrics_recommended['E2_quality_weighted']/metrics_b2['E2_quality_weighted']-1)*100:+.2f}%")
print(f" - F1(Gini)变化: {(metrics_recommended['F1_gini']-metrics_b2['F1_gini']):+.4f}")
# 6. 保存输出
print(f"\n[6] 保存输出: {OUTPUT_PATH}")
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
# Sheet 1: 指标汇总
df_metrics.to_excel(writer, sheet_name='metrics_summary', index=False)
# Sheet 2: 站点级详情 (推荐方案)
df_detail = df[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']].copy()
df_detail['quality_factor'] = df['mu'].apply(quality_discount)
df_detail['service_quality_weighted'] = df_detail['k'] * df_detail['mu'] * df_detail['quality_factor']
df_detail.to_excel(writer, sheet_name='site_details', index=False)
# Sheet 3: 参数记录
params = pd.DataFrame([
{'parameter': 'C_BAR (quality threshold)', 'value': C_BAR},
{'parameter': 'N_TOTAL', 'value': 730},
{'parameter': 'n_sites', 'value': len(df)},
])
params.to_excel(writer, sheet_name='parameters', index=False)
print(f" 已保存3个工作表: metrics_summary, site_details, parameters")
print("\n" + "=" * 60)
print("Step 04 完成")
print("=" * 60)
return df_metrics
if __name__ == "__main__":
main()