Files
mcm-mfp/task3/06_evaluate.py
2026-01-19 11:57:19 +08:00

349 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task 3 - Step 6: 效果评估
=========================
输入:
- 04_reschedule.xlsx (访问次数)
- 03_allocation.xlsx (配对分配方案)
- ../task1/03_allocate.xlsx (Task 1结果用于对比)
- ../task1/04_metrics.xlsx (Task 1指标用于对比)
输出: 06_evaluate.xlsx (评估指标对比)
评估指标:
- E1': 期望总服务量
- E2': 质量加权服务量(总量计算衰减)
- F1': 满足率Gini系数
- F2': 最低满足率
- R1: 服务缺口风险
- RS: 资源节省率
有效性衰减公式(总量计算):
q(μ_i + μ_j) = min(1, 250 / (μ_i + μ_j))
"""
import pandas as pd
import numpy as np
from scipy import stats
# ============================================
# 参数设置
# ============================================
RESCHEDULE_FILE = '04_reschedule.xlsx'
ALLOCATION_FILE = '03_allocation.xlsx'
TASK1_ALLOC_FILE = '../task1/03_allocate.xlsx'
TASK1_METRIC_FILE = '../task1/04_metrics.xlsx'
OUTPUT_FILE = '06_evaluate.xlsx'
Q = 400 # 卡车容量
QUALITY_THRESHOLD = 250 # 质量衰减阈值
SHORTFALL_THRESHOLD = 0.8 # 服务缺口阈值
# ============================================
# 辅助函数
# ============================================
def quality_factor(mu_total):
"""质量折扣因子(总量计算)"""
return min(1.0, QUALITY_THRESHOLD / mu_total) if mu_total > 0 else 1.0
def expected_service(q, mu, sigma):
"""E[min(D, q)]"""
if sigma == 0:
return min(mu, q)
z = (q - mu) / sigma
return mu * stats.norm.cdf(z) - sigma * stats.norm.pdf(z) + q * (1 - stats.norm.cdf(z))
def gini_coefficient(values):
"""计算Gini系数"""
values = np.array(values)
values = values[~np.isnan(values)]
if len(values) == 0:
return 0
values = np.sort(values)
n = len(values)
cumsum = np.cumsum(values)
return (2 * np.sum((np.arange(1, n + 1) * values)) - (n + 1) * cumsum[-1]) / (n * cumsum[-1]) if cumsum[-1] > 0 else 0
def shortfall_probability(q, mu, sigma, threshold=0.8):
"""P(S/D < threshold),即服务不足的概率"""
# 简化计算P(min(D,q) / D < 0.8) ≈ P(D > q/0.8)
if sigma == 0:
return 0 if q >= mu * threshold else 1
critical_demand = q / threshold
return 1 - stats.norm.cdf((critical_demand - mu) / sigma)
# ============================================
# 读取数据
# ============================================
print("=" * 60)
print("Task 3 - Step 6: 效果评估")
print("=" * 60)
# Task 3 数据
df_sites = pd.read_excel(RESCHEDULE_FILE, sheet_name='sites_schedule')
df_pairs = pd.read_excel(ALLOCATION_FILE, sheet_name='allocation')
df_pair_visits = pd.read_excel(RESCHEDULE_FILE, sheet_name='pair_visits')
print(f"\n读取Task 3数据:")
print(f" - 站点数: {len(df_sites)}")
print(f" - 配对数: {len(df_pairs)}")
# Task 1 数据(用于对比)
df_task1 = pd.read_excel(TASK1_ALLOC_FILE)
try:
df_task1_metrics = pd.read_excel(TASK1_METRIC_FILE)
has_task1_metrics = True
except:
has_task1_metrics = False
print(" - 未找到Task 1指标文件将重新计算")
# ============================================
# 计算 Task 3 指标
# ============================================
print(f"\n" + "-" * 40)
print("计算 Task 3 指标")
print("-" * 40)
# 合并配对访问信息
pair_k = {(row['site_i_id'], row['site_j_id']): row['k_dual']
for _, row in df_pair_visits.iterrows()}
# === E1': 期望总服务量 ===
E1_prime = 0
# 单站点贡献
for _, row in df_sites.iterrows():
k_single = row['k_single_final']
mu = row['mu']
E1_prime += k_single * mu
# 双站点贡献
for _, row in df_pairs.iterrows():
pair_key = (row['site_i_id'], row['site_j_id'])
k_dual = pair_k.get(pair_key, 0)
E_total = row['E_total']
E1_prime += k_dual * E_total
print(f"E1' (期望总服务量): {E1_prime:.0f}")
# === E2': 质量加权服务量 ===
E2_prime = 0
# 单站点贡献
for _, row in df_sites.iterrows():
k_single = row['k_single_final']
mu = row['mu']
q_factor = quality_factor(mu)
E2_prime += k_single * q_factor * mu
# 双站点贡献(总量计算衰减)
for _, row in df_pairs.iterrows():
pair_key = (row['site_i_id'], row['site_j_id'])
k_dual = pair_k.get(pair_key, 0)
mu_sum = row['mu_i'] + row['mu_j']
E_total = row['E_total']
q_factor = quality_factor(mu_sum) # 总量计算
E2_prime += k_dual * q_factor * E_total
print(f"E2' (质量加权服务量): {E2_prime:.0f}")
# === 满足率计算 ===
# Task 1定义: r_i = k_i * μ_i / μ̃_i (等效服务次数)
# Task 3: 需要分别计算单站点和双站点的贡献
site_satisfaction = {}
for _, row in df_sites.iterrows():
site_id = row['site_id']
mu = row['mu']
mu_tilde = row['mu_tilde']
k_single = row['k_single_final']
# 单站点贡献: k_single * μ / μ̃
r_single = k_single * mu / mu_tilde if mu_tilde > 0 else 0
site_satisfaction[site_id] = r_single
# 加上双站点贡献
for _, row in df_pairs.iterrows():
pair_key = (row['site_i_id'], row['site_j_id'])
k_dual = pair_k.get(pair_key, 0)
# 双站点贡献: k_dual * E[S_i] / μ̃_i
mu_tilde_i = df_sites[df_sites['site_id'] == row['site_i_id']]['mu_tilde'].values[0]
mu_tilde_j = df_sites[df_sites['site_id'] == row['site_j_id']]['mu_tilde'].values[0]
r_dual_i = k_dual * row['E_Si'] / mu_tilde_i if mu_tilde_i > 0 else 0
r_dual_j = k_dual * row['E_Sj'] / mu_tilde_j if mu_tilde_j > 0 else 0
site_satisfaction[row['site_i_id']] += r_dual_i
site_satisfaction[row['site_j_id']] += r_dual_j
# 计算满足率列表
satisfaction_rates = list(site_satisfaction.values())
# === F1': Gini系数 ===
F1_prime = gini_coefficient(satisfaction_rates)
print(f"F1' (满足率Gini): {F1_prime:.4f}")
# === F2': 最低满足率 ===
F2_prime = min(satisfaction_rates) if satisfaction_rates else 0
print(f"F2' (最低满足率): {F2_prime:.4f}")
# === R1: 服务缺口风险 ===
# 双站点访问时,任一站点服务不足的概率
shortfall_probs = []
for _, row in df_pairs.iterrows():
q_final = row['q_final']
mu_i, sigma_i = row['mu_i'], row['sigma_i']
mu_j, sigma_j = row['mu_j'], row['sigma_j']
# 站点i的缺口概率
p_i = shortfall_probability(q_final, mu_i, sigma_i, SHORTFALL_THRESHOLD)
# 站点j的缺口概率
p_j = shortfall_probability(Q - q_final, mu_j, sigma_j, SHORTFALL_THRESHOLD)
# 至少一个站点缺口的概率
p_either = 1 - (1 - p_i) * (1 - p_j)
shortfall_probs.append(p_either)
R1 = np.mean(shortfall_probs) if shortfall_probs else 0
print(f"R1 (平均缺口风险): {R1:.4f}")
# === RS: 资源节省率 ===
total_dual = df_pair_visits['k_dual'].sum()
RS = total_dual / 730 * 100
print(f"RS (资源节省率): {RS:.2f}%")
# ============================================
# 计算 Task 1 指标(用于对比)
# ============================================
print(f"\n" + "-" * 40)
print("计算 Task 1 指标(对比基准)")
print("-" * 40)
# E1: 总服务量
E1_task1 = (df_task1['k'] * df_task1['mu']).sum()
print(f"E1 (Task 1 总服务量): {E1_task1:.0f}")
# E2: 质量加权
E2_task1 = 0
for _, row in df_task1.iterrows():
q_factor = quality_factor(row['mu'])
E2_task1 += row['k'] * q_factor * row['mu']
print(f"E2 (Task 1 质量加权): {E2_task1:.0f}")
# F1: Gini系数 - 使用与Task 3相同的r定义
# r_i = k_i * μ_i / μ̃_i
task1_rates = []
for _, row in df_task1.iterrows():
r = row['k'] * row['mu'] / row['mu_tilde'] if row['mu_tilde'] > 0 else 0
task1_rates.append(r)
F1_task1 = gini_coefficient(task1_rates)
print(f"F1 (Task 1 Gini): {F1_task1:.4f}")
# F2: 最低满足率
F2_task1 = min(task1_rates) if task1_rates else 0
print(f"F2 (Task 1 最低满足率): {F2_task1:.4f}")
# ============================================
# 对比分析
# ============================================
print(f"\n" + "-" * 40)
print("Task 3 vs Task 1 对比")
print("-" * 40)
comparison = pd.DataFrame({
'Metric': ['E1 (总服务量)', 'E2 (质量加权)', 'F1 (Gini系数)', 'F2 (最低满足率)'],
'Task 1': [E1_task1, E2_task1, F1_task1, F2_task1],
'Task 3': [E1_prime, E2_prime, F1_prime, F2_prime],
'Change': [E1_prime - E1_task1, E2_prime - E2_task1,
F1_prime - F1_task1, F2_prime - F2_task1],
'Change %': [(E1_prime - E1_task1) / E1_task1 * 100,
(E2_prime - E2_task1) / E2_task1 * 100,
(F1_prime - F1_task1) / F1_task1 * 100 if F1_task1 != 0 else 0,
(F2_prime - F2_task1) / F2_task1 * 100 if F2_task1 != 0 else 0]
})
print(comparison.to_string(index=False))
# ============================================
# 保存结果
# ============================================
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# Sheet 1: 指标对比
comparison.to_excel(writer, sheet_name='comparison', index=False)
# Sheet 2: Task 3 详细指标
task3_metrics = pd.DataFrame({
'metric': ['E1_prime', 'E2_prime', 'F1_prime', 'F2_prime',
'R1', 'RS', 'total_pairs', 'total_dual_visits'],
'value': [E1_prime, E2_prime, F1_prime, F2_prime,
R1, RS/100, len(df_pairs), total_dual],
'description': ['期望总服务量', '质量加权服务量', '满足率Gini系数', '最低满足率',
'平均缺口风险', '资源节省率', '配对数', '双站点访问次数']
})
task3_metrics.to_excel(writer, sheet_name='task3_metrics', index=False)
# Sheet 3: 站点级别满足率
site_rates = []
for site_id, r in site_satisfaction.items():
site_row = df_sites[df_sites['site_id'] == site_id]
if len(site_row) > 0:
site_name = site_row['site_name'].values[0]
mu = site_row['mu'].values[0]
mu_tilde = site_row['mu_tilde'].values[0]
k_single = site_row['k_single_final'].values[0]
k_dual = site_row['k_dual'].values[0]
else:
site_name = f"Site_{site_id}"
mu, mu_tilde, k_single, k_dual = 0, 0, 0, 0
site_rates.append({
'site_id': site_id,
'site_name': site_name,
'mu': mu,
'mu_tilde': mu_tilde,
'k_single': k_single,
'k_dual': k_dual,
'satisfaction_rate_r': r
})
df_site_rates = pd.DataFrame(site_rates)
df_site_rates = df_site_rates.sort_values('satisfaction_rate_r')
df_site_rates.to_excel(writer, sheet_name='site_satisfaction', index=False)
# Sheet 4: 配对风险分析
pair_risk = []
for idx, row in df_pairs.iterrows():
pair_key = (row['site_i_id'], row['site_j_id'])
k_dual = pair_k.get(pair_key, 0)
q_final = row['q_final']
mu_i, sigma_i = row['mu_i'], row['sigma_i']
mu_j, sigma_j = row['mu_j'], row['sigma_j']
p_i = shortfall_probability(q_final, mu_i, sigma_i, SHORTFALL_THRESHOLD)
p_j = shortfall_probability(Q - q_final, mu_j, sigma_j, SHORTFALL_THRESHOLD)
pair_risk.append({
'site_i_name': row['site_i_name'],
'site_j_name': row['site_j_name'],
'k_dual': k_dual,
'q_final': q_final,
'shortfall_prob_i': p_i,
'shortfall_prob_j': p_j,
'shortfall_prob_either': 1 - (1 - p_i) * (1 - p_j)
})
df_pair_risk = pd.DataFrame(pair_risk)
df_pair_risk = df_pair_risk.sort_values('shortfall_prob_either', ascending=False)
df_pair_risk.to_excel(writer, sheet_name='pair_risk', index=False)
print(f"\n结果已保存至: {OUTPUT_FILE}")
print(" - Sheet 'comparison': Task 1 vs Task 3 对比")
print(" - Sheet 'task3_metrics': Task 3 详细指标")
print(" - Sheet 'site_satisfaction': 站点满足率")
print(" - Sheet 'pair_risk': 配对风险分析")
print("\n" + "=" * 60)