234 lines
8.0 KiB
Python
234 lines
8.0 KiB
Python
"""
|
||
Step 04: 评估指标计算
|
||
|
||
输入: 03_allocate.xlsx
|
||
输出: 04_metrics.xlsx
|
||
|
||
功能:
|
||
1. 计算有效性指标 (E1, E2)
|
||
2. 计算公平性指标 (F1, F2, F3)
|
||
3. 与基线方案对比
|
||
4. 输出综合评估报告
|
||
|
||
指标定义:
|
||
- E1: 原始总服务量 = Σ k_i × μ_i
|
||
- E2: 质量加权服务量 = Σ k_i × μ_i × q(μ_i), q(μ) = min(1, 250/μ)
|
||
- F1: 满足率基尼系数 Gini(r)
|
||
- F2: 最差站点满足率 min(r)
|
||
- F3: 满足率变异系数 CV(r)
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from pathlib import Path
|
||
|
||
# 路径配置
|
||
INPUT_PATH = Path(__file__).parent / "03_allocate.xlsx"
|
||
OUTPUT_PATH = Path(__file__).parent / "04_metrics.xlsx"
|
||
|
||
# 评估参数
|
||
C_BAR = 250 # 典型服务量 (质量折扣阈值)
|
||
|
||
|
||
def quality_discount(mu: float, c_bar: float = C_BAR) -> float:
|
||
"""
|
||
质量折扣因子: 当 μ > c_bar 时,服务质量打折
|
||
|
||
q(μ) = min(1, c_bar / μ)
|
||
"""
|
||
return min(1.0, c_bar / mu) if mu > 0 else 1.0
|
||
|
||
|
||
def gini_coefficient(values: list) -> float:
|
||
"""
|
||
计算基尼系数
|
||
|
||
Gini = Σ_i Σ_j |x_i - x_j| / (2n Σ_i x_i)
|
||
"""
|
||
values = np.array(values)
|
||
n = len(values)
|
||
if n == 0 or values.sum() == 0:
|
||
return 0.0
|
||
|
||
# 排序后计算
|
||
sorted_values = np.sort(values)
|
||
cumsum = np.cumsum(sorted_values)
|
||
return (2 * np.sum((np.arange(1, n + 1) * sorted_values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1])
|
||
|
||
|
||
def compute_metrics(df: pd.DataFrame, method_name: str) -> dict:
|
||
"""计算单个方案的所有指标"""
|
||
# 有效性指标
|
||
E1 = (df['k'] * df['mu']).sum()
|
||
E2 = (df['k'] * df['mu'] * df['mu'].apply(quality_discount)).sum()
|
||
|
||
# 满足率
|
||
r = df['k'] * df['mu'] / df['mu_tilde']
|
||
|
||
# 公平性指标
|
||
F1 = gini_coefficient(r.tolist())
|
||
F2 = r.min()
|
||
F3 = r.std() / r.mean()
|
||
|
||
return {
|
||
'method': method_name,
|
||
'E1_total_service': E1,
|
||
'E2_quality_weighted': E2,
|
||
'F1_gini': F1,
|
||
'F2_min_satisfaction': F2,
|
||
'F3_cv_satisfaction': F3
|
||
}
|
||
|
||
|
||
def compute_baseline_uniform(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
|
||
"""基线1: 均匀分配"""
|
||
df_baseline = df.copy()
|
||
k_uniform = n_total // len(df)
|
||
remainder = n_total - k_uniform * len(df)
|
||
df_baseline['k'] = k_uniform
|
||
# 将余数分配给前 remainder 个站点
|
||
df_baseline.iloc[:remainder, df_baseline.columns.get_loc('k')] += 1
|
||
return df_baseline
|
||
|
||
|
||
def compute_baseline_2019_scaled(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
|
||
"""基线2: 2019年访问次数等比例缩放"""
|
||
df_baseline = df.copy()
|
||
total_2019 = df['visits_2019'].sum()
|
||
# 按比例缩放
|
||
k_float = df['visits_2019'] * n_total / total_2019
|
||
k_floor = k_float.astype(int)
|
||
remainder = n_total - k_floor.sum()
|
||
# 按余数分配
|
||
remainders = k_float - k_floor
|
||
indices = remainders.nlargest(remainder).index
|
||
k_floor.loc[indices] += 1
|
||
df_baseline['k'] = k_floor
|
||
return df_baseline
|
||
|
||
|
||
def compute_baseline_mu_raw(df: pd.DataFrame, n_total: int = 730) -> pd.DataFrame:
|
||
"""基线3: 按原始μ比例分配 (无截断修正)"""
|
||
df_baseline = df.copy()
|
||
# 覆盖约束
|
||
k_base = 1
|
||
extra = n_total - len(df) * k_base
|
||
# 按μ比例分配额外次数
|
||
weights = df['mu'].tolist()
|
||
w_sum = sum(weights)
|
||
quotas = [extra * w / w_sum for w in weights]
|
||
floors = [int(q) for q in quotas]
|
||
remainders = [q - f for q, f in zip(quotas, floors)]
|
||
leftover = extra - sum(floors)
|
||
indices = sorted(range(len(weights)), key=lambda i: -remainders[i])
|
||
for i in indices[:leftover]:
|
||
floors[i] += 1
|
||
df_baseline['k'] = [k_base + f for f in floors]
|
||
return df_baseline
|
||
|
||
|
||
def main():
|
||
print("=" * 60)
|
||
print("Step 04: 评估指标计算")
|
||
print("=" * 60)
|
||
|
||
# 1. 读取分配结果
|
||
print(f"\n[1] 读取输入: {INPUT_PATH}")
|
||
df = pd.read_excel(INPUT_PATH)
|
||
print(f" 读取 {len(df)} 条记录")
|
||
|
||
# 2. 计算推荐方案指标
|
||
print(f"\n[2] 计算推荐方案指标 (按μ̃比例分配)")
|
||
metrics_recommended = compute_metrics(df, 'Recommended (μ̃ proportional)')
|
||
for key, value in metrics_recommended.items():
|
||
if key != 'method':
|
||
print(f" {key}: {value:.4f}")
|
||
|
||
# 3. 计算基线方案指标
|
||
print(f"\n[3] 计算基线方案指标")
|
||
|
||
# 基线1: 均匀分配
|
||
df_b1 = compute_baseline_uniform(df)
|
||
metrics_b1 = compute_metrics(df_b1, 'Baseline 1: Uniform')
|
||
print(f"\n 基线1 (均匀分配, k={df_b1['k'].iloc[0]}~{df_b1['k'].max()}):")
|
||
for key, value in metrics_b1.items():
|
||
if key != 'method':
|
||
print(f" {key}: {value:.4f}")
|
||
|
||
# 基线2: 2019年缩放
|
||
df_b2 = compute_baseline_2019_scaled(df)
|
||
metrics_b2 = compute_metrics(df_b2, 'Baseline 2: 2019 Scaled')
|
||
print(f"\n 基线2 (2019年缩放):")
|
||
for key, value in metrics_b2.items():
|
||
if key != 'method':
|
||
print(f" {key}: {value:.4f}")
|
||
|
||
# 基线3: 按原始μ比例
|
||
df_b3 = compute_baseline_mu_raw(df)
|
||
metrics_b3 = compute_metrics(df_b3, 'Baseline 3: μ proportional (no correction)')
|
||
print(f"\n 基线3 (按原始μ比例, 无截断修正):")
|
||
for key, value in metrics_b3.items():
|
||
if key != 'method':
|
||
print(f" {key}: {value:.4f}")
|
||
|
||
# 4. 汇总对比
|
||
print(f"\n[4] 方案对比汇总")
|
||
all_metrics = [metrics_recommended, metrics_b1, metrics_b2, metrics_b3]
|
||
df_metrics = pd.DataFrame(all_metrics)
|
||
|
||
# 计算相对于推荐方案的变化
|
||
for col in ['E1_total_service', 'E2_quality_weighted']:
|
||
base_val = metrics_recommended[col]
|
||
df_metrics[f'{col}_pct'] = (df_metrics[col] / base_val - 1) * 100
|
||
|
||
print("\n " + "-" * 90)
|
||
print(f" {'方案':<45} {'E1':>12} {'E2':>12} {'F1(Gini)':>10} {'F2(min r)':>10} {'F3(CV)':>10}")
|
||
print(" " + "-" * 90)
|
||
for _, row in df_metrics.iterrows():
|
||
print(f" {row['method']:<45} {row['E1_total_service']:>12.1f} {row['E2_quality_weighted']:>12.1f} {row['F1_gini']:>10.4f} {row['F2_min_satisfaction']:>10.2f} {row['F3_cv_satisfaction']:>10.4f}")
|
||
print(" " + "-" * 90)
|
||
|
||
# 5. 推荐方案优势分析
|
||
print(f"\n[5] 推荐方案优势分析")
|
||
print(f" 相对于均匀分配 (基线1):")
|
||
print(f" - E1 提升: {(metrics_recommended['E1_total_service']/metrics_b1['E1_total_service']-1)*100:+.2f}%")
|
||
print(f" - E2 提升: {(metrics_recommended['E2_quality_weighted']/metrics_b1['E2_quality_weighted']-1)*100:+.2f}%")
|
||
|
||
print(f"\n 相对于2019缩放 (基线2):")
|
||
print(f" - E1 变化: {(metrics_recommended['E1_total_service']/metrics_b2['E1_total_service']-1)*100:+.2f}%")
|
||
print(f" - E2 变化: {(metrics_recommended['E2_quality_weighted']/metrics_b2['E2_quality_weighted']-1)*100:+.2f}%")
|
||
print(f" - F1(Gini)变化: {(metrics_recommended['F1_gini']-metrics_b2['F1_gini']):+.4f}")
|
||
|
||
# 6. 保存输出
|
||
print(f"\n[6] 保存输出: {OUTPUT_PATH}")
|
||
|
||
with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
|
||
# Sheet 1: 指标汇总
|
||
df_metrics.to_excel(writer, sheet_name='metrics_summary', index=False)
|
||
|
||
# Sheet 2: 站点级详情 (推荐方案)
|
||
df_detail = df[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']].copy()
|
||
df_detail['quality_factor'] = df['mu'].apply(quality_discount)
|
||
df_detail['service_quality_weighted'] = df_detail['k'] * df_detail['mu'] * df_detail['quality_factor']
|
||
df_detail.to_excel(writer, sheet_name='site_details', index=False)
|
||
|
||
# Sheet 3: 参数记录
|
||
params = pd.DataFrame([
|
||
{'parameter': 'C_BAR (quality threshold)', 'value': C_BAR},
|
||
{'parameter': 'N_TOTAL', 'value': 730},
|
||
{'parameter': 'n_sites', 'value': len(df)},
|
||
])
|
||
params.to_excel(writer, sheet_name='parameters', index=False)
|
||
|
||
print(f" 已保存3个工作表: metrics_summary, site_details, parameters")
|
||
|
||
print("\n" + "=" * 60)
|
||
print("Step 04 完成")
|
||
print("=" * 60)
|
||
|
||
return df_metrics
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|