""" Step 03: 频次分配 - Hamilton最大余数法 输入: 02_demand.xlsx 输出: 03_allocate.xlsx 功能: 1. 按真实需求 μ̃ 比例分配年度访问次数 k_i 2. 使用 Hamilton 方法保证整数分配且总和 = N 3. 满足覆盖约束: k_i >= 1 分配原则: - 先给每个站点分配1次 (覆盖约束) - 剩余 N-70 次按 μ̃ 比例分配 """ import pandas as pd import numpy as np from pathlib import Path # 路径配置 INPUT_PATH = Path(__file__).parent / "02_demand.xlsx" OUTPUT_PATH = Path(__file__).parent / "03_allocate.xlsx" # 分配参数 N_TOTAL = 730 # 年度总访问次数 (365天 × 2站点/天) MIN_VISITS = 1 # 每站点最少访问次数 (覆盖约束) def hamilton_allocation(total: int, weights: list) -> list: """ Hamilton最大余数法整数分配 Args: total: 待分配总数 weights: 各项权重 Returns: 整数分配结果列表 """ n = len(weights) w_sum = sum(weights) # 连续配额 quotas = [total * w / w_sum for w in weights] # 下取整 floors = [int(q) for q in quotas] remainders = [q - f for q, f in zip(quotas, floors)] # 剩余席位按余数从大到小分配 leftover = total - sum(floors) indices = sorted(range(n), key=lambda i: -remainders[i]) for i in indices[:leftover]: floors[i] += 1 return floors def main(): print("=" * 60) print("Step 03: 频次分配 - Hamilton最大余数法") print("=" * 60) # 1. 读取需求修正后的数据 print(f"\n[1] 读取输入: {INPUT_PATH}") df = pd.read_excel(INPUT_PATH) print(f" 读取 {len(df)} 条记录") n_sites = len(df) # 2. 显示参数 print(f"\n[2] 分配参数:") print(f" 年度总访问次数 N = {N_TOTAL}") print(f" 站点数 = {n_sites}") print(f" 覆盖约束: 每站点至少 {MIN_VISITS} 次") print(f" 剩余可分配次数 = {N_TOTAL} - {n_sites} × {MIN_VISITS} = {N_TOTAL - n_sites * MIN_VISITS}") # 3. Hamilton分配 print(f"\n[3] 执行Hamilton分配...") # 权重 = 修正后的真实需求 μ̃ weights = df['mu_tilde'].tolist() # 分配剩余次数 extra_visits = N_TOTAL - n_sites * MIN_VISITS k_extra = hamilton_allocation(extra_visits, weights) # 总访问次数 = 基础 + 额外 df['k'] = [MIN_VISITS + ke for ke in k_extra] # 4. 验证分配结果 print(f"\n[4] 分配结果验证:") print(f" 总访问次数: Σk_i = {df['k'].sum()} (应为 {N_TOTAL})") print(f" 最小访问次数: min(k_i) = {df['k'].min()} (应 >= {MIN_VISITS})") print(f" 最大访问次数: max(k_i) = {df['k'].max()}") print(f" 访问次数范围: [{df['k'].min()}, {df['k'].max()}]") assert df['k'].sum() == N_TOTAL, f"总访问次数不等于{N_TOTAL}" assert df['k'].min() >= MIN_VISITS, f"存在站点访问次数少于{MIN_VISITS}" # 5. 计算满足率 # r_i = k_i * μ_i / μ̃_i (年度服务量 / 真实需求) df['annual_service'] = df['k'] * df['mu'] # 年度预期服务量 df['r'] = df['annual_service'] / df['mu_tilde'] # 满足率代理 print(f"\n[5] 满足率统计:") print(f" 满足率 r = k × μ / μ̃") print(f" r 均值: {df['r'].mean():.2f}") print(f" r 标准差: {df['r'].std():.2f}") print(f" r 范围: [{df['r'].min():.2f}, {df['r'].max():.2f}]") print(f" r 变异系数: {df['r'].std() / df['r'].mean():.4f}") # 6. 分配结果分布 print(f"\n[6] 访问次数分布:") k_counts = df['k'].value_counts().sort_index() for k_val, count in k_counts.items(): print(f" k = {k_val:2d}: {count:2d} 个站点 {'█' * count}") # 7. 与2019年对比 print(f"\n[7] 与2019年访问次数对比:") df['k_2019_scaled'] = df['visits_2019'] * N_TOTAL / df['visits_2019'].sum() df['k_diff'] = df['k'] - df['k_2019_scaled'] print(f" 2019年总访问次数: {df['visits_2019'].sum()}") print(f" 2019年缩放后总次数: {df['k_2019_scaled'].sum():.1f}") print(f" 新方案 vs 2019缩放:") print(f" - 增加访问的站点: {(df['k_diff'] > 0.5).sum()} 个") print(f" - 减少访问的站点: {(df['k_diff'] < -0.5).sum()} 个") print(f" - 基本不变的站点: {((df['k_diff'] >= -0.5) & (df['k_diff'] <= 0.5)).sum()} 个") # 8. 保存输出 print(f"\n[8] 保存输出: {OUTPUT_PATH}") output_cols = ['site_id', 'site_name', 'lat', 'lon', 'visits_2019', 'mu', 'sigma', 'mu_tilde', 'k', 'annual_service', 'r'] df[output_cols].to_excel(OUTPUT_PATH, index=False) print(f" 已保存 {len(df)} 条记录") # 9. 输出预览 print(f"\n[9] 分配结果预览 (k 最高的10个站点):") top10 = df.nlargest(10, 'k')[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']] print(top10.to_string(index=False)) print(f"\n 分配结果预览 (k 最低的10个站点):") bottom10 = df.nsmallest(10, 'k')[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']] print(bottom10.to_string(index=False)) print("\n" + "=" * 60) print("Step 03 完成") print("=" * 60) return df if __name__ == "__main__": main()