"""
Step 05: Calendar scheduling - greedy bin-packing algorithm.

Input:  03_allocate.xlsx
Output: 05_schedule.xlsx

What it does:
    1. Turns each site's annual visit frequency k_i into concrete day numbers.
    2. Packs the visits into days holding at most CAPACITY (2) sites each.
    3. Improves the evenness of the gaps between visits of the same site.
    4. Writes the complete T-day (365) calendar.

Constraints targeted by the schedule:
    - Two sites per day. The greedy pass only enforces "at most CAPACITY per
      day", so every day is exactly full only when the total number of visits
      equals T * CAPACITY and every visit could be placed.
    - Each site appears k_i times.
    - Gaps between consecutive visits of one site are as even as possible.
"""

import random
from collections import defaultdict
from pathlib import Path

import numpy as np
import pandas as pd

# Path configuration
INPUT_PATH = Path(__file__).parent / "03_allocate.xlsx"
OUTPUT_PATH = Path(__file__).parent / "05_schedule.xlsx"

# Scheduling parameters
T = 365  # number of days in the year
CAPACITY = 2  # number of sites per day
RANDOM_SEED = 42  # random seed (used by the local optimisation)
MAX_ITER = 5000  # number of swap attempts made by the local optimisation


def generate_ideal_dates(k: int, T: int = 365) -> list:
    """
    Return the ideal visit days for a site visited ``k`` times.

    The ``k`` visits are spread evenly over ``[1, T]``:

        t_j = round((j + 0.5) * T / k),  j = 0, 1, ..., k-1

    Each value is clamped into ``[1, T]``. ``round`` is Python's built-in, so
    exact halves go to the nearest even integer. ``k <= 0`` yields an empty
    list.
    """
    return [max(1, min(T, round((j + 0.5) * T / k))) for j in range(k)]


def _find_slot(calendar: dict, site_id, ideal_day: int, T: int, capacity: int):
    """Return the free day closest to ``ideal_day`` for ``site_id``, or None."""
    for offset in range(T):
        # At equal distance the later day is tried before the earlier one.
        if offset == 0:
            candidates = (ideal_day,)
        else:
            candidates = (ideal_day + offset, ideal_day - offset)
        for day in candidates:
            if not 1 <= day <= T:
                continue
            slot = calendar[day]
            # A day is usable when it has room and does not already hold
            # this site.
            if len(slot) < capacity and site_id not in slot:
                return day
    return None


def greedy_schedule(site_visits: dict, T: int = 365, capacity: int = 2) -> dict:
    """
    Greedy bin-packing of visit events into days.

    Every site contributes one event per ideal date (see
    ``generate_ideal_dates``). Events are processed in order of
    (ideal day, site id) and each is placed on the nearest day that still has
    room and does not already contain that site.

    Args:
        site_visits: ``{site_id: k}`` annual number of visits per site.
        T: number of days in the calendar.
        capacity: maximum number of sites per day.

    Returns:
        ``{day: [site_id, ...]}`` for every day in ``1..T``. An event that
        cannot be placed anywhere is reported with a printed warning and
        left out of the calendar.
    """
    # All visit events as (ideal day, site id), ordered by ideal day.
    events = sorted(
        (
            (ideal_day, site_id)
            for site_id, k in site_visits.items()
            for ideal_day in generate_ideal_dates(k, T)
        ),
        key=lambda event: (event[0], event[1]),
    )

    calendar = {day: [] for day in range(1, T + 1)}

    for ideal_day, site_id in events:
        day = _find_slot(calendar, site_id, ideal_day, T, capacity)
        if day is None:
            print(f"警告: 无法分配站点 {site_id} (理想日期 {ideal_day})")
        else:
            calendar[day].append(site_id)

    return calendar


def compute_gap_stats(calendar: dict, site_id: int) -> dict:
    """
    Compute visit-gap statistics for one site.

    Returns a dict with ``n_visits``, ``gaps`` (differences between
    consecutive visit days) and ``gap_mean`` / ``gap_std`` / ``gap_min`` /
    ``gap_max`` / ``gap_cv``. With fewer than two visits there are no gaps:
    ``gaps`` is empty and the statistics are ``None``. ``gap_std`` is the
    population standard deviation (``np.std`` default).
    """
    days = sorted(day for day, sites in calendar.items() if site_id in sites)

    if len(days) < 2:
        return {
            'n_visits': len(days),
            'gaps': [],
            'gap_mean': None,
            'gap_std': None,
            'gap_min': None,
            'gap_max': None,
            'gap_cv': None
        }

    gaps = [later - earlier for earlier, later in zip(days, days[1:])]
    gap_mean = np.mean(gaps)
    gap_std = np.std(gaps)

    return {
        'n_visits': len(days),
        'gaps': gaps,
        'gap_mean': gap_mean,
        'gap_std': gap_std,
        'gap_min': min(gaps),
        'gap_max': max(gaps),
        'gap_cv': gap_std / gap_mean if gap_mean > 0 else 0
    }


