156 lines
5.2 KiB
Python
156 lines
5.2 KiB
Python
"""
|
||
Step 03: 频次分配 - Hamilton最大余数法

输入: 02_demand.xlsx
输出: 03_allocate.xlsx

功能:
1. 按真实需求 μ̃ 比例分配年度访问次数 k_i
2. 使用 Hamilton 方法保证整数分配且总和 = N
3. 满足覆盖约束: k_i >= 1

分配原则:
- 先给每个站点分配1次 (覆盖约束)
- 剩余 N-70 次 (即 70 个站点各先占用 1 次之后的余量) 按 μ̃ 比例分配
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from pathlib import Path
|
||
|
||
# Path configuration: both workbooks sit in the same directory as this script.
INPUT_PATH = Path(__file__).with_name("02_demand.xlsx")
OUTPUT_PATH = Path(__file__).with_name("03_allocate.xlsx")

# Allocation parameters
N_TOTAL = 730  # total visits per year (365 days × 2 sites/day)
MIN_VISITS = 1  # minimum visits per site (coverage constraint)
|
||
|
||
|
||
def hamilton_allocation(total: int, weights: list) -> list:
    """
    Apportion ``total`` indivisible units with Hamilton's largest-remainder method.

    Every item first receives the integer part of its proportional quota
    ``total * w / sum(weights)``. The units that are still unassigned are then
    handed out one at a time to the items with the largest fractional
    remainders. Ties are resolved in favour of the item that comes first in
    ``weights``.

    Args:
        total: Number of units to distribute; must be non-negative.
        weights: Weight of each item; every weight must be non-negative and
            finite.

    Returns:
        A list of integers, one per weight, that sums to ``total``. An empty
        ``weights`` yields an empty list, and ``total == 0`` yields all zeros.

    Raises:
        ValueError: If ``total`` is negative, if any weight is negative, NaN
            or infinite, or if the weights do not add up to a positive finite
            number while ``total`` is positive.
    """
    n = len(weights)
    if n == 0:
        return []

    if total < 0:
        raise ValueError(f"total 必须为非负整数, 实际为 {total}")

    for pos, w in enumerate(weights):
        # A chained comparison is False for NaN, so this single test rejects
        # negative, NaN and infinite weights alike.
        if not 0 <= w < float("inf"):
            raise ValueError(f"权重必须为非负有限数值, 实际为 weights[{pos}] = {w!r}")

    if total == 0:
        # Nothing to hand out; also avoids dividing by a zero weight sum.
        return [0] * n

    w_sum = sum(weights)
    if not 0 < w_sum < float("inf"):
        raise ValueError(f"权重之和必须为正的有限数值, 实际为 {w_sum!r}, 无法按比例分配 {total} 个名额")

    # Continuous (fractional) quota of every item.
    quotas = [total * w / w_sum for w in weights]

    # Integer part of each quota; quotas are non-negative here, so int()
    # truncation is the same as rounding down.
    floors = [int(q) for q in quotas]
    remainders = [q - f for q, f in zip(quotas, floors)]

    # Hand the remaining units to the largest remainders. sorted() is stable,
    # so equal remainders keep their original order.
    leftover = total - sum(floors)
    indices = sorted(range(n), key=lambda i: -remainders[i])
    for i in indices[:leftover]:
        floors[i] += 1

    return floors
|
||
|
||
|
||
def main():
    """
    Run step 03: allocate yearly visit counts to sites and save the result.

    Reads the demand table from ``INPUT_PATH``, reserves ``MIN_VISITS`` visits
    for every site, distributes the rest of the ``N_TOTAL`` budget in
    proportion to the ``mu_tilde`` column via ``hamilton_allocation``, derives
    the ``annual_service`` and ``r`` columns, prints a step-by-step report and
    writes the selected columns to ``OUTPUT_PATH``.

    Returns:
        The working DataFrame, including the added ``k``, ``annual_service``,
        ``r``, ``k_2019_scaled`` and ``k_diff`` columns.

    Raises:
        KeyError: If the input table lacks a column this step uses.
        ValueError: If ``N_TOTAL`` is too small to give every site
            ``MIN_VISITS`` visits.
        AssertionError: If the allocation does not sum to ``N_TOTAL`` or a
            site ends up with fewer than ``MIN_VISITS`` visits.
    """
    banner = "=" * 60
    print(banner)
    print("Step 03: 频次分配 - Hamilton最大余数法")
    print(banner)

    # 1. Load the demand-corrected data.
    print(f"\n[1] 读取输入: {INPUT_PATH}")
    df = pd.read_excel(INPUT_PATH)
    print(f" 读取 {len(df)} 条记录")

    # Fail before any computation if a column used below is absent, instead
    # of hitting a KeyError half-way through the report.
    required_cols = ['site_id', 'site_name', 'lat', 'lon', 'visits_2019',
                     'mu', 'sigma', 'mu_tilde']
    missing_cols = [col for col in required_cols if col not in df.columns]
    if missing_cols:
        raise KeyError(f"输入文件缺少必需列: {missing_cols}")

    n_sites = len(df)
    # Visits left once every site has received its guaranteed minimum.
    extra_visits = N_TOTAL - n_sites * MIN_VISITS

    # 2. Show the parameters.
    print("\n[2] 分配参数:")
    print(f" 年度总访问次数 N = {N_TOTAL}")
    print(f" 站点数 = {n_sites}")
    print(f" 覆盖约束: 每站点至少 {MIN_VISITS} 次")
    print(f" 剩余可分配次数 = {N_TOTAL} - {n_sites} × {MIN_VISITS} = {extra_visits}")

    if extra_visits < 0:
        # The coverage constraint alone already exceeds the yearly budget.
        raise ValueError(
            f"站点数 {n_sites} × 最少访问次数 {MIN_VISITS} 超过年度总访问次数 {N_TOTAL}"
        )

    # 3. Hamilton allocation.
    print("\n[3] 执行Hamilton分配...")

    # Weights are the corrected true demand values.
    weights = df['mu_tilde'].tolist()

    # Distribute only the visits beyond the guaranteed minimum.
    k_extra = hamilton_allocation(extra_visits, weights)

    # Total visits per site = guaranteed minimum + proportional share.
    df['k'] = [MIN_VISITS + ke for ke in k_extra]

    # 4. Verify the allocation.
    k_sum = df['k'].sum()
    k_min = df['k'].min()
    k_max = df['k'].max()
    print("\n[4] 分配结果验证:")
    print(f" 总访问次数: Σk_i = {k_sum} (应为 {N_TOTAL})")
    print(f" 最小访问次数: min(k_i) = {k_min} (应 >= {MIN_VISITS})")
    print(f" 最大访问次数: max(k_i) = {k_max}")
    print(f" 访问次数范围: [{k_min}, {k_max}]")

    # Raised explicitly (not via `assert`) so the checks still run under
    # `python -O`; the exception type is unchanged.
    if k_sum != N_TOTAL:
        raise AssertionError(f"总访问次数不等于{N_TOTAL}")
    if k_min < MIN_VISITS:
        raise AssertionError(f"存在站点访问次数少于{MIN_VISITS}")

    # 5. Satisfaction ratio: r_i = k_i * mu_i / mu_tilde_i
    #    (yearly service volume divided by true demand).
    df['annual_service'] = df['k'] * df['mu']  # expected yearly service volume
    df['r'] = df['annual_service'] / df['mu_tilde']  # satisfaction-ratio proxy

    r_mean = df['r'].mean()
    r_std = df['r'].std()
    print("\n[5] 满足率统计:")
    print(" 满足率 r = k × μ / μ̃")
    print(f" r 均值: {r_mean:.2f}")
    print(f" r 标准差: {r_std:.2f}")
    print(f" r 范围: [{df['r'].min():.2f}, {df['r'].max():.2f}]")
    print(f" r 变异系数: {r_std / r_mean:.4f}")

    # 6. Distribution of the allocated visit counts (text histogram).
    print("\n[6] 访问次数分布:")
    k_counts = df['k'].value_counts().sort_index()
    for k_val, count in k_counts.items():
        print(f" k = {k_val:2d}: {count:2d} 个站点 {'█' * count}")

    # 7. Comparison with the 2019 visit counts, rescaled to the same total.
    print("\n[7] 与2019年访问次数对比:")
    visits_2019_sum = df['visits_2019'].sum()
    df['k_2019_scaled'] = df['visits_2019'] * N_TOTAL / visits_2019_sum
    df['k_diff'] = df['k'] - df['k_2019_scaled']
    print(f" 2019年总访问次数: {visits_2019_sum}")
    print(f" 2019年缩放后总次数: {df['k_2019_scaled'].sum():.1f}")
    print(" 新方案 vs 2019缩放:")
    # A change of at most half a visit counts as "roughly unchanged".
    print(f" - 增加访问的站点: {(df['k_diff'] > 0.5).sum()} 个")
    print(f" - 减少访问的站点: {(df['k_diff'] < -0.5).sum()} 个")
    print(f" - 基本不变的站点: {((df['k_diff'] >= -0.5) & (df['k_diff'] <= 0.5)).sum()} 个")

    # 8. Save the output.
    print(f"\n[8] 保存输出: {OUTPUT_PATH}")
    output_cols = required_cols + ['k', 'annual_service', 'r']
    df[output_cols].to_excel(OUTPUT_PATH, index=False)
    print(f" 已保存 {len(df)} 条记录")

    # 9. Preview the ten most and ten least visited sites.
    preview_cols = ['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']
    print("\n[9] 分配结果预览 (k 最高的10个站点):")
    top10 = df.nlargest(10, 'k')[preview_cols]
    print(top10.to_string(index=False))

    print("\n 分配结果预览 (k 最低的10个站点):")
    bottom10 = df.nsmallest(10, 'k')[preview_cols]
    print(bottom10.to_string(index=False))

    print("\n" + banner)
    print("Step 03 完成")
    print(banner)

    return df
|
||
|
||
|
||
# Run the allocation step only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|