# Build a day-by-day site-visit calendar for one year.
#
# Reads per-site visit counts from the "allocations" sheet of ALLOC_XLSX,
# spreads each site's visits evenly over the year, packs them into
# SLOTS_PER_DAY distinct-site slots per day, and writes the calendar, the
# per-site visit dates and gap statistics to OUTPUT_XLSX.
from __future__ import annotations

import bisect
import datetime as dt
from dataclasses import dataclass

import numpy as np
import pandas as pd

ALLOC_XLSX = "task1/03_allocate.xlsx"
OUTPUT_XLSX = "task1/04_schedule.xlsx"

YEAR = 2021
DAYS = 365
SLOTS_PER_DAY = 2  # scenario B: 2 trucks, 2 distinct sites/day

# Default recommendation
DEFAULT_SCENARIO = "rho20"
DEFAULT_METHOD = "proportional_D"


@dataclass(frozen=True)
class Event:
    """One planned visit to a site, tagged with its ideal day of the year."""

    site_id: int
    site_name: str
    target_day: int  # 1..365


def build_targets(site_id: int, site_name: str, k: int) -> list[Event]:
    """Return ``k`` evenly spaced visit events for one site.

    The j-th visit (0-based) targets day ``(j + 0.5) * DAYS / k``, rounded
    with the built-in ``round`` and clamped to ``1..DAYS``.  Because of the
    rounding and clamping, two visits of the same site can share a target
    day when ``k`` is large; ``assign_events_to_days`` resolves that.

    Args:
        site_id: Identifier copied onto every generated event.
        site_name: Display name copied onto every generated event.
        k: Number of visits to generate; ``k <= 0`` yields an empty list.

    Returns:
        The generated events in increasing ``j`` order.
    """
    if k <= 0:
        return []

    targets: list[Event] = []
    for j in range(k):
        # Even spacing: place j-th visit at (j+0.5)*DAYS/k
        t = int(round((j + 0.5) * DAYS / k))
        t = max(1, min(DAYS, t))
        targets.append(Event(site_id=site_id, site_name=site_name, target_day=t))
    return targets


def _nearest_feasible_slot(
    free_days: list[int],
    target_day: int,
    site_id: int,
    day_to_events: dict[int, list[Event]],
) -> int | None:
    """Return the index in ``free_days`` of the closest usable free slot.

    ``free_days`` is sorted ascending and holds one entry per free slot (a
    day with two free slots appears twice).  Slots are examined in order of
    increasing ``abs(day - target_day)``; on a tie the day on or after the
    target is tried first.  A slot is usable when its day does not already
    hold an event for ``site_id``.  Returns ``None`` if no slot is usable.
    """
    total = len(free_days)
    right = bisect.bisect_left(free_days, target_day)  # first day >= target
    left = right - 1  # last day < target

    while left >= 0 or right < total:
        if left < 0:
            go_right = True
        elif right >= total:
            go_right = False
        else:
            go_right = (free_days[right] - target_day) <= (target_day - free_days[left])

        if go_right:
            idx = right
            right += 1
        else:
            idx = left
            left -= 1

        day = free_days[idx]
        if not any(ev.site_id == site_id for ev in day_to_events[day]):
            return idx
    return None


def assign_events_to_days(events: list[Event]) -> dict[int, list[Event]]:
    """Pack events into days so each day holds ``SLOTS_PER_DAY`` distinct sites.

    Events are first binned on their ``target_day``.  Within a day, repeated
    sites and anything beyond ``SLOTS_PER_DAY`` (lowest ``site_id`` is kept)
    become overflow.  Overflow events are then placed, in
    ``(target_day, site_id)`` order, into the free slot nearest to their
    target day whose day does not already contain the same site.

    Args:
        events: Events to schedule.  Exactly ``DAYS * SLOTS_PER_DAY`` events
            are needed for the final check to pass.

    Returns:
        Mapping ``day (1..DAYS) -> list of events`` scheduled on that day.

    Raises:
        KeyError: If an event's ``target_day`` is outside ``1..DAYS``.
        RuntimeError: If an overflow event cannot be placed, or if any day
            ends up with a slot count other than ``SLOTS_PER_DAY`` or with a
            duplicated site.
    """
    # Initial binning by rounded target day
    day_to_events: dict[int, list[Event]] = {d: [] for d in range(1, DAYS + 1)}
    overflow: list[Event] = []

    # Put into bins
    for ev in sorted(events, key=lambda e: (e.target_day, e.site_id)):
        day_to_events[ev.target_day].append(ev)

    # Enforce per-day capacity and per-day unique site_id
    for day in range(1, DAYS + 1):
        bucket = day_to_events[day]
        if not bucket:
            continue
        seen: set[int] = set()
        kept: list[Event] = []
        for ev in bucket:
            if ev.site_id in seen:
                overflow.append(ev)
            else:
                seen.add(ev.site_id)
                kept.append(ev)
        # If still over capacity, keep earliest (already sorted) and overflow rest
        day_to_events[day] = kept[:SLOTS_PER_DAY]
        overflow.extend(kept[SLOTS_PER_DAY:])

    # One entry per free slot; built in ascending day order, so already sorted.
    underfull_days: list[int] = []
    for day in range(1, DAYS + 1):
        cap = SLOTS_PER_DAY - len(day_to_events[day])
        underfull_days.extend([day] * cap)

    # Fill free slots, each overflow event going to the closest feasible day.
    for ev in sorted(overflow, key=lambda e: (e.target_day, e.site_id)):
        if not underfull_days:
            raise RuntimeError("No remaining capacity but overflow events remain.")

        assigned_idx = _nearest_feasible_slot(
            underfull_days, ev.target_day, ev.site_id, day_to_events
        )
        if assigned_idx is None:
            raise RuntimeError(
                f"Unable to place event for site_id={ev.site_id}; per-day uniqueness too strict."
            )

        day = underfull_days.pop(assigned_idx)
        day_to_events[day].append(ev)

    # Final sanity: every day filled, and no day has duplicate site_id
    for day in range(1, DAYS + 1):
        if len(day_to_events[day]) != SLOTS_PER_DAY:
            raise RuntimeError(f"Day {day} not filled: {len(day_to_events[day])} events.")
        ids = [e.site_id for e in day_to_events[day]]
        if len(set(ids)) != len(ids):
            raise RuntimeError(f"Day {day} has duplicate site assignments.")

    return day_to_events


