""" K_min-有效性分析 k_min为实数:如2.7表示70%站点最低2次,30%站点最低3次 """ import numpy as np import pandas as pd try: import matplotlib # 允许在无GUI环境生成图片 matplotlib.use("Agg") import matplotlib.pyplot as plt _HAS_MPL = True except ModuleNotFoundError: plt = None _HAS_MPL = False # 参数 C_OPT = 250 N_TOTAL = 722 ALPHA = 0.4 BETA = 0.2 def load_data(): """加载数据""" df = pd.read_excel('prob/MFP Regular Sites 2019.xlsx') df = df.drop(columns=['Unnamed: 10', 'Demand per Visit == the number of clients serviced on that visit']) df['TotalDemand'] = df['Average Demand per Visit'] * df['Number of Visits in 2019'] return df.sort_values('TotalDemand').reset_index(drop=True) def allocate_visits(df, k_min_real, n_total, c_opt): """ 根据实数k_min分配访问次数 k_min=2.7: floor=2, ceil=3, 70%站点得2次, 30%站点得3次 低需求站点优先获得较低的k_min(已按TotalDemand排序) """ df = df.copy() n = len(df) k_floor = int(np.floor(k_min_real)) k_ceil = int(np.ceil(k_min_real)) frac = k_min_real - k_floor # 获得ceil的比例 # 分配最低次数:低需求站点得floor,高需求站点得ceil n_ceil = int(round(n * frac)) n_floor = n - n_ceil k_base = np.array([k_floor] * n_floor + [k_ceil] * n_ceil) df['K_base'] = k_base # 计算剩余运力 n_reserved = k_base.sum() n_free = n_total - n_reserved if n_free < 0: return None # 按需求权重分配剩余运力 df['Weight'] = df['TotalDemand'] / df['TotalDemand'].sum() df['AllocatedVisits'] = (k_base + n_free * df['Weight'].values).round().astype(int) df['AllocatedVisits'] = np.maximum(df['AllocatedVisits'], k_base) # 调整总数 diff = n_total - df['AllocatedVisits'].sum() if diff != 0: sorted_idx = df['Weight'].sort_values(ascending=(diff < 0)).index.tolist() for idx in sorted_idx[:abs(diff)]: df.loc[idx, 'AllocatedVisits'] += int(np.sign(diff)) return df def calc_effectiveness(df, c_opt=C_OPT, alpha=ALPHA, beta=BETA): """计算有效性指标""" d = df['Average Demand per Visit'].values k = df['AllocatedVisits'].values D = df['TotalDemand'].values # 截断:单次有效服务 = min(需求, 容量) eff_per_visit = np.minimum(d, c_opt) annual_eff = k * eff_per_visit # 缺货率、浪费率 unmet = np.maximum(0, D - annual_eff) / np.maximum(D, 1) waste = np.maximum(0, k * c_opt - D) / np.maximum(k * c_opt, 1) # 有效性得分 base = annual_eff / np.maximum(D, 1) score = np.clip(base - alpha * unmet - beta * waste, 0, 1) n = score.size bottom_n = max(1, int(np.ceil(n * 0.10))) bottom10_mean = float(np.sort(score)[:bottom_n].mean()) # 总服务客户数 = Σ min(供给能力, 需求) total_served = np.minimum(k * c_opt, D).sum() total_demand = D.sum() return { 'mean': score.mean(), 'min': score.min(), 'bottom10_mean': bottom10_mean, 'std': score.std(), 'total_unmet': (D - annual_eff).clip(min=0).sum(), 'total_waste': (k * c_opt - D).clip(min=0).sum(), 'total_served': total_served, 'total_demand': total_demand, 'serve_ratio': total_served / total_demand } def analyze_kmin_range(df, k_range=np.arange(1.0, 10.1, 0.1)): """扫描k_min范围,计算有效性""" results = [] for k_min in k_range: df_alloc = allocate_visits(df, k_min, N_TOTAL, C_OPT) if df_alloc is None: continue metrics = calc_effectiveness(df_alloc) results.append({ 'k_min': k_min, 'effectiveness': metrics['mean'], 'min_eff': metrics['min'], 'bottom10_eff': metrics['bottom10_mean'], 'std_eff': metrics['std'], 'unmet': metrics['total_unmet'], 'waste': metrics['total_waste'], 'total_served': metrics['total_served'], 'total_demand': metrics['total_demand'], 'serve_ratio': metrics['serve_ratio'] }) return pd.DataFrame(results) def plot_results(results): """绘制k_min-有效性曲线""" if not _HAS_MPL: raise RuntimeError("缺少依赖: matplotlib(无法绘图)。请先安装 matplotlib 再运行绘图部分。") fig, axes = plt.subplots(3, 2, figsize=(12, 10)) # 找最优点 best_idx = results['effectiveness'].idxmax() best_k = results.loc[best_idx, 'k_min'] best_eff = results.loc[best_idx, 'effectiveness'] # 1. 有效性均值 ax = axes[0, 0] ax.plot(results['k_min'], results['effectiveness'], 'b-', lw=2) ax.axvline(best_k, color='r', ls='--', label=f'Best k_min={best_k:.1f}') ax.scatter([best_k], [best_eff], c='r', s=100, zorder=5) ax.set_xlabel('k_min') ax.set_ylabel('Mean Effectiveness') ax.set_title('Mean Effectiveness vs k_min') ax.legend() ax.grid(True, alpha=0.3) # 2. 最低10%有效性均值 ax = axes[0, 1] ax.plot(results['k_min'], results['bottom10_eff'], 'm-', lw=2) ax.axvline(best_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Bottom 10% Mean Effectiveness') ax.set_title('Bottom 10% Mean Effectiveness vs k_min') ax.grid(True, alpha=0.3) # 3. 总服务客户数 ax = axes[1, 0] ax.plot(results['k_min'], results['total_served'] / 1000, 'c-', lw=2) ax.axhline(results['total_demand'].iloc[0] / 1000, color='gray', ls=':', label='Total Demand') ax.axvline(best_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Served Families (×1000)') ax.set_title('Total Served vs k_min') ax.legend() ax.grid(True, alpha=0.3) # 4. 最小有效性 ax = axes[1, 1] ax.plot(results['k_min'], results['min_eff'], 'g-', lw=2) ax.axvline(best_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Min Effectiveness') ax.set_title('Worst Site Effectiveness vs k_min') ax.grid(True, alpha=0.3) # 5. 未满足需求 vs 浪费 ax = axes[2, 0] ax.plot(results['k_min'], results['unmet'] / 1000, 'r-', lw=2, label='Unmet') ax.plot(results['k_min'], results['waste'] / 1000, 'b-', lw=2, label='Waste') ax.axvline(best_k, color='gray', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Families (×1000)') ax.set_title('Unmet Demand vs Wasted Capacity') ax.legend() ax.grid(True, alpha=0.3) # 6. 有效性标准差 ax = axes[2, 1] ax.plot(results['k_min'], results['std_eff'], color='tab:orange', lw=2) ax.axvline(best_k, color='gray', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Std Effectiveness') ax.set_title('Effectiveness Std vs k_min') ax.grid(True, alpha=0.3) plt.tight_layout() plt.savefig('kmin_effectiveness.png', dpi=150) plt.close(fig) return best_k, best_eff def main(): # 1. 加载数据 df = load_data() print(f"站点数: {len(df)}, 总运力: {N_TOTAL}") print(f"总需求: {df['TotalDemand'].sum():,.0f} 家庭次") # 2. 扫描k_min print("\n扫描 k_min ∈ [1.0, 10.0]...") results = analyze_kmin_range(df) # 3. 绘图 best_idx = results['effectiveness'].idxmax() best_k = results.loc[best_idx, 'k_min'] best_eff = results.loc[best_idx, 'effectiveness'] if _HAS_MPL: plot_results(results) else: print("\n未检测到 matplotlib,跳过绘图(仍会保存CSV结果)。") # 4. 输出最优结果 print(f"\n最优 k_min = {best_k:.1f}") print(f"最优有效性 = {best_eff:.4f}") # 5. 生成最优方案 df_opt = allocate_visits(df, best_k, N_TOTAL, C_OPT) metrics = calc_effectiveness(df_opt) print(f"\n最优方案统计:") print(f" 有效性: {metrics['mean']:.4f} (min={metrics['min']:.4f})") print(f" 总服务: {metrics['total_served']:,.0f} / {metrics['total_demand']:,.0f} ({metrics['serve_ratio']:.1%})") print(f" 未满足: {metrics['total_unmet']:,.0f} 家庭次") print(f" 浪费: {metrics['total_waste']:,.0f} 家庭次") print(f" 访问次数: [{df_opt['AllocatedVisits'].min()}, {df_opt['AllocatedVisits'].max()}]") # 6. 保存 results.to_csv('kmin_effectiveness_data.csv', index=False) print("\n已保存: kmin_effectiveness.png, kmin_effectiveness_data.csv") if __name__ == '__main__': main()