Files
mcm-mfp/task1/07_backtest.py

216 lines
8.3 KiB
Python
Raw Permalink Normal View History

2026-01-19 10:39:37 +08:00
"""
Step 07: 历史回测验证
输入: 01_clean.xlsx, 02_demand.xlsx, 03_allocate.xlsx
输出: 07_backtest.xlsx
功能:
1. 验证截断修正模型的合理性
2. 对比2019年实际访问次数与模型建议的分配
3. 分析残差分布
4. 计算模型拟合指标 (, MAPE)
"""
import pandas as pd
import numpy as np
from scipy.stats import norm, pearsonr, spearmanr
from pathlib import Path
# 路径配置
CLEAN_PATH = Path(__file__).parent / "01_clean.xlsx"
DEMAND_PATH = Path(__file__).parent / "02_demand.xlsx"
ALLOCATE_PATH = Path(__file__).parent / "03_allocate.xlsx"
OUTPUT_PATH = Path(__file__).parent / "07_backtest.xlsx"
def compute_metrics(actual, predicted):
    """Compute regression evaluation metrics for a backtest.

    Parameters
    ----------
    actual : array-like of numbers
        Ground-truth values.
    predicted : array-like of numbers
        Model predictions, same length as ``actual``.

    Returns
    -------
    dict
        Keys: 'MAPE' (percent; NaN if every actual value is zero),
        'MAE', 'RMSE', 'R2' (0 when actual is constant),
        'Pearson_r'/'Pearson_p' and 'Spearman_r'/'Spearman_p'.
    """
    actual = np.asarray(actual, dtype=float)
    predicted = np.asarray(predicted, dtype=float)
    # MAPE: exclude zero actuals to avoid division by zero.  If *all*
    # actuals are zero, the metric is undefined — return NaN explicitly
    # instead of triggering a "mean of empty slice" RuntimeWarning.
    mask = actual != 0
    if mask.any():
        mape = np.mean(np.abs((actual[mask] - predicted[mask]) / actual[mask])) * 100
    else:
        mape = np.nan
    # MAE (Mean Absolute Error)
    mae = np.mean(np.abs(actual - predicted))
    # RMSE (Root Mean Squared Error)
    rmse = np.sqrt(np.mean((actual - predicted) ** 2))
    # R² (coefficient of determination); degenerate constant-actual case -> 0.
    ss_res = np.sum((actual - predicted) ** 2)
    ss_tot = np.sum((actual - np.mean(actual)) ** 2)
    r2 = 1 - ss_res / ss_tot if ss_tot != 0 else 0
    # Linear (Pearson) and rank-based (Spearman) correlations with p-values.
    pearson_r, pearson_p = pearsonr(actual, predicted)
    spearman_r, spearman_p = spearmanr(actual, predicted)
    return {
        'MAPE': mape,
        'MAE': mae,
        'RMSE': rmse,
        'R2': r2,
        'Pearson_r': pearson_r,
        'Pearson_p': pearson_p,
        'Spearman_r': spearman_r,
        'Spearman_p': spearman_p
    }
