Files
mcm-mfp/task1/04_evaluate.py
2026-01-19 10:14:46 +08:00

234 lines
8.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Step 04: 评估指标计算
输入: 03_allocate.xlsx
输出: 04_metrics.xlsx
功能:
1. 计算有效性指标 (E1, E2)
2. 计算公平性指标 (F1, F2, F3)
3. 与基线方案对比
4. 输出综合评估报告
指标定义:
- E1: 原始总服务量 = Σ k_i × μ_i
- E2: 质量加权服务量 = Σ k_i × μ_i × q(μ_i), q(μ) = min(1, 250/μ)
- F1: 满足率基尼系数 Gini(r)
- F2: 最差站点满足率 min(r)
- F3: 满足率变异系数 CV(r)
"""
import pandas as pd
import numpy as np
from pathlib import Path
# 路径配置
INPUT_PATH = Path(__file__).parent / "03_allocate.xlsx"
OUTPUT_PATH = Path(__file__).parent / "04_metrics.xlsx"
# 评估参数
C_BAR = 250 # 典型服务量 (质量折扣阈值)
def quality_discount(mu: float, c_bar: float = C_BAR) -> float:
"""
质量折扣因子: 当 μ > c_bar 时,服务质量打折
q(μ) = min(1, c_bar / μ)
"""
return min(1.0, c_bar / mu) if mu > 0 else 1.0
def gini_coefficient(values: list) -> float:
"""
计算基尼系数
Gini = Σ_i Σ_j |x_i - x_j| / (2n Σ_i x_i)
"""
values = np.array(values)
n = len(values)
if n == 0 or values.sum() == 0:
return 0.0
# 排序后计算
sorted_values = np.sort(values)
cumsum = np.cumsum(sorted_values)
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
def compute_metrics(df: pd.DataFrame, method_name: str) -> dict:
"""计算单个方案的所有指标"""
# 有效性指标
E1 = (df['k'] * df['mu']).sum()
E2 = (df['k'] * df['mu'] * df['mu'].apply(quality_discount)).sum()
# 满足率
r = df['k'] * df['mu'] / df['mu_tilde']
# 公平性指标
F1 = gini_coefficient(r.tolist())
F2 = r.min()
F3 = r.std() / r.mean()
return {
'method': method_name,
'E1_total_service': E1,
'E2_quality_weighted': E2,
'F1_gini': F1,
'F2_min_satisfaction': F2,
'F3_cv_satisfaction': F3
}
def compute_baseline_uniform(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线1: 均匀分配"""
df_baseline = df.copy()
k_uniform = n_total // len(df)
remainder = n_total - k_uniform * len(df)
df_baseline['k'] = k_uniform
# 将余数分配给前 remainder 个站点
df_baseline.iloc[:remainder, df_baseline.columns.get_loc('k')] += 1
return df_baseline
def compute_baseline_2019_scaled(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线2: 2019年访问次数等比例缩放"""
df_baseline = df.copy()
total_2019 = df['visits_2019'].sum()
# 按比例缩放
k_float = df['visits_2019'] * n_total / total_2019
k_floor = k_float.astype(int)
remainder = n_total - k_floor.sum()
# 按余数分配
remainders = k_float - k_floor
indices = remainders.nlargest(remainder).index
k_floor.loc[indices] += 1
df_baseline['k'] = k_floor
return df_baseline
def compute_baseline_mu_raw(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
"""基线3: 按原始μ比例分配 (无截断修正)"""
df_baseline = df.copy()
# 覆盖约束
k_base = 1
extra = n_total - len(df) * k_base
# 按μ比例分配额外次数
weights = df['mu'].tolist()
w_sum = sum(weights)
quotas = [extra * w / w_sum for w in weights]
floors = [int(q) for q in quotas]
remainders = [q - f for q, f in zip(quotas, floors)]
leftover = extra - sum(floors)
indices = sorted(range(len(weights)), key=lambda i: -remainders[i])
for i in indices[:leftover]:
floors[i] += 1
df_baseline['k'] = [k_base + f for f in floors]
return df_baseline
def main():
print("=" * 60)
print("Step 04: 评估指标计算")
print("=" * 60)
# 1. 读取分配结果
print(f"\n[1] 读取输入: {INPUT_PATH}")
df = pd.read_excel(INPUT_PATH)
print(f" 读取 {len(df)} 条记录")
# 2. 计算推荐方案指标
print(f"\n[2] 计算推荐方案指标 (按μ̃比例分配)")
metrics_recommended = compute_metrics(df, 'Recommended (μ̃ proportional)')
for key, value in metrics_recommended.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 3. 计算基线方案指标
print(f"\n[3] 计算基线方案指标")
# 基线1: 均匀分配
df_b1 = compute_baseline_uniform(df)
metrics_b1 = compute_metrics(df_b1, 'Baseline 1: Uniform')
print(f"\n 基线1 (均匀分配, k={df_b1['k'].iloc[0]}~{df_b1['k'].max()}):")
for key, value in metrics_b1.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 基线2: 2019年缩放
df_b2 = compute_baseline_2019_scaled(df)
metrics_b2 = compute_metrics(df_b2, 'Baseline 2: 2019 Scaled')
print(f"\n 基线2 (2019年缩放):")
for key, value in metrics_b2.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 基线3: 按原始μ比例
df_b3 = compute_baseline_mu_raw(df)
metrics_b3 = compute_metrics(df_b3, 'Baseline 3: μ proportional (no correction)')
print(f"\n 基线3 (按原始μ比例, 无截断修正):")
for key, value in metrics_b3.items():
if key != 'method':
print(f" {key}: {value:.4f}")
# 4. 汇总对比
print(f"\n[4] 方案对比汇总")
all_metrics = [metrics_recommended, metrics_b1, metrics_b2, metrics_b3]
df_metrics = pd.DataFrame(all_metrics)
# 计算相对于推荐方案的变化
for col in ['E1_total_service', 'E2_quality_weighted']:
base_val = metrics_recommended[col]
df_metrics[f'{col}_pct'] = (df_metrics[col] / base_val - 1) * 100
print("\n " + "-" * 90)
print(f" {'方案':<45} {'E1':>12} {'E2':>12} {'F1(Gini)':>10} {'F2(min r)':>10} {'F3(CV)':>10}")
print(" " + "-" * 90)
for _, row in df_metrics.iterrows():
print(f" {row['method']:<45} {row['E1_total_service']:>12.1f} {row['E2_quality_weighted']:>12.1f} {row['F1_gini']:>10.4f} {row['F2_min_satisfaction']:>10.2f} {row['F3_cv_satisfaction']:>10.4f}")
print(" " + "-" * 90)
# 5. 推荐方案优势分析
print(f"\n[5] 推荐方案优势分析")
print(f" 相对于均匀分配 (基线1):")
print(f" - E1 提升: {(metrics_recommended['E1_total_service']/metrics_b1['E1_total_service']-1)*100:+.2f}%")
print(f" - E2 提升: {(metrics_recommended['E2_quality_weighted']/metrics_b1['E2_quality_weighted']-1)*100:+.2f}%")
print(f"\n 相对于2019缩放 (基线2):")
print(f" - E1 变化: {(metrics_recommended['E1_total_service']/metrics_b2['E1_total_service']-1)*100:+.2f}%")
print(f" - E2 变化: {(metrics_recommended['E2_quality_weighted']/metrics_b2['E2_quality_weighted']-1)*100:+.2f}%")
print(f" - F1(Gini)变化: {(metrics_recommended['F1_gini']-metrics_b2['F1_gini']):+.4f}")
# 6. 保存输出
print(f"\n[6] 保存输出: {OUTPUT_PATH}")
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
# Sheet 1: 指标汇总
df_metrics.to_excel(writer, sheet_name='metrics_summary', index=False)
# Sheet 2: 站点级详情 (推荐方案)
df_detail = df[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']].copy()
df_detail['quality_factor'] = df['mu'].apply(quality_discount)
df_detail['service_quality_weighted'] = df_detail['k'] * df_detail['mu'] * df_detail['quality_factor']
df_detail.to_excel(writer, sheet_name='site_details', index=False)
# Sheet 3: 参数记录
params = pd.DataFrame([
{'parameter': 'C_BAR (quality threshold)', 'value': C_BAR},
{'parameter': 'N_TOTAL', 'value': 730},
{'parameter': 'n_sites', 'value': len(df)},
])
params.to_excel(writer, sheet_name='parameters', index=False)
print(f" 已保存3个工作表: metrics_summary, site_details, parameters")
print("\n" + "=" * 60)
print("Step 04 完成")
print("=" * 60)
return df_metrics
if __name__ == "__main__":
main()