def main() -> None:
    """Read the allocation sheet, build the schedule and write OUTPUT_XLSX.

    Raises:
        ValueError: If the expected allocation column is missing, the visit
            counts do not sum to ``DAYS * SLOTS_PER_DAY``, or any site has
            fewer than one visit.
        RuntimeError: If event generation or day assignment is inconsistent.
    """
    alloc = pd.read_excel(ALLOC_XLSX, sheet_name="allocations")

    k_col = f"k_{DEFAULT_METHOD}_{DEFAULT_SCENARIO}"
    if k_col not in alloc.columns:
        raise ValueError(f"Allocation column not found: {k_col}")

    alloc = alloc[["site_id", "site_name", k_col]].copy()
    alloc = alloc.rename(columns={k_col: "k_2021"})
    alloc["k_2021"] = pd.to_numeric(alloc["k_2021"], errors="raise").astype(int)

    if int(alloc["k_2021"].sum()) != DAYS * SLOTS_PER_DAY:
        raise ValueError("k_2021 does not match total required slots.")
    if (alloc["k_2021"] < 1).any():
        raise ValueError("k_2021 violates coverage constraint k_i >= 1.")

    events: list[Event] = []
    for row in alloc.itertuples(index=False):
        events.extend(
            build_targets(
                site_id=int(row.site_id),
                site_name=str(row.site_name),
                k=int(row.k_2021),
            )
        )

    if len(events) != DAYS * SLOTS_PER_DAY:
        raise RuntimeError("Generated events mismatch total required slots.")

    day_to_events = assign_events_to_days(events)

    start = dt.date(YEAR, 1, 1)
    calendar_rows: list[dict[str, object]] = []
    per_site_rows: list[dict[str, object]] = []
    for day in range(1, DAYS + 1):
        date = start + dt.timedelta(days=day - 1)
        evs = sorted(day_to_events[day], key=lambda e: e.site_id)

        # One wide row per day: site1_id, site1_name, site2_id, site2_name, ...
        calendar_row: dict[str, object] = {
            "date": date.isoformat(),
            "day_of_year": day,
        }
        for slot, ev in enumerate(evs, start=1):
            calendar_row[f"site{slot}_id"] = ev.site_id
            calendar_row[f"site{slot}_name"] = ev.site_name
            per_site_rows.append(
                {
                    "site_id": ev.site_id,
                    "site_name": ev.site_name,
                    "date": date.isoformat(),
                    "day_of_year": day,
                    "slot": slot,
                    "target_day": ev.target_day,
                }
            )
        calendar_rows.append(calendar_row)

    calendar_df = pd.DataFrame(calendar_rows)
    site_dates_df = (
        pd.DataFrame(per_site_rows)
        .sort_values(["site_id", "day_of_year"])
        .reset_index(drop=True)
    )

    # Schedule quality metrics: gaps between visits for each site
    gap_rows: list[dict[str, object]] = []
    for site_id, group in site_dates_df.groupby("site_id"):
        days = group["day_of_year"].to_numpy(int)
        gaps = np.diff(days)
        if len(gaps) == 0:
            # A single visit has no gaps to summarise.
            gap_rows.append(
                {"site_id": int(site_id), "k": 1, "gap_max": None, "gap_mean": None, "gap_std": None}
            )
        else:
            gap_rows.append(
                {
                    "site_id": int(site_id),
                    "k": int(len(days)),
                    "gap_max": int(gaps.max()),
                    "gap_mean": float(gaps.mean()),
                    "gap_std": float(gaps.std(ddof=0)),
                }
            )
    gap_df = pd.DataFrame(gap_rows).merge(
        alloc[["site_id", "site_name"]], on="site_id", how="left"
    )

    meta_df = pd.DataFrame(
        [
            {"key": "year", "value": YEAR},
            {"key": "days", "value": DAYS},
            {"key": "slots_per_day", "value": SLOTS_PER_DAY},
            {"key": "total_visits", "value": int(DAYS * SLOTS_PER_DAY)},
            {"key": "allocation_scenario", "value": DEFAULT_SCENARIO},
            {"key": "allocation_method", "value": DEFAULT_METHOD},
            {"key": "k_column", "value": k_col},
        ]
    )

    with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl") as writer:
        meta_df.to_excel(writer, index=False, sheet_name="meta")
        calendar_df.to_excel(writer, index=False, sheet_name="calendar")
        site_dates_df.to_excel(writer, index=False, sheet_name="site_dates")
        gap_df.to_excel(writer, index=False, sheet_name="gap_metrics")


if __name__ == "__main__":
    main()