def main():
    """Run the Step-07 backtest.

    Reads the outputs of steps 01/02/03 (cleaned sites, demand with
    truncation correction, allocation) from next to this script, compares
    the 2019 historical visit counts with the model-suggested allocation
    ``k``, prints the analyses to stdout, and writes a three-sheet
    07_backtest.xlsx workbook.

    Returns
    -------
    pandas.DataFrame
        The merged per-site table with all derived comparison columns.
    """
    print("=" * 60)
    print("Step 07: 历史回测验证")
    print("=" * 60)
    # 1. Load the three step outputs.
    print(f"\n[1] 读取输入数据")
    df_clean = pd.read_excel(CLEAN_PATH)
    df_demand = pd.read_excel(DEMAND_PATH)
    df_allocate = pd.read_excel(ALLOCATE_PATH)
    # Merge demand columns (corrected mean, truncation probability, flag)
    # and the allocation k onto the cleaned site table, keyed on site_id.
    df = df_clean.merge(df_demand[['site_id', 'mu_tilde', 'p_trunc', 'is_corrected']], on='site_id')
    df = df.merge(df_allocate[['site_id', 'k']], on='site_id')
    print(f" 读取 {len(df)} 站点数据")
    # 2. Analysis 1: did the historical 2019 allocation track demand μ?
    print(f"\n[2] 分析1: 2019年访问次数 vs 需求μ的相关性")
    # Pearson correlation of 2019 visits against raw μ and corrected μ̃.
    corr_2019_mu = pearsonr(df['visits_2019'], df['mu'])
    corr_2019_mu_tilde = pearsonr(df['visits_2019'], df['mu_tilde'])
    print(f" visits_2019 vs μ: r = {corr_2019_mu[0]:.4f}, p = {corr_2019_mu[1]:.4e}")
    print(f" visits_2019 vs μ̃: r = {corr_2019_mu_tilde[0]:.4f}, p = {corr_2019_mu_tilde[1]:.4e}")
    # Interpretation: |r| < 0.3 is treated as "weak correlation".
    if abs(corr_2019_mu[0]) < 0.3:
        print(f" 解读: 2019年访问次数与需求弱相关说明历史分配未充分考虑需求")
    else:
        print(f" 解读: 2019年访问次数与需求有一定相关性")
    # 3. Analysis 2: model-suggested k vs actual 2019 visits.
    print(f"\n[3] 分析2: 模型建议的k vs 2019年实际访问次数")
    # Rescale the 2019 visits to the same 730-visit total budget used by
    # the model so the two plans are directly comparable.
    total_2019 = df['visits_2019'].sum()
    df['visits_2019_scaled'] = df['visits_2019'] * 730 / total_2019
    # Per-site difference (absolute and percentage) vs the scaled baseline.
    df['k_diff'] = df['k'] - df['visits_2019_scaled']
    df['k_diff_pct'] = df['k_diff'] / df['visits_2019_scaled'] * 100
    # Classify sites: moved by more than half a visit either way, or stable.
    increased = (df['k_diff'] > 0.5).sum()
    decreased = (df['k_diff'] < -0.5).sum()
    unchanged = len(df) - increased - decreased
    print(f" 相比2019缩放方案:")
    print(f" - 增加访问的站点: {increased}")
    print(f" - 减少访问的站点: {decreased}")
    print(f" - 基本不变的站点: {unchanged}")
    # Top-5 sites with the largest increases and decreases.
    print(f"\n 增加访问最多的5个站点:")
    top_increase = df.nlargest(5, 'k_diff')[['site_id', 'site_name', 'mu', 'visits_2019', 'k', 'k_diff']]
    for _, row in top_increase.iterrows():
        print(f" {row['site_name'][:30]:<30}: 2019={row['visits_2019']}, 建议k={row['k']}, 差异={row['k_diff']:+.1f}")
    print(f"\n 减少访问最多的5个站点:")
    top_decrease = df.nsmallest(5, 'k_diff')[['site_id', 'site_name', 'mu', 'visits_2019', 'k', 'k_diff']]
    for _, row in top_decrease.iterrows():
        print(f" {row['site_name'][:30]:<30}: 2019={row['visits_2019']}, 建议k={row['k']}, 差异={row['k_diff']:+.1f}")
    # 4. Analysis 3: sanity check of the truncation correction — compare
    # μ, σ and CV between corrected and uncorrected sites.
    print(f"\n[4] 分析3: 截断修正的合理性检验")
    corrected_sites = df[df['is_corrected']]
    uncorrected_sites = df[~df['is_corrected']]
    print(f" 被修正站点 ({len(corrected_sites)} 个):")
    print(f" - μ 均值: {corrected_sites['mu'].mean():.1f}")
    print(f" - σ 均值: {corrected_sites['sigma'].mean():.1f}")
    print(f" - CV (σ/μ) 均值: {(corrected_sites['sigma']/corrected_sites['mu']).mean():.3f}")
    print(f" 未修正站点 ({len(uncorrected_sites)} 个):")
    print(f" - μ 均值: {uncorrected_sites['mu'].mean():.1f}")
    print(f" - σ 均值: {uncorrected_sites['sigma'].mean():.1f}")
    print(f" - CV (σ/μ) 均值: {(uncorrected_sites['sigma']/uncorrected_sites['mu']).mean():.3f}")
    # 5. Analysis 4: internal consistency — k should be roughly proportional
    # to μ̃, so the ratio k/μ̃ should be near-constant across sites.
    print(f"\n[5] 分析4: 模型一致性检验 (k 应与 μ̃ 成正比)")
    df['k_per_mu_tilde'] = df['k'] / df['mu_tilde']
    # Coefficient of variation of the ratio; small CV => consistent model.
    cv_ratio = df['k_per_mu_tilde'].std() / df['k_per_mu_tilde'].mean()
    print(f" k / μ̃ 的统计:")
    print(f" - 均值: {df['k_per_mu_tilde'].mean():.4f}")
    print(f" - 标准差: {df['k_per_mu_tilde'].std():.4f}")
    print(f" - CV (变异系数): {cv_ratio:.4f}")
    if cv_ratio < 0.1:
        print(f" 解读: k 与 μ̃ 高度成正比 (CV < 0.1),模型内部一致")
    else:
        print(f" 解读: 存在一些偏差,主要来自整数化舍入")
    # 6. Analysis 5: total serviced demand (visits × μ) under each plan.
    print(f"\n[6] 分析5: 服务量对比")
    service_2019 = (df['visits_2019'] * df['mu']).sum()
    service_2019_scaled = (df['visits_2019_scaled'] * df['mu']).sum()
    service_model = (df['k'] * df['mu']).sum()
    print(f" 2019年实际服务量 (722次): {service_2019:.0f}")
    print(f" 2019年缩放服务量 (730次): {service_2019_scaled:.0f}")
    print(f" 模型建议服务量 (730次): {service_model:.0f}")
    print(f" 模型改进: {(service_model/service_2019_scaled - 1)*100:+.2f}%")
    # 7. Write the three output sheets (workbook is saved when the
    # ExcelWriter context exits).
    print(f"\n[7] 保存输出: {OUTPUT_PATH}")
    with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
        # Sheet 1: per-site comparison of demand, 2019 visits and model k.
        output_cols = ['site_id', 'site_name', 'mu', 'sigma', 'mu_tilde', 'is_corrected',
                       'visits_2019', 'visits_2019_scaled', 'k', 'k_diff', 'k_diff_pct', 'k_per_mu_tilde']
        df[output_cols].to_excel(writer, sheet_name='site_comparison', index=False)
        # Sheet 2: headline metrics from the analyses above.
        summary = pd.DataFrame([
            {'metric': 'corr(visits_2019, μ)', 'value': corr_2019_mu[0], 'p_value': corr_2019_mu[1]},
            {'metric': 'corr(visits_2019, μ̃)', 'value': corr_2019_mu_tilde[0], 'p_value': corr_2019_mu_tilde[1]},
            {'metric': 'sites_increased', 'value': increased, 'p_value': None},
            {'metric': 'sites_decreased', 'value': decreased, 'p_value': None},
            {'metric': 'sites_unchanged', 'value': unchanged, 'p_value': None},
            {'metric': 'CV(k/μ̃)', 'value': cv_ratio, 'p_value': None},
            {'metric': 'service_2019_actual', 'value': service_2019, 'p_value': None},
            {'metric': 'service_model', 'value': service_model, 'p_value': None},
            {'metric': 'improvement_pct', 'value': (service_model/service_2019_scaled - 1)*100, 'p_value': None},
        ])
        summary.to_excel(writer, sheet_name='summary_metrics', index=False)
        # Sheet 3: details for the sites whose μ was truncation-corrected.
        corrected_sites[['site_id', 'site_name', 'mu', 'sigma', 'p_trunc', 'mu_tilde']].to_excel(
            writer, sheet_name='corrected_sites', index=False
        )
    print(f" 已保存3个工作表")
    print("\n" + "=" * 60)
    print("Step 07 完成")
    print("=" * 60)
    return df
if __name__ == "__main__":
main()