Files
mcm-mfp/task3/05_calendar.py
2026-01-19 11:57:19 +08:00

333 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Task 3 - Step 5: 日历排程生成
=============================
输入:
- 04_reschedule.xlsx (站点访问次数 + 配对访问明细)
输出: 05_calendar.xlsx (365天的完整排程)
排程逻辑:
1. 生成所有访问事件(单站点 + 双站点)
2. 为每个事件计算理想日期(均匀分布)
3. 贪心分配到365天每天2个事件槽位
4. 优先安排双站点访问(时间较长)
约束:
- 每天恰好2个访问事件
- 同一站点的访问尽量均匀分布
"""
import pandas as pd
import numpy as np
from collections import defaultdict
# ============================================
# 参数设置
# ============================================
INPUT_FILE = '04_reschedule.xlsx'
OUTPUT_FILE = '05_calendar.xlsx'
DAYS_PER_YEAR = 365
EVENTS_PER_DAY = 2
TOTAL_EVENTS = DAYS_PER_YEAR * EVENTS_PER_DAY # 730
# ============================================
# 读取数据
# ============================================
print("=" * 60)
print("Task 3 - Step 5: 日历排程生成")
print("=" * 60)
# 读取站点访问次数
df_sites = pd.read_excel(INPUT_FILE, sheet_name='sites_schedule')
print(f"\n读取站点数据: {len(df_sites)} 个站点")
# 读取配对访问
df_pairs = pd.read_excel(INPUT_FILE, sheet_name='pair_visits')
print(f"读取配对数据: {len(df_pairs)}")
# ============================================
# 生成访问事件
# ============================================
print(f"\n" + "-" * 40)
print("生成访问事件")
print("-" * 40)
events = []
event_id = 0
# 1. 生成单站点访问事件
for _, row in df_sites.iterrows():
site_id = row['site_id']
site_name = row['site_name']
k_single = int(row['k_single_final'])
for visit_num in range(k_single):
# 计算理想日期(均匀分布)
ideal_day = (visit_num + 0.5) * DAYS_PER_YEAR / k_single
events.append({
'event_id': event_id,
'event_type': 'single',
'site_i_id': site_id,
'site_j_id': None,
'site_i_name': site_name,
'site_j_name': None,
'visit_num': visit_num + 1,
'total_visits': k_single,
'ideal_day': ideal_day,
'priority': 0 # 单站点优先级较低
})
event_id += 1
# 2. 生成双站点访问事件
for _, row in df_pairs.iterrows():
site_i_id = row['site_i_id']
site_j_id = row['site_j_id']
site_i_name = row['site_i_name']
site_j_name = row['site_j_name']
k_dual = int(row['k_dual'])
for visit_num in range(k_dual):
ideal_day = (visit_num + 0.5) * DAYS_PER_YEAR / k_dual
events.append({
'event_id': event_id,
'event_type': 'dual',
'site_i_id': site_i_id,
'site_j_id': site_j_id,
'site_i_name': site_i_name,
'site_j_name': site_j_name,
'visit_num': visit_num + 1,
'total_visits': k_dual,
'ideal_day': ideal_day,
'priority': 1 # 双站点优先级较高
})
event_id += 1
df_events = pd.DataFrame(events)
print(f"总访问事件数: {len(df_events)}")
print(f" - 单站点: {(df_events['event_type'] == 'single').sum()}")
print(f" - 双站点: {(df_events['event_type'] == 'dual').sum()}")
# ============================================
# 贪心排程算法
# ============================================
print(f"\n" + "-" * 40)
print("执行贪心排程")
print("-" * 40)
# 按理想日期和优先级排序
df_events_sorted = df_events.sort_values(
['ideal_day', 'priority'],
ascending=[True, False]
).reset_index(drop=True)
# 初始化日历槽位
calendar = {day: [] for day in range(1, DAYS_PER_YEAR + 1)}
# 记录每个站点最后访问的日期
last_visit = defaultdict(lambda: -float('inf'))
# 贪心分配
assigned_day = []
for idx, event in df_events_sorted.iterrows():
ideal = event['ideal_day']
site_i = event['site_i_id']
site_j = event['site_j_id']
# 寻找最佳可用日期
best_day = None
best_score = float('inf')
# 搜索范围:理想日期附近
search_start = max(1, int(ideal) - 30)
search_end = min(DAYS_PER_YEAR, int(ideal) + 30)
for day in range(search_start, search_end + 1):
# 检查槽位是否可用
if len(calendar[day]) >= EVENTS_PER_DAY:
continue
# 检查同一站点是否已在当天访问
sites_on_day = set()
for e in calendar[day]:
sites_on_day.add(e['site_i_id'])
if e['site_j_id'] is not None:
sites_on_day.add(e['site_j_id'])
if site_i in sites_on_day:
continue
if site_j is not None and site_j in sites_on_day:
continue
# 计算得分(理想日期偏差 + 间隔惩罚)
day_diff = abs(day - ideal)
# 间隔惩罚:鼓励与上次访问保持距离
min_gap = min(
day - last_visit[site_i],
day - last_visit[site_j] if site_j is not None else float('inf')
)
gap_penalty = max(0, 7 - min_gap) * 2 # 7天内再次访问有惩罚
score = day_diff + gap_penalty
if score < best_score:
best_score = score
best_day = day
# 如果附近没找到,扩大搜索范围
if best_day is None:
for day in range(1, DAYS_PER_YEAR + 1):
if len(calendar[day]) < EVENTS_PER_DAY:
sites_on_day = set()
for e in calendar[day]:
sites_on_day.add(e['site_i_id'])
if e['site_j_id'] is not None:
sites_on_day.add(e['site_j_id'])
if site_i not in sites_on_day:
if site_j is None or site_j not in sites_on_day:
best_day = day
break
if best_day is None:
print(f"警告: 无法分配事件 {event['event_id']}")
best_day = 1 # 强制分配
# 分配到日历
calendar[best_day].append(event.to_dict())
assigned_day.append(best_day)
# 更新最后访问日期
last_visit[site_i] = best_day
if site_j is not None:
last_visit[site_j] = best_day
df_events_sorted['assigned_day'] = assigned_day
# ============================================
# 生成日历视图
# ============================================
print(f"\n" + "-" * 40)
print("生成日历视图")
print("-" * 40)
calendar_rows = []
for day in range(1, DAYS_PER_YEAR + 1):
events_on_day = calendar[day]
row = {'day': day}
for slot in range(EVENTS_PER_DAY):
if slot < len(events_on_day):
e = events_on_day[slot]
row[f'slot_{slot+1}_type'] = e['event_type']
row[f'slot_{slot+1}_site_i'] = e['site_i_id']
row[f'slot_{slot+1}_site_j'] = e['site_j_id']
row[f'slot_{slot+1}_name_i'] = e['site_i_name']
row[f'slot_{slot+1}_name_j'] = e['site_j_name']
else:
row[f'slot_{slot+1}_type'] = None
row[f'slot_{slot+1}_site_i'] = None
row[f'slot_{slot+1}_site_j'] = None
row[f'slot_{slot+1}_name_i'] = None
row[f'slot_{slot+1}_name_j'] = None
calendar_rows.append(row)
df_calendar = pd.DataFrame(calendar_rows)
# ============================================
# 统计和验证
# ============================================
print(f"\n排程统计:")
# 每天事件数
events_per_day = [len(calendar[d]) for d in range(1, DAYS_PER_YEAR + 1)]
print(f" - 每天事件数: min={min(events_per_day)}, max={max(events_per_day)}, avg={np.mean(events_per_day):.2f}")
# 理想日期偏差
df_events_sorted['day_diff'] = abs(df_events_sorted['assigned_day'] - df_events_sorted['ideal_day'])
print(f" - 理想日期偏差: avg={df_events_sorted['day_diff'].mean():.2f}, max={df_events_sorted['day_diff'].max():.0f}")
# 访问间隔分析
print(f"\n访问间隔分析:")
site_visits = defaultdict(list)
for _, event in df_events_sorted.iterrows():
site_visits[event['site_i_id']].append(event['assigned_day'])
# 只有双站点访问才有site_j_id
if event['site_j_id'] is not None and not (isinstance(event['site_j_id'], float) and np.isnan(event['site_j_id'])):
site_visits[event['site_j_id']].append(event['assigned_day'])
gaps = []
for site_id, days in site_visits.items():
days_sorted = sorted(days)
for i in range(1, len(days_sorted)):
gaps.append(days_sorted[i] - days_sorted[i-1])
if gaps:
print(f" - 间隔范围: [{min(gaps)}, {max(gaps)}] 天")
print(f" - 平均间隔: {np.mean(gaps):.1f}")
print(f" - 中位数间隔: {np.median(gaps):.1f}")
# ============================================
# 保存结果
# ============================================
with pd.ExcelWriter(OUTPUT_FILE, engine='openpyxl') as writer:
# Sheet 1: 日历视图
df_calendar.to_excel(writer, sheet_name='calendar', index=False)
# Sheet 2: 事件详情(含分配日期)
df_events_sorted.to_excel(writer, sheet_name='events', index=False)
# Sheet 3: 站点访问日期汇总
site_schedule = []
for site_id, days in site_visits.items():
# 跳过None/NaN值
if site_id is None or (isinstance(site_id, float) and np.isnan(site_id)):
continue
# 处理可能的类型不匹配
site_row = df_sites[df_sites['site_id'] == int(site_id)]
if len(site_row) > 0:
site_name = site_row['site_name'].values[0]
else:
site_name = f"Site_{site_id}"
days_sorted = sorted(days)
site_schedule.append({
'site_id': int(site_id),
'site_name': site_name,
'total_visits': len(days),
'visit_days': ','.join(map(str, days_sorted)),
'first_visit': days_sorted[0],
'last_visit': days_sorted[-1]
})
df_site_schedule = pd.DataFrame(site_schedule)
df_site_schedule.to_excel(writer, sheet_name='site_visits', index=False)
# Sheet 4: 汇总统计
summary = pd.DataFrame({
'metric': ['total_days', 'total_events', 'single_events', 'dual_events',
'avg_day_diff', 'max_day_diff', 'avg_gap', 'min_gap', 'max_gap'],
'value': [DAYS_PER_YEAR, len(df_events),
(df_events['event_type'] == 'single').sum(),
(df_events['event_type'] == 'dual').sum(),
df_events_sorted['day_diff'].mean(),
df_events_sorted['day_diff'].max(),
np.mean(gaps) if gaps else 0,
min(gaps) if gaps else 0,
max(gaps) if gaps else 0]
})
summary.to_excel(writer, sheet_name='summary', index=False)
print(f"\n结果已保存至: {OUTPUT_FILE}")
print(" - Sheet 'calendar': 365天日历视图")
print(" - Sheet 'events': 事件详情")
print(" - Sheet 'site_visits': 站点访问日期汇总")
print(" - Sheet 'summary': 汇总统计")
print("\n" + "=" * 60)