Files
mcm-mfp/task1/03_allocate.py
2026-01-19 10:14:46 +08:00

156 lines
5.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Step 03: 频次分配 - Hamilton最大余数法
输入: 02_demand.xlsx
输出: 03_allocate.xlsx
功能:
1. 按真实需求 μ̃ 比例分配年度访问次数 k_i
2. 使用 Hamilton 方法保证整数分配且总和 = N
3. 满足覆盖约束: k_i >= 1
分配原则:
- 先给每个站点分配1次 (覆盖约束)
- 剩余 N-70 次按 μ̃ 比例分配
"""
import pandas as pd
import numpy as np
from pathlib import Path
# 路径配置
INPUT_PATH = Path(__file__).parent / "02_demand.xlsx"
OUTPUT_PATH = Path(__file__).parent / "03_allocate.xlsx"
# 分配参数
N_TOTAL = 730 # 年度总访问次数 (365天 × 2站点/天)
MIN_VISITS = 1 # 每站点最少访问次数 (覆盖约束)
def hamilton_allocation(total: int, weights: list) -> list:
"""
Hamilton最大余数法整数分配
Args:
total: 待分配总数
weights: 各项权重
Returns:
整数分配结果列表
"""
n = len(weights)
w_sum = sum(weights)
# 连续配额
quotas = [total * w / w_sum for w in weights]
# 下取整
floors = [int(q) for q in quotas]
remainders = [q - f for q, f in zip(quotas, floors)]
# 剩余席位按余数从大到小分配
leftover = total - sum(floors)
indices = sorted(range(n), key=lambda i: -remainders[i])
for i in indices[:leftover]:
floors[i] += 1
return floors
def main():
print("=" * 60)
print("Step 03: 频次分配 - Hamilton最大余数法")
print("=" * 60)
# 1. 读取需求修正后的数据
print(f"\n[1] 读取输入: {INPUT_PATH}")
df = pd.read_excel(INPUT_PATH)
print(f" 读取 {len(df)} 条记录")
n_sites = len(df)
# 2. 显示参数
print(f"\n[2] 分配参数:")
print(f" 年度总访问次数 N = {N_TOTAL}")
print(f" 站点数 = {n_sites}")
print(f" 覆盖约束: 每站点至少 {MIN_VISITS}")
print(f" 剩余可分配次数 = {N_TOTAL} - {n_sites} × {MIN_VISITS} = {N_TOTAL - n_sites * MIN_VISITS}")
# 3. Hamilton分配
print(f"\n[3] 执行Hamilton分配...")
# 权重 = 修正后的真实需求 μ̃
weights = df['mu_tilde'].tolist()
# 分配剩余次数
extra_visits = N_TOTAL - n_sites * MIN_VISITS
k_extra = hamilton_allocation(extra_visits, weights)
# 总访问次数 = 基础 + 额外
df['k'] = [MIN_VISITS + ke for ke in k_extra]
# 4. 验证分配结果
print(f"\n[4] 分配结果验证:")
print(f" 总访问次数: Σk_i = {df['k'].sum()} (应为 {N_TOTAL})")
print(f" 最小访问次数: min(k_i) = {df['k'].min()} (应 >= {MIN_VISITS})")
print(f" 最大访问次数: max(k_i) = {df['k'].max()}")
print(f" 访问次数范围: [{df['k'].min()}, {df['k'].max()}]")
assert df['k'].sum() == N_TOTAL, f"总访问次数不等于{N_TOTAL}"
assert df['k'].min() >= MIN_VISITS, f"存在站点访问次数少于{MIN_VISITS}"
# 5. 计算满足率
# r_i = k_i * μ_i / μ̃_i (年度服务量 / 真实需求)
df['annual_service'] = df['k'] * df['mu'] # 年度预期服务量
df['r'] = df['annual_service'] / df['mu_tilde'] # 满足率代理
print(f"\n[5] 满足率统计:")
print(f" 满足率 r = k × μ / μ̃")
print(f" r 均值: {df['r'].mean():.2f}")
print(f" r 标准差: {df['r'].std():.2f}")
print(f" r 范围: [{df['r'].min():.2f}, {df['r'].max():.2f}]")
print(f" r 变异系数: {df['r'].std() / df['r'].mean():.4f}")
# 6. 分配结果分布
print(f"\n[6] 访问次数分布:")
k_counts = df['k'].value_counts().sort_index()
for k_val, count in k_counts.items():
print(f" k = {k_val:2d}: {count:2d} 个站点 {'' * count}")
# 7. 与2019年对比
print(f"\n[7] 与2019年访问次数对比:")
df['k_2019_scaled'] = df['visits_2019'] * N_TOTAL / df['visits_2019'].sum()
df['k_diff'] = df['k'] - df['k_2019_scaled']
print(f" 2019年总访问次数: {df['visits_2019'].sum()}")
print(f" 2019年缩放后总次数: {df['k_2019_scaled'].sum():.1f}")
print(f" 新方案 vs 2019缩放:")
print(f" - 增加访问的站点: {(df['k_diff'] > 0.5).sum()}")
print(f" - 减少访问的站点: {(df['k_diff'] < -0.5).sum()}")
print(f" - 基本不变的站点: {((df['k_diff'] >= -0.5) & (df['k_diff'] <= 0.5)).sum()}")
# 8. 保存输出
print(f"\n[8] 保存输出: {OUTPUT_PATH}")
output_cols = ['site_id', 'site_name', 'lat', 'lon', 'visits_2019',
'mu', 'sigma', 'mu_tilde', 'k', 'annual_service', 'r']
df[output_cols].to_excel(OUTPUT_PATH, index=False)
print(f" 已保存 {len(df)} 条记录")
# 9. 输出预览
print(f"\n[9] 分配结果预览 (k 最高的10个站点):")
top10 = df.nlargest(10, 'k')[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']]
print(top10.to_string(index=False))
print(f"\n 分配结果预览 (k 最低的10个站点):")
bottom10 = df.nsmallest(10, 'k')[['site_id', 'site_name', 'mu', 'mu_tilde', 'k', 'annual_service', 'r']]
print(bottom10.to_string(index=False))
print("\n" + "=" * 60)
print("Step 03 完成")
print("=" * 60)
return df
if __name__ == "__main__":
main()