def local_optimization(calendar: dict, site_ids: list,
                       max_iter: int = 5000, seed: int = 42) -> tuple:
    """
    Local search that improves the evenness of visit gaps.

    Repeatedly picks two random days and one site on each, swaps the two
    sites, and keeps the swap only if the sum of per-site gap variances
    strictly decreases. Swaps are attempted only between days that hold
    exactly two sites, and never when they would put the same site twice on
    one day.

    Args:
        calendar: ``{day: [site_id, ...]}``; it is copied, not modified.
        site_ids: sites whose gap variance is part of the objective.
        max_iter: number of swap attempts.
        seed: value passed to ``random.seed`` (the global generator is
            reseeded).

    Returns:
        ``(optimised_calendar, accepted_swaps)``.
    """
    random.seed(seed)
    calendar = {day: list(sites) for day, sites in calendar.items()}  # deep copy

    # Draw from the calendar's own days rather than a fixed 1..365 range so
    # that calendars of any length work. For a 1..365 calendar the population
    # has the same length and order, hence the same random sequence.
    days = sorted(calendar)
    if len(days) < 2:
        return calendar, 0

    def site_variance(site_id):
        """Gap variance of one site, or None when it has fewer than 2 visits."""
        gap_std = compute_gap_stats(calendar, site_id)['gap_std']
        return None if gap_std is None else gap_std ** 2

    # A swap only moves two sites, so per-site variances are cached and only
    # the two affected entries are recomputed per attempt.
    variances = {site_id: site_variance(site_id) for site_id in site_ids}

    def total_gap_variance():
        """Sum of the cached variances, accumulated in ``site_ids`` order."""
        total_var = 0
        for site_id in site_ids:
            variance = variances[site_id]
            if variance is not None:
                total_var += variance
        return total_var

    current_var = total_gap_variance()
    improved = 0

    for _ in range(max_iter):
        t1, t2 = random.sample(days, 2)
        if len(calendar[t1]) != 2 or len(calendar[t2]) != 2:
            continue

        pos1, pos2 = random.randint(0, 1), random.randint(0, 1)
        s1, s2 = calendar[t1][pos1], calendar[t2][pos2]

        # Feasibility: swapping must not create a duplicate site on a day.
        if s1 == s2 or s2 == calendar[t1][1 - pos1] or s1 == calendar[t2][1 - pos2]:
            continue

        # Tentatively swap and refresh the two affected variances.
        calendar[t1][pos1], calendar[t2][pos2] = s2, s1
        previous = {s: variances[s] for s in (s1, s2) if s in variances}
        for site_id in previous:
            variances[site_id] = site_variance(site_id)

        new_var = total_gap_variance()
        if new_var < current_var:
            current_var = new_var
            improved += 1
        else:
            # Undo the swap and restore the cached variances.
            calendar[t1][pos1], calendar[t2][pos2] = s1, s2
            variances.update(previous)

    return calendar, improved


