""" K_min-有效性分析 k_min为实数:如2.7表示70%站点最低2次,30%站点最低3次 有效性计算支持引入需求标准差(正态分布),并通过 Monte Carlo 多次模拟求均值。 """ import numpy as np import pandas as pd import math import os try: import matplotlib # 允许在无GUI环境生成图片 matplotlib.use("Agg") import matplotlib.pyplot as plt _HAS_MPL = True except ModuleNotFoundError: plt = None _HAS_MPL = False # 参数 C_OPT = 250 N_TOTAL = 730 ALPHA = 0.6 BETA = 0.2 N_SIMS = 2000 RANDOM_SEED = 606 OUTPUT_DIR = "data" def gini_coefficient(values): """ 计算基尼系数(0=完全均等,1=完全不均等) 约定:values 非负;若总和为0,则返回0。 """ x = np.asarray(values, dtype=float) x = x[np.isfinite(x)] if x.size == 0: return 0.0 x = np.clip(x, 0, None) total = x.sum() if total <= 0: return 0.0 x_sorted = np.sort(x) n = x_sorted.size idx = np.arange(1, n + 1, dtype=float) return float((2.0 * (idx * x_sorted).sum()) / (n * total) - (n + 1.0) / n) def _norm_pdf(z): return np.exp(-0.5 * z * z) / np.sqrt(2.0 * np.pi) def _norm_cdf(z): z = np.asarray(z, dtype=float) erf_vec = np.vectorize(math.erf, otypes=[float]) return 0.5 * (1.0 + erf_vec(z / np.sqrt(2.0))) def expected_clipped_normal(mu, sigma, lower=0.0, upper=1.0): """ X ~ Normal(mu, sigma^2). 返回 E[clip(X, lower, upper)]. - 支持 lower=0 用于避免负需求 - sigma<=0 时退化为 clip(mu, lower, upper) """ mu = np.asarray(mu, dtype=float) sigma = np.asarray(sigma, dtype=float) lower = float(lower) upper = float(upper) if lower > upper: raise ValueError("lower must be <= upper") out = np.empty_like(mu, dtype=float) mask = sigma > 0 out[~mask] = np.clip(mu[~mask], lower, upper) if np.any(mask): m = mu[mask] s = sigma[mask] z_u = (upper - m) / s z_l = (lower - m) / s Phi_u = _norm_cdf(z_u) Phi_l = _norm_cdf(z_l) phi_u = _norm_pdf(z_u) phi_l = _norm_pdf(z_l) # E[X 1_{X<=a}] = mu*Phi(z) - sigma*phi(z), z=(a-mu)/sigma ex_le_u = m * Phi_u - s * phi_u ex_le_l = m * Phi_l - s * phi_l p_le_l = Phi_l p_gt_u = 1.0 - Phi_u out[mask] = lower * p_le_l + (ex_le_u - ex_le_l) + upper * p_gt_u return out def load_data(): """加载数据""" df = pd.read_excel('prob/MFP Regular Sites 2019.xlsx') df = df.drop(columns=['Unnamed: 10', 'Demand per Visit == the number of clients serviced on that visit']) df['TotalDemand'] = df['Average Demand per Visit'] * df['Number of Visits in 2019'] return df.sort_values('TotalDemand').reset_index(drop=True) def allocate_visits(df, k_min_real, n_total, c_opt): """ 根据实数k_min分配访问次数 k_min=2.7: floor=2, ceil=3, 70%站点得2次, 30%站点得3次 低需求站点优先获得较低的k_min(已按TotalDemand排序) """ df = df.copy() n = len(df) k_floor = int(np.floor(k_min_real)) k_ceil = int(np.ceil(k_min_real)) frac = k_min_real - k_floor # 获得ceil的比例 # 分配最低次数:低需求站点得floor,高需求站点得ceil n_ceil = int(round(n * frac)) n_floor = n - n_ceil k_base = np.array([k_floor] * n_floor + [k_ceil] * n_ceil) df['K_base'] = k_base # 计算剩余运力 n_reserved = k_base.sum() n_free = n_total - n_reserved if n_free < 0: return None # 按需求权重分配剩余运力 df['Weight'] = df['TotalDemand'] / df['TotalDemand'].sum() df['AllocatedVisits'] = (k_base + n_free * df['Weight'].values).round().astype(int) df['AllocatedVisits'] = np.maximum(df['AllocatedVisits'], k_base) # 调整总数 diff = n_total - df['AllocatedVisits'].sum() if diff != 0: sorted_idx = df['Weight'].sort_values(ascending=(diff < 0)).index.tolist() for idx in sorted_idx[:abs(diff)]: df.loc[idx, 'AllocatedVisits'] += int(np.sign(diff)) return df def calc_effectiveness( df, c_opt=C_OPT, alpha=ALPHA, beta=BETA, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED, ): """计算有效性指标(method='mc' 多次模拟取均值;method='analytic' 用闭式期望)""" d = df['Average Demand per Visit'].values d_std = df.get('StDev(Demand per Visit)', pd.Series(0.0, index=df.index)).values d_std = np.clip(d_std, 0, None) k = df['AllocatedVisits'].values D = df['TotalDemand'].values method = (method or "mc").lower() if method not in {"mc", "analytic"}: raise ValueError("method must be 'mc' or 'analytic'") if method == "analytic": # 正态分布需求:单次期望有效服务 = E[min(max(N(d, std),0), 容量)] eff_per_visit = expected_clipped_normal(d, d_std, lower=0.0, upper=float(c_opt)) annual_eff = k * eff_per_visit unmet = np.maximum(0, D - annual_eff) / np.maximum(D, 1) waste = np.maximum(0, k * c_opt - annual_eff) / np.maximum(k * c_opt, 1) base = annual_eff / np.maximum(D, 1) score = np.clip(base - alpha * unmet - beta * waste, 0, 1) n = score.size bottom_n = max(1, int(np.ceil(n * 0.10))) total_served = np.minimum(annual_eff, D).sum() total_demand = D.sum() return { 'mean': float(score.mean()), 'min': float(score.min()), 'bottom10_mean': float(np.sort(score)[:bottom_n].mean()), 'gini_eff': float(gini_coefficient(score)), 'std': float(score.std()), 'total_unmet': float((D - annual_eff).clip(min=0).sum()), 'total_waste': float((k * c_opt - annual_eff).clip(min=0).sum()), 'total_served': float(total_served), 'total_demand': float(total_demand), 'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0 } # Monte Carlo:每次访问的需求 ~ Normal(mu, sigma),重复 n_sims 次取均值 n_sims = int(n_sims) if n_sims <= 0: raise ValueError("n_sims must be > 0") rng = np.random.default_rng(seed) n_sites = len(df) annual_eff_sims = np.zeros((n_sites, n_sims), dtype=float) for i in range(n_sites): k_i = int(k[i]) if k_i <= 0: continue mu_i = float(d[i]) sigma_i = float(d_std[i]) if sigma_i <= 0: demand = np.full((n_sims, k_i), mu_i, dtype=float) else: demand = rng.normal(mu_i, sigma_i, size=(n_sims, k_i)) demand = np.clip(demand, 0, None) annual_eff_sims[i, :] = np.minimum(demand, float(c_opt)).sum(axis=1) D2 = D.reshape(-1, 1) cap2 = (k * c_opt).reshape(-1, 1) unmet = np.maximum(0, D2 - annual_eff_sims) / np.maximum(D2, 1) waste = np.maximum(0, cap2 - annual_eff_sims) / np.maximum(cap2, 1) base = annual_eff_sims / np.maximum(D2, 1) score_sims = np.clip(base - alpha * unmet - beta * waste, 0, 1) # 每个站点的期望得分(跨模拟平均) avg_score = score_sims.mean(axis=1) bottom_n = max(1, int(np.ceil(n_sites * 0.10))) total_served_sims = np.minimum(annual_eff_sims, D2).sum(axis=0) total_unmet_sims = np.maximum(0, D2 - annual_eff_sims).sum(axis=0) total_waste_sims = np.maximum(0, cap2 - annual_eff_sims).sum(axis=0) total_demand = float(D.sum()) total_served = float(total_served_sims.mean()) return { 'mean': float(avg_score.mean()), 'min': float(avg_score.min()), 'bottom10_mean': float(np.sort(avg_score)[:bottom_n].mean()), 'gini_eff': float(gini_coefficient(avg_score)), 'std': float(avg_score.std()), 'total_unmet': float(total_unmet_sims.mean()), 'total_waste': float(total_waste_sims.mean()), 'total_served': total_served, 'total_demand': total_demand, 'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0 } def analyze_kmin_range( df, k_range=np.arange(1.0, 10.1, 0.1), method="mc", n_sims=N_SIMS, seed=RANDOM_SEED, ): """扫描k_min范围,计算有效性""" results = [] n_sites = len(df) site_cols = [f"visits_{i+1:02d}" for i in range(n_sites)] for k_min in k_range: df_alloc = allocate_visits(df, k_min, N_TOTAL, C_OPT) if df_alloc is None: continue metrics = calc_effectiveness(df_alloc, method=method, n_sims=n_sims, seed=seed) row = { 'k_min': k_min, 'effectiveness': metrics['mean'], 'min_eff': metrics['min'], 'bottom10_eff': metrics['bottom10_mean'], 'gini_eff': metrics['gini_eff'], 'std_eff': metrics['std'], 'unmet': metrics['total_unmet'], 'waste': metrics['total_waste'], 'total_served': metrics['total_served'], 'total_demand': metrics['total_demand'], 'serve_ratio': metrics['serve_ratio'] } # 追加每个站点在该 k_min 下的访问次数(按 df 当前排序的站点顺序) alloc = df_alloc["AllocatedVisits"].astype(int).tolist() row.update({col: v for col, v in zip(site_cols, alloc)}) results.append(row) return pd.DataFrame(results) def plot_results(results): """绘制k_min-有效性曲线""" if not _HAS_MPL: raise RuntimeError("缺少依赖: matplotlib(无法绘图)。请先安装 matplotlib 再运行绘图部分。") fig, axes = plt.subplots(4, 2, figsize=(12, 13)) # 红线选点:基尼系数第一次 < 0.2 的 k_min(若不存在则回退到均值有效性最优点) gini_candidates = results.loc[results["gini_eff"] < 0.2, "k_min"] if len(gini_candidates) > 0: selected_k = float(gini_candidates.iloc[0]) selected_label = f'First Gini<0.2: k_min={selected_k:.1f}' else: best_idx = results['effectiveness'].idxmax() selected_k = float(results.loc[best_idx, 'k_min']) selected_label = f'Best mean eff: k_min={selected_k:.1f}' selected_idx = (results["k_min"] - selected_k).abs().idxmin() selected_eff = float(results.loc[selected_idx, "effectiveness"]) # 1. 有效性均值 ax = axes[0, 0] ax.plot(results['k_min'], results['effectiveness'], 'b-', lw=2) ax.axvline(selected_k, color='r', ls='--', label=selected_label) ax.scatter([selected_k], [selected_eff], c='r', s=100, zorder=5) ax.set_xlabel('k_min') ax.set_ylabel('Mean Effectiveness') ax.set_title('Mean Effectiveness vs k_min') ax.legend() ax.grid(True, alpha=0.3) # 2. 最低10%有效性均值 ax = axes[0, 1] ax.plot(results['k_min'], results['bottom10_eff'], 'm-', lw=2) ax.axvline(selected_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Bottom 10% Mean Effectiveness') ax.set_title('Bottom 10% Mean Effectiveness vs k_min') ax.grid(True, alpha=0.3) # 3. 总服务客户数 ax = axes[1, 0] ax.plot(results['k_min'], results['total_served'] / 1000, 'c-', lw=2) ax.axhline(results['total_demand'].iloc[0] / 1000, color='gray', ls=':', label='Total Demand') ax.axvline(selected_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Served Families (×1000)') ax.set_title('Total Served vs k_min') ax.legend() ax.grid(True, alpha=0.3) # 4. 最小有效性 ax = axes[1, 1] ax.plot(results['k_min'], results['min_eff'], 'g-', lw=2) ax.axvline(selected_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Min Effectiveness') ax.set_title('Worst Site Effectiveness vs k_min') ax.grid(True, alpha=0.3) # 5. 未满足需求 vs 浪费 ax = axes[2, 0] ax.plot(results['k_min'], results['unmet'] / 1000, 'r-', lw=2, label='Unmet') ax.plot(results['k_min'], results['waste'] / 1000, 'b-', lw=2, label='Waste') ax.axvline(selected_k, color='gray', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Families (×1000)') ax.set_title('Unmet Demand vs Wasted Capacity') ax.legend() ax.grid(True, alpha=0.3) # 6. 有效性标准差 ax = axes[2, 1] ax.plot(results['k_min'], results['std_eff'], color='tab:orange', lw=2) ax.axvline(selected_k, color='gray', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Std Effectiveness') ax.set_title('Effectiveness Std vs k_min') ax.grid(True, alpha=0.3) # 7. 基尼系数 ax = axes[3, 0] ax.plot(results['k_min'], results['gini_eff'], color='tab:purple', lw=2) ax.axhline(0.2, color='gray', ls=':', lw=1) ax.axvline(selected_k, color='r', ls='--') ax.set_xlabel('k_min') ax.set_ylabel('Gini Coefficient') ax.set_title('Gini (Effectiveness) vs k_min') ax.grid(True, alpha=0.3) # 8. 空白占位(避免最后一格空图框太挤) axes[3, 1].axis('off') plt.tight_layout() os.makedirs(OUTPUT_DIR, exist_ok=True) plt.savefig(os.path.join(OUTPUT_DIR, 'kmin_effectiveness.png'), dpi=150) plt.close(fig) return selected_k, selected_eff def main(): # 1. 加载数据 df = load_data() print(f"站点数: {len(df)}, 总运力: {N_TOTAL}") print(f"总需求: {df['TotalDemand'].sum():,.0f} 家庭次") site_name_col = "Site Name" if "Site Name" in df.columns else None sites_out = pd.DataFrame( { "site_idx": np.arange(1, len(df) + 1, dtype=int), "site_name": df[site_name_col].astype(str).values if site_name_col else [f"Site_{i+1:02d}" for i in range(len(df))], "total_demand": df["TotalDemand"].values, } ) # 2. 扫描k_min print("\n扫描 k_min ∈ [1.0, 10.0]...") print(f"使用 Monte Carlo 平均:n_sims={N_SIMS}, seed={RANDOM_SEED}") results = analyze_kmin_range(df, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED) # 3. 绘图 best_idx = results['effectiveness'].idxmax() best_k = results.loc[best_idx, 'k_min'] best_eff = results.loc[best_idx, 'effectiveness'] if _HAS_MPL: plot_results(results) else: print("\n未检测到 matplotlib,跳过绘图(仍会保存CSV结果)。") # 4. 输出最优结果 print(f"\n最优 k_min = {best_k:.1f}") print(f"最优有效性 = {best_eff:.4f}") # 5. 生成最优方案 df_opt = allocate_visits(df, best_k, N_TOTAL, C_OPT) metrics = calc_effectiveness(df_opt, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED) print(f"\n最优方案统计:") print(f" 有效性: {metrics['mean']:.4f} (min={metrics['min']:.4f})") print(f" 总服务: {metrics['total_served']:,.0f} / {metrics['total_demand']:,.0f} ({metrics['serve_ratio']:.1%})") print(f" 未满足: {metrics['total_unmet']:,.0f} 家庭次") print(f" 浪费: {metrics['total_waste']:,.0f} 家庭次") print(f" 访问次数: [{df_opt['AllocatedVisits'].min()}, {df_opt['AllocatedVisits'].max()}]") # 6. 保存 os.makedirs(OUTPUT_DIR, exist_ok=True) results.to_csv(os.path.join(OUTPUT_DIR, 'kmin_effectiveness_data.csv'), index=False) sites_out.to_csv(os.path.join(OUTPUT_DIR, "kmin_effectiveness_sites.csv"), index=False) print("\n已保存到 data/: kmin_effectiveness.png, kmin_effectiveness_data.csv, kmin_effectiveness_sites.csv") if __name__ == '__main__': main()