Files
mcm-mfp/kmin_effectiveness.py

457 lines
15 KiB
Python
Raw Permalink Normal View History

2026-01-17 15:53:51 +08:00
"""
K_min-有效性分析
k_min为实数如2.7表示70%站点最低2次30%站点最低3次
2026-01-17 18:41:59 +08:00
有效性计算支持引入需求标准差正态分布并通过 Monte Carlo 多次模拟求均值
2026-01-17 15:53:51 +08:00
"""
import numpy as np
import pandas as pd
2026-01-17 18:41:59 +08:00
import math
2026-01-17 18:46:16 +08:00
import os
2026-01-17 15:53:51 +08:00
try:
import matplotlib
# 允许在无GUI环境生成图片
matplotlib.use("Agg")
import matplotlib.pyplot as plt
_HAS_MPL = True
except ModuleNotFoundError:
plt = None
_HAS_MPL = False
# 参数
C_OPT = 250
2026-01-17 19:25:35 +08:00
N_TOTAL = 730
ALPHA = 0.6
2026-01-17 15:53:51 +08:00
BETA = 0.2
2026-01-17 18:41:59 +08:00
N_SIMS = 2000
2026-01-17 19:25:35 +08:00
RANDOM_SEED = 606
2026-01-17 18:46:16 +08:00
OUTPUT_DIR = "data"
2026-01-17 18:41:59 +08:00
def gini_coefficient(values):
"""
计算基尼系数0=完全均等1=完全不均等
约定values 非负若总和为0则返回0
"""
x = np.asarray(values, dtype=float)
x = x[np.isfinite(x)]
if x.size == 0:
return 0.0
x = np.clip(x, 0, None)
total = x.sum()
if total <= 0:
return 0.0
x_sorted = np.sort(x)
n = x_sorted.size
idx = np.arange(1, n + 1, dtype=float)
return float((2.0 * (idx * x_sorted).sum()) / (n * total) - (n + 1.0) / n)
def _norm_pdf(z):
return np.exp(-0.5 * z * z) / np.sqrt(2.0 * np.pi)
def _norm_cdf(z):
z = np.asarray(z, dtype=float)
erf_vec = np.vectorize(math.erf, otypes=[float])
return 0.5 * (1.0 + erf_vec(z / np.sqrt(2.0)))
def expected_clipped_normal(mu, sigma, lower=0.0, upper=1.0):
"""
X ~ Normal(mu, sigma^2). 返回 E[clip(X, lower, upper)].
- 支持 lower=0 用于避免负需求
- sigma<=0 时退化为 clip(mu, lower, upper)
"""
mu = np.asarray(mu, dtype=float)
sigma = np.asarray(sigma, dtype=float)
lower = float(lower)
upper = float(upper)
if lower > upper:
raise ValueError("lower must be <= upper")
out = np.empty_like(mu, dtype=float)
mask = sigma > 0
out[~mask] = np.clip(mu[~mask], lower, upper)
if np.any(mask):
m = mu[mask]
s = sigma[mask]
z_u = (upper - m) / s
z_l = (lower - m) / s
Phi_u = _norm_cdf(z_u)
Phi_l = _norm_cdf(z_l)
phi_u = _norm_pdf(z_u)
phi_l = _norm_pdf(z_l)
# E[X 1_{X<=a}] = mu*Phi(z) - sigma*phi(z), z=(a-mu)/sigma
ex_le_u = m * Phi_u - s * phi_u
ex_le_l = m * Phi_l - s * phi_l
p_le_l = Phi_l
p_gt_u = 1.0 - Phi_u
out[mask] = lower * p_le_l + (ex_le_u - ex_le_l) + upper * p_gt_u
return out
2026-01-17 15:53:51 +08:00
def load_data():
"""加载数据"""
df = pd.read_excel('prob/MFP Regular Sites 2019.xlsx')
df = df.drop(columns=['Unnamed: 10', 'Demand per Visit == the number of clients serviced on that visit'])
df['TotalDemand'] = df['Average Demand per Visit'] * df['Number of Visits in 2019']
return df.sort_values('TotalDemand').reset_index(drop=True)
def allocate_visits(df, k_min_real, n_total, c_opt):
"""
根据实数k_min分配访问次数
k_min=2.7: floor=2, ceil=3, 70%站点得2次, 30%站点得3次
低需求站点优先获得较低的k_min已按TotalDemand排序
"""
df = df.copy()
n = len(df)
k_floor = int(np.floor(k_min_real))
k_ceil = int(np.ceil(k_min_real))
frac = k_min_real - k_floor # 获得ceil的比例
# 分配最低次数低需求站点得floor高需求站点得ceil
n_ceil = int(round(n * frac))
n_floor = n - n_ceil
k_base = np.array([k_floor] * n_floor + [k_ceil] * n_ceil)
df['K_base'] = k_base
# 计算剩余运力
n_reserved = k_base.sum()
n_free = n_total - n_reserved
if n_free < 0:
return None
# 按需求权重分配剩余运力
df['Weight'] = df['TotalDemand'] / df['TotalDemand'].sum()
df['AllocatedVisits'] = (k_base + n_free * df['Weight'].values).round().astype(int)
df['AllocatedVisits'] = np.maximum(df['AllocatedVisits'], k_base)
# 调整总数
diff = n_total - df['AllocatedVisits'].sum()
if diff != 0:
sorted_idx = df['Weight'].sort_values(ascending=(diff < 0)).index.tolist()
for idx in sorted_idx[:abs(diff)]:
df.loc[idx, 'AllocatedVisits'] += int(np.sign(diff))
return df
2026-01-17 18:41:59 +08:00
def calc_effectiveness(
df,
c_opt=C_OPT,
alpha=ALPHA,
beta=BETA,
method="mc",
n_sims=N_SIMS,
seed=RANDOM_SEED,
):
"""计算有效性指标method='mc' 多次模拟取均值method='analytic' 用闭式期望)"""
2026-01-17 15:53:51 +08:00
d = df['Average Demand per Visit'].values
2026-01-17 18:41:59 +08:00
d_std = df.get('StDev(Demand per Visit)', pd.Series(0.0, index=df.index)).values
d_std = np.clip(d_std, 0, None)
2026-01-17 15:53:51 +08:00
k = df['AllocatedVisits'].values
D = df['TotalDemand'].values
2026-01-17 18:41:59 +08:00
method = (method or "mc").lower()
if method not in {"mc", "analytic"}:
raise ValueError("method must be 'mc' or 'analytic'")
if method == "analytic":
# 正态分布需求:单次期望有效服务 = E[min(max(N(d, std),0), 容量)]
eff_per_visit = expected_clipped_normal(d, d_std, lower=0.0, upper=float(c_opt))
annual_eff = k * eff_per_visit
unmet = np.maximum(0, D - annual_eff) / np.maximum(D, 1)
waste = np.maximum(0, k * c_opt - annual_eff) / np.maximum(k * c_opt, 1)
base = annual_eff / np.maximum(D, 1)
score = np.clip(base - alpha * unmet - beta * waste, 0, 1)
n = score.size
bottom_n = max(1, int(np.ceil(n * 0.10)))
total_served = np.minimum(annual_eff, D).sum()
total_demand = D.sum()
return {
'mean': float(score.mean()),
'min': float(score.min()),
'bottom10_mean': float(np.sort(score)[:bottom_n].mean()),
'gini_eff': float(gini_coefficient(score)),
'std': float(score.std()),
'total_unmet': float((D - annual_eff).clip(min=0).sum()),
'total_waste': float((k * c_opt - annual_eff).clip(min=0).sum()),
'total_served': float(total_served),
'total_demand': float(total_demand),
'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0
}
# Monte Carlo每次访问的需求 ~ Normal(mu, sigma),重复 n_sims 次取均值
n_sims = int(n_sims)
if n_sims <= 0:
raise ValueError("n_sims must be > 0")
rng = np.random.default_rng(seed)
n_sites = len(df)
annual_eff_sims = np.zeros((n_sites, n_sims), dtype=float)
for i in range(n_sites):
k_i = int(k[i])
if k_i <= 0:
continue
2026-01-17 15:53:51 +08:00
2026-01-17 18:41:59 +08:00
mu_i = float(d[i])
sigma_i = float(d_std[i])
if sigma_i <= 0:
demand = np.full((n_sims, k_i), mu_i, dtype=float)
else:
demand = rng.normal(mu_i, sigma_i, size=(n_sims, k_i))
demand = np.clip(demand, 0, None)
annual_eff_sims[i, :] = np.minimum(demand, float(c_opt)).sum(axis=1)
D2 = D.reshape(-1, 1)
cap2 = (k * c_opt).reshape(-1, 1)
unmet = np.maximum(0, D2 - annual_eff_sims) / np.maximum(D2, 1)
waste = np.maximum(0, cap2 - annual_eff_sims) / np.maximum(cap2, 1)
base = annual_eff_sims / np.maximum(D2, 1)
score_sims = np.clip(base - alpha * unmet - beta * waste, 0, 1)
# 每个站点的期望得分(跨模拟平均)
avg_score = score_sims.mean(axis=1)
bottom_n = max(1, int(np.ceil(n_sites * 0.10)))
total_served_sims = np.minimum(annual_eff_sims, D2).sum(axis=0)
total_unmet_sims = np.maximum(0, D2 - annual_eff_sims).sum(axis=0)
total_waste_sims = np.maximum(0, cap2 - annual_eff_sims).sum(axis=0)
total_demand = float(D.sum())
total_served = float(total_served_sims.mean())
2026-01-17 15:53:51 +08:00
return {
2026-01-17 18:41:59 +08:00
'mean': float(avg_score.mean()),
'min': float(avg_score.min()),
'bottom10_mean': float(np.sort(avg_score)[:bottom_n].mean()),
'gini_eff': float(gini_coefficient(avg_score)),
'std': float(avg_score.std()),
'total_unmet': float(total_unmet_sims.mean()),
'total_waste': float(total_waste_sims.mean()),
2026-01-17 15:53:51 +08:00
'total_served': total_served,
'total_demand': total_demand,
2026-01-17 18:41:59 +08:00
'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0
2026-01-17 15:53:51 +08:00
}
2026-01-17 18:41:59 +08:00
def analyze_kmin_range(
df,
k_range=np.arange(1.0, 10.1, 0.1),
method="mc",
n_sims=N_SIMS,
seed=RANDOM_SEED,
):
2026-01-17 15:53:51 +08:00
"""扫描k_min范围计算有效性"""
results = []
2026-01-17 18:41:59 +08:00
n_sites = len(df)
site_cols = [f"visits_{i+1:02d}" for i in range(n_sites)]
2026-01-17 15:53:51 +08:00
for k_min in k_range:
df_alloc = allocate_visits(df, k_min, N_TOTAL, C_OPT)
if df_alloc is None:
continue
2026-01-17 18:41:59 +08:00
metrics = calc_effectiveness(df_alloc, method=method, n_sims=n_sims, seed=seed)
row = {
2026-01-17 15:53:51 +08:00
'k_min': k_min,
'effectiveness': metrics['mean'],
'min_eff': metrics['min'],
'bottom10_eff': metrics['bottom10_mean'],
2026-01-17 18:41:59 +08:00
'gini_eff': metrics['gini_eff'],
2026-01-17 15:53:51 +08:00
'std_eff': metrics['std'],
'unmet': metrics['total_unmet'],
'waste': metrics['total_waste'],
'total_served': metrics['total_served'],
'total_demand': metrics['total_demand'],
'serve_ratio': metrics['serve_ratio']
2026-01-17 18:41:59 +08:00
}
# 追加每个站点在该 k_min 下的访问次数(按 df 当前排序的站点顺序)
alloc = df_alloc["AllocatedVisits"].astype(int).tolist()
row.update({col: v for col, v in zip(site_cols, alloc)})
results.append(row)
2026-01-17 15:53:51 +08:00
return pd.DataFrame(results)
def plot_results(results):
"""绘制k_min-有效性曲线"""
if not _HAS_MPL:
raise RuntimeError("缺少依赖: matplotlib无法绘图。请先安装 matplotlib 再运行绘图部分。")
2026-01-17 18:41:59 +08:00
fig, axes = plt.subplots(4, 2, figsize=(12, 13))
2026-01-17 15:53:51 +08:00
2026-01-17 18:41:59 +08:00
# 红线选点:基尼系数第一次 < 0.2 的 k_min若不存在则回退到均值有效性最优点
gini_candidates = results.loc[results["gini_eff"] < 0.2, "k_min"]
if len(gini_candidates) > 0:
selected_k = float(gini_candidates.iloc[0])
selected_label = f'First Gini<0.2: k_min={selected_k:.1f}'
else:
best_idx = results['effectiveness'].idxmax()
selected_k = float(results.loc[best_idx, 'k_min'])
selected_label = f'Best mean eff: k_min={selected_k:.1f}'
selected_idx = (results["k_min"] - selected_k).abs().idxmin()
selected_eff = float(results.loc[selected_idx, "effectiveness"])
2026-01-17 15:53:51 +08:00
# 1. 有效性均值
ax = axes[0, 0]
ax.plot(results['k_min'], results['effectiveness'], 'b-', lw=2)
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='r', ls='--', label=selected_label)
ax.scatter([selected_k], [selected_eff], c='r', s=100, zorder=5)
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Mean Effectiveness')
ax.set_title('Mean Effectiveness vs k_min')
ax.legend()
ax.grid(True, alpha=0.3)
# 2. 最低10%有效性均值
ax = axes[0, 1]
ax.plot(results['k_min'], results['bottom10_eff'], 'm-', lw=2)
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='r', ls='--')
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Bottom 10% Mean Effectiveness')
ax.set_title('Bottom 10% Mean Effectiveness vs k_min')
ax.grid(True, alpha=0.3)
# 3. 总服务客户数
ax = axes[1, 0]
ax.plot(results['k_min'], results['total_served'] / 1000, 'c-', lw=2)
ax.axhline(results['total_demand'].iloc[0] / 1000, color='gray', ls=':', label='Total Demand')
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='r', ls='--')
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Served Families (×1000)')
ax.set_title('Total Served vs k_min')
ax.legend()
ax.grid(True, alpha=0.3)
# 4. 最小有效性
ax = axes[1, 1]
ax.plot(results['k_min'], results['min_eff'], 'g-', lw=2)
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='r', ls='--')
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Min Effectiveness')
ax.set_title('Worst Site Effectiveness vs k_min')
ax.grid(True, alpha=0.3)
# 5. 未满足需求 vs 浪费
ax = axes[2, 0]
ax.plot(results['k_min'], results['unmet'] / 1000, 'r-', lw=2, label='Unmet')
ax.plot(results['k_min'], results['waste'] / 1000, 'b-', lw=2, label='Waste')
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='gray', ls='--')
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Families (×1000)')
ax.set_title('Unmet Demand vs Wasted Capacity')
ax.legend()
ax.grid(True, alpha=0.3)
# 6. 有效性标准差
ax = axes[2, 1]
ax.plot(results['k_min'], results['std_eff'], color='tab:orange', lw=2)
2026-01-17 18:41:59 +08:00
ax.axvline(selected_k, color='gray', ls='--')
2026-01-17 15:53:51 +08:00
ax.set_xlabel('k_min')
ax.set_ylabel('Std Effectiveness')
ax.set_title('Effectiveness Std vs k_min')
ax.grid(True, alpha=0.3)
2026-01-17 18:41:59 +08:00
# 7. 基尼系数
ax = axes[3, 0]
ax.plot(results['k_min'], results['gini_eff'], color='tab:purple', lw=2)
ax.axhline(0.2, color='gray', ls=':', lw=1)
ax.axvline(selected_k, color='r', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Gini Coefficient')
ax.set_title('Gini (Effectiveness) vs k_min')
ax.grid(True, alpha=0.3)
# 8. 空白占位(避免最后一格空图框太挤)
axes[3, 1].axis('off')
2026-01-17 15:53:51 +08:00
plt.tight_layout()
2026-01-17 18:46:16 +08:00
os.makedirs(OUTPUT_DIR, exist_ok=True)
plt.savefig(os.path.join(OUTPUT_DIR, 'kmin_effectiveness.png'), dpi=150)
2026-01-17 15:53:51 +08:00
plt.close(fig)
2026-01-17 18:41:59 +08:00
return selected_k, selected_eff
2026-01-17 15:53:51 +08:00
def main():
# 1. 加载数据
df = load_data()
print(f"站点数: {len(df)}, 总运力: {N_TOTAL}")
print(f"总需求: {df['TotalDemand'].sum():,.0f} 家庭次")
2026-01-17 18:41:59 +08:00
site_name_col = "Site Name" if "Site Name" in df.columns else None
sites_out = pd.DataFrame(
{
"site_idx": np.arange(1, len(df) + 1, dtype=int),
"site_name": df[site_name_col].astype(str).values if site_name_col else [f"Site_{i+1:02d}" for i in range(len(df))],
"total_demand": df["TotalDemand"].values,
}
)
2026-01-17 15:53:51 +08:00
# 2. 扫描k_min
print("\n扫描 k_min ∈ [1.0, 10.0]...")
2026-01-17 18:41:59 +08:00
print(f"使用 Monte Carlo 平均n_sims={N_SIMS}, seed={RANDOM_SEED}")
results = analyze_kmin_range(df, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED)
2026-01-17 15:53:51 +08:00
# 3. 绘图
best_idx = results['effectiveness'].idxmax()
best_k = results.loc[best_idx, 'k_min']
best_eff = results.loc[best_idx, 'effectiveness']
if _HAS_MPL:
plot_results(results)
else:
print("\n未检测到 matplotlib跳过绘图仍会保存CSV结果")
# 4. 输出最优结果
print(f"\n最优 k_min = {best_k:.1f}")
print(f"最优有效性 = {best_eff:.4f}")
# 5. 生成最优方案
df_opt = allocate_visits(df, best_k, N_TOTAL, C_OPT)
2026-01-17 18:41:59 +08:00
metrics = calc_effectiveness(df_opt, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED)
2026-01-17 15:53:51 +08:00
print(f"\n最优方案统计:")
print(f" 有效性: {metrics['mean']:.4f} (min={metrics['min']:.4f})")
print(f" 总服务: {metrics['total_served']:,.0f} / {metrics['total_demand']:,.0f} ({metrics['serve_ratio']:.1%})")
print(f" 未满足: {metrics['total_unmet']:,.0f} 家庭次")
print(f" 浪费: {metrics['total_waste']:,.0f} 家庭次")
print(f" 访问次数: [{df_opt['AllocatedVisits'].min()}, {df_opt['AllocatedVisits'].max()}]")
# 6. 保存
2026-01-17 18:46:16 +08:00
os.makedirs(OUTPUT_DIR, exist_ok=True)
results.to_csv(os.path.join(OUTPUT_DIR, 'kmin_effectiveness_data.csv'), index=False)
sites_out.to_csv(os.path.join(OUTPUT_DIR, "kmin_effectiveness_sites.csv"), index=False)
print("\n已保存到 data/: kmin_effectiveness.png, kmin_effectiveness_data.csv, kmin_effectiveness_sites.csv")
2026-01-17 15:53:51 +08:00
if __name__ == '__main__':
main()