Files
mcm-mfp/task3/05_calendar.py

333 lines
11 KiB
Python
Raw Permalink Normal View History

2026-01-19 11:57:19 +08:00
"""
Task 3 - Step 5: 日历排程生成
=============================
输入:
- 04_reschedule.xlsx (站点访问次数 + 配对访问明细)
输出: 05_calendar.xlsx (365天的完整排程)
排程逻辑:
1. 生成所有访问事件单站点 + 双站点
2. 为每个事件计算理想日期均匀分布
3. 贪心分配到365天每天2个事件槽位
4. 优先安排双站点访问时间较长
约束:
- 每天恰好2个访问事件
- 同一站点的访问尽量均匀分布
"""
import pandas as pd
import numpy as np
from collections import defaultdict
# ============================================
# 参数设置
# ============================================
INPUT_FILE = '04_reschedule.xlsx'
OUTPUT_FILE = '05_calendar.xlsx'
DAYS_PER_YEAR = 365
EVENTS_PER_DAY = 2
TOTAL_EVENTS = DAYS_PER_YEAR * EVENTS_PER_DAY # 730
# ============================================
# 读取数据
# ============================================
print("=" * 60)
print("Task 3 - Step 5: 日历排程生成")
print("=" * 60)
# 读取站点访问次数
df_sites = pd.read_excel(INPUT_FILE, sheet_name='sites_schedule')
print(f"\n读取站点数据: {len(df_sites)} 个站点")
# 读取配对访问
df_pairs = pd.read_excel(INPUT_FILE, sheet_name='pair_visits')
print(f"读取配对数据: {len(df_pairs)}")
# ============================================
# 生成访问事件
# ============================================
print(f"\n" + "-" * 40)
print("生成访问事件")
print("-" * 40)
events = []
event_id = 0
# 1. 生成单站点访问事件
for _, row in df_sites.iterrows():
site_id = row['site_id']
site_name = row['site_name']
k_single = int(row['k_single_final'])
for visit_num in range(k_single):
# 计算理想日期(均匀分布)
ideal_day = (visit_num + 0.5) * DAYS_PER_YEAR / k_single
events.append({
'event_id': event_id,
'event_type': 'single',
'site_i_id': site_id,
'site_j_id': None,
'site_i_name': site_name,
'site_j_name': None,
'visit_num': visit_num + 1,
'total_visits': k_single,
'ideal_day': ideal_day,
'priority': 0 # 单站点优先级较低
})
event_id += 1
# 2. 生成双站点访问事件
for _, row in df_pairs.iterrows():
site_i_id = row['site_i_id']
site_j_id = row['site_j_id']
site_i_name = row['site_i_name']
site_j_name = row['site_j_name']
k_dual = int(row['k_dual'])
for visit_num in range(k_dual):
ideal_day = (visit_num + 0.5) * DAYS_PER_YEAR / k_dual
events.append({
'event_id': event_id,
'event_type': 'dual',
'site_i_id': site_i_id,
'site_j_id': site_j_id,
'site_i_name': site_i_name,
'site_j_name': site_j_name,
'visit_num': visit_num + 1,
'total_visits': k_dual,
'ideal_day': ideal_day,
'priority': 1 # 双站点优先级较高
})
event_id += 1
df_events = pd.DataFrame(events)
print(f"总访问事件数: {len(df_events)}")
print(f" - 单站点: {(df_events['event_type'] == 'single').sum()}")
print(f" - 双站点: {(df_events['event_type'] == 'dual').sum()}")
# ============================================
# 贪心排程算法
# ============================================
print(f"\n" + "-" * 40)
print("执行贪心排程")
print("-" * 40)
# 按理想日期和优先级排序
df_events_sorted = df_events.sort_values(
['ideal_day', 'priority'],
ascending=[True, False]
).reset_index(drop=True)
# 初始化日历槽位
calendar = {day: [] for day in range(1, DAYS_PER_YEAR + 1)}
# 记录每个站点最后访问的日期
last_visit = defaultdict(lambda: -float('inf'))
# 贪心分配
assigned_day = []
for idx, event in df_events_sorted.iterrows():
ideal = event['ideal_day']
site_i = event['site_i_id']
site_j = event['site_j_id']
# 寻找最佳可用日期
best_day = None
best_score = float('inf')
# 搜索范围:理想日期附近
search_start = max(1, int(ideal) - 30)
search_end = min(DAYS_PER_YEAR, int(ideal) + 30)
for day in range(search_start, search_end + 1):
# 检查槽位是否可用
if len(calendar[day]) >= EVENTS_PER_DAY:
continue
# 检查同一站点是否已在当天访问
sites_on_day = set()
for e in calendar[day]:
sites_on_day.add(e['site_i_id'])
if e['site_j_id'] is not None:
sites_on_day.add(e['site_j_id'])
if site_i in sites_on_day:
continue
if site_j is not None and site_j in sites_on_day:
continue
# 计算得分(理想日期偏差 + 间隔惩罚)
day_diff = abs(day - ideal)
# 间隔惩罚:鼓励与上次访问保持距离
min_gap = min(
day - last_visit[site_i],
day - last_visit[site_j] if site_j is not None else float('inf')
)
gap_penalty = max(0, 7 - min_gap) * 2 # 7天内再次访问有惩罚
score = day_diff + gap_penalty
if score < best_score:
best_score = score
best_day = day
# 如果附近没找到,扩大搜索范围
if best_day is None:
for day in range(1, DAYS_PER_YEAR + 1):
if len(calendar[day]) < EVENTS_PER_DAY:
sites_on_day = set()
for e in calendar[day]:
sites_on_day.add(e['site_i_id'])
if e['site_j_id'] is not None:
sites_on_day.add(e['site_j_id'])
if site_i not in sites_on_day:
if site_j is None or site_j not in sites_on_day:
best_day = day
break
if best_day is None:
print(f"警告: 无法分配事件 {event['event_id']}")
best_day = 1 # 强制分配
# 分配到日历
calendar[best_day].append(event.to_dict())
assigned_day.append(best_day)
# 更新最后访问日期
last_visit[site_i] = best_day
if site_j is not None:
last_visit[site_j] = best_day
df_events_sorted['assigned_day'] = assigned_day
# ============================================
# 生成日历视图
# ============================================
print(f"\n" + "-" * 40)
print("生成日历视图")
print("-" * 40)
calendar_rows = []
for day in range(1, DAYS_PER_YEAR + 1):
events_on_day = calendar[day]
row = {'day': day}
for slot in range(EVENTS_PER_DAY):
if slot < len(events_on_day):
e = events_on_day[slot]
row[f'slot_{slot+1}_type'] = e['event_type']
row[f'slot_{slot+1}_site_i'] = e['site_i_id']
row[f'slot_{slot+1}_site_j'] = e['site_j_id']
row[f'slot_{slot+1}_name_i'] = e['site_i_name']
row[f'slot_{slot+1}_name_j'] = e['site_j_name']
else:
row[f'slot_{slot+1}_type'] = None
row[f'slot_{slot+1}_site_i'] = None
row[f'slot_{slot+1}_site_j'] = None
row[f'slot_{slot+1}_name_i'] = None
row[f'slot_{slot+1}_name_j'] = None
calendar_rows.append(row)
df_calendar = pd.DataFrame(calendar_rows)
# ============================================
# 统计和验证
# ============================================
print(f"\n排程统计:")
# 每天事件数
events_per_day = [len(calendar[d]) for d in range(1, DAYS_PER_YEAR + 1)]
print(f" - 每天事件数: min={min(events_per_day)}, max={max(events_per_day)}, avg={np.mean(events_per_day):.2f}")
# 理想日期偏差
df_events_sorted['day_diff'] = abs(df_events_sorted['assigned_day'] - df_events_sorted['ideal_day'])
print(f" - 理想日期偏差: avg={df_events_sorted['day_diff'].mean():.2f}, max={df_events_sorted['day_diff'].max():.0f}")
# 访问间隔分析
print(f"\n访问间隔分析:")
site_visits = defaultdict(list)
for _, event in df_events_sorted.iterrows():
site_visits[event['site_i_id']].append(event['assigned_day'])
# 只有双站点访问才有site_j_id
if event['site_j_id'] is not None and not (isinstance(event['site_j_id'], float) and np.isnan(event['site_j_id'])):
site_visits[event['site_j_id']].append(event['assigned_day'])
gaps = []
for site_id, days in site_visits.items():
days_sorted = sorted(days)
for i in range(1, len(days_sorted)):
gaps.append(days_sorted[i] - days_sorted[i-1])
if gaps:
print(f" - 间隔范围: [{min(gaps)}, {max(gaps)}] 天")
print(f" - 平均间隔: {np.mean(gaps):.1f}")
print(f" - 中位数间隔: {np.median(gaps):.1f}")
# ============================================
# 保存结果
# ============================================
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# Sheet 1: 日历视图
df_calendar.to_excel(writer, sheet_name='calendar', index=False)
# Sheet 2: 事件详情(含分配日期)
df_events_sorted.to_excel(writer, sheet_name='events', index=False)
# Sheet 3: 站点访问日期汇总
site_schedule = []
for site_id, days in site_visits.items():
# 跳过None/NaN值
if site_id is None or (isinstance(site_id, float) and np.isnan(site_id)):
continue
# 处理可能的类型不匹配
site_row = df_sites[df_sites['site_id'] == int(site_id)]
if len(site_row) > 0:
site_name = site_row['site_name'].values[0]
else:
site_name = f"Site_{site_id}"
days_sorted = sorted(days)
site_schedule.append({
'site_id': int(site_id),
'site_name': site_name,
'total_visits': len(days),
'visit_days': ','.join(map(str, days_sorted)),
'first_visit': days_sorted[0],
'last_visit': days_sorted[-1]
})
df_site_schedule = pd.DataFrame(site_schedule)
df_site_schedule.to_excel(writer, sheet_name='site_visits', index=False)
# Sheet 4: 汇总统计
summary = pd.DataFrame({
'metric': ['total_days', 'total_events', 'single_events', 'dual_events',
'avg_day_diff', 'max_day_diff', 'avg_gap', 'min_gap', 'max_gap'],
'value': [DAYS_PER_YEAR, len(df_events),
(df_events['event_type'] == 'single').sum(),
(df_events['event_type'] == 'dual').sum(),
df_events_sorted['day_diff'].mean(),
df_events_sorted['day_diff'].max(),
np.mean(gaps) if gaps else 0,
min(gaps) if gaps else 0,
max(gaps) if gaps else 0]
})
summary.to_excel(writer, sheet_name='summary', index=False)
print(f"\n结果已保存至: {OUTPUT_FILE}")
print(" - Sheet 'calendar': 365天日历视图")
print(" - Sheet 'events': 事件详情")
print(" - Sheet 'site_visits': 站点访问日期汇总")
print(" - Sheet 'summary': 汇总统计")
print("\n" + "=" * 60)