def main():
    """
    Run the scheduling step end to end.

    Reads ``INPUT_PATH`` (columns ``site_id``, ``site_name``, ``k`` are used),
    builds the greedy calendar, runs the local optimisation, prints summary
    statistics and writes four sheets (calendar, site_dates, gap_statistics,
    parameters) to ``OUTPUT_PATH``.

    Returns:
        ``(df_calendar, df_gaps)``: the per-day calendar table and the
        per-site gap statistics table.
    """
    print("=" * 60)
    print("Step 05: 日历排程 - 贪心装箱算法")
    print("=" * 60)

    # 1. Read the allocation result
    print(f"\n[1] 读取输入: {INPUT_PATH}")
    df = pd.read_excel(INPUT_PATH)
    print(f" 读取 {len(df)} 条记录")

    site_visits = dict(zip(df['site_id'], df['k']))
    total_visits = sum(site_visits.values())
    print(f" 总访问次数: {total_visits}")
    print(f" 期望日历天数: {total_visits // CAPACITY} 天")

    # 2. Greedy scheduling
    print("\n[2] 执行贪心装箱排程...")
    calendar = greedy_schedule(site_visits, T, CAPACITY)

    # Sanity figures: how many events were placed and how loaded the days are.
    total_assigned = sum(len(sites) for sites in calendar.values())
    print(f" 已分配访问事件: {total_assigned} / {total_visits}")

    day_loads = [len(sites) for sites in calendar.values()]
    empty_days = sum(1 for load in day_loads if load == 0)
    partial_days = sum(1 for load in day_loads if 0 < load < CAPACITY)
    full_days = sum(1 for load in day_loads if load == CAPACITY)
    print(f" 日历统计: {full_days} 满载 + {partial_days} 部分 + {empty_days} 空闲")

    # 3. Local optimisation
    print("\n[3] 局部优化 (改善间隔均匀性)...")
    site_ids = list(site_visits.keys())
    calendar_opt, n_improved = local_optimization(
        calendar, site_ids, max_iter=MAX_ITER, seed=RANDOM_SEED
    )
    print(f" 优化迭代: {MAX_ITER} 次")
    print(f" 接受的改进: {n_improved} 次")

    # 4. Gap statistics
    print("\n[4] 访问间隔统计")
    gap_stats_list = []
    for site_id in site_ids:
        stats = compute_gap_stats(calendar_opt, site_id)
        stats['site_id'] = site_id
        gap_stats_list.append(stats)

    df_gaps = pd.DataFrame(gap_stats_list)
    df_gaps = df_gaps.merge(df[['site_id', 'site_name', 'k']], on='site_id')

    # Global figures over the sites that have at least one gap.
    valid_gaps = df_gaps[df_gaps['gap_mean'].notna()]
    print(f" 平均间隔均值: {valid_gaps['gap_mean'].mean():.2f} 天")
    print(f" 平均间隔标准差: {valid_gaps['gap_std'].mean():.2f} 天")
    print(f" 最大单次间隔: {valid_gaps['gap_max'].max():.0f} 天")
    print(f" 平均间隔CV: {valid_gaps['gap_cv'].mean():.4f}")

    # 5. Calendar table: day, site_1, site_2
    print("\n[5] 生成日历输出...")
    calendar_rows = []
    for day in range(1, T + 1):
        sites = calendar_opt.get(day, [])
        # Only the first two sites of a day are exported.
        calendar_rows.append({
            'day': day,
            'site_1_id': sites[0] if len(sites) > 0 else None,
            'site_2_id': sites[1] if len(sites) > 1 else None
        })
    df_calendar = pd.DataFrame(calendar_rows)

    # Attach site names
    site_name_map = dict(zip(df['site_id'], df['site_name']))
    df_calendar['site_1_name'] = df_calendar['site_1_id'].map(site_name_map)
    df_calendar['site_2_name'] = df_calendar['site_2_id'].map(site_name_map)

    # 6. Per-site list of visit days ('k' here is the number actually scheduled)
    site_dates = []
    for site_id in site_ids:
        days = sorted(day for day, sites in calendar_opt.items() if site_id in sites)
        site_dates.append({
            'site_id': site_id,
            'site_name': site_name_map[site_id],
            'k': len(days),
            'dates': ','.join(map(str, days))
        })
    df_site_dates = pd.DataFrame(site_dates)

    # 7. Save the workbook
    print(f"\n[6] 保存输出: {OUTPUT_PATH}")
    with pd.ExcelWriter(OUTPUT_PATH, engine='openpyxl') as writer:
        # Sheet 1: calendar (one row per day)
        df_calendar.to_excel(writer, sheet_name='calendar', index=False)

        # Sheet 2: per-site visit days
        df_site_dates.to_excel(writer, sheet_name='site_dates', index=False)

        # Sheet 3: gap statistics (the raw 'gaps' list column is not exported)
        df_gaps_out = df_gaps[['site_id', 'site_name', 'k', 'n_visits',
                               'gap_mean', 'gap_std', 'gap_min', 'gap_max', 'gap_cv']]
        df_gaps_out.to_excel(writer, sheet_name='gap_statistics', index=False)

        # Sheet 4: scheduling parameters
        params = pd.DataFrame([
            {'parameter': 'T (days)', 'value': T},
            {'parameter': 'CAPACITY (sites/day)', 'value': CAPACITY},
            {'parameter': 'total_visits', 'value': total_visits},
            {'parameter': 'optimization_iterations', 'value': MAX_ITER},
            {'parameter': 'improvements_accepted', 'value': n_improved},
        ])
        params.to_excel(writer, sheet_name='parameters', index=False)

    print(" 已保存4个工作表: calendar, site_dates, gap_statistics, parameters")

    # 8. Preview
    print("\n[7] 日历预览 (前10天):")
    print(df_calendar.head(10).to_string(index=False))

    print("\n 间隔最大的5个站点:")
    top5_gap = df_gaps.nlargest(5, 'gap_max')[
        ['site_id', 'site_name', 'k', 'gap_mean', 'gap_max', 'gap_cv']
    ]
    print(top5_gap.to_string(index=False))

    print("\n" + "=" * 60)
    print("Step 05 完成")
    print("=" * 60)

    return df_calendar, df_gaps


if __name__ == "__main__":
    main()