Files
mcm-mfp/kmin_effectiveness.py
2026-01-17 19:25:35 +08:00

457 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
K_min-有效性分析
k_min为实数如2.7表示70%站点最低2次30%站点最低3次
有效性计算支持引入需求标准差(正态分布),并通过 Monte Carlo 多次模拟求均值。
"""
import numpy as np
import pandas as pd
import math
import os
try:
import matplotlib
# 允许在无GUI环境生成图片
matplotlib.use("Agg")
import matplotlib.pyplot as plt
_HAS_MPL = True
except ModuleNotFoundError:
plt = None
_HAS_MPL = False
# 参数
C_OPT = 250
N_TOTAL = 730
ALPHA = 0.6
BETA = 0.2
N_SIMS = 2000
RANDOM_SEED = 606
OUTPUT_DIR = "data"
def gini_coefficient(values):
"""
计算基尼系数0=完全均等1=完全不均等)
约定values 非负若总和为0则返回0。
"""
x = np.asarray(values, dtype=float)
x = x[np.isfinite(x)]
if x.size == 0:
return 0.0
x = np.clip(x, 0, None)
total = x.sum()
if total <= 0:
return 0.0
x_sorted = np.sort(x)
n = x_sorted.size
idx = np.arange(1, n + 1, dtype=float)
return float((2.0 * (idx * x_sorted).sum()) / (n * total) - (n + 1.0) / n)
def _norm_pdf(z):
return np.exp(-0.5 * z * z) / np.sqrt(2.0 * np.pi)
def _norm_cdf(z):
z = np.asarray(z, dtype=float)
erf_vec = np.vectorize(math.erf, otypes=[float])
return 0.5 * (1.0 + erf_vec(z / np.sqrt(2.0)))
def expected_clipped_normal(mu, sigma, lower=0.0, upper=1.0):
"""
X ~ Normal(mu, sigma^2). 返回 E[clip(X, lower, upper)].
- 支持 lower=0 用于避免负需求
- sigma<=0 时退化为 clip(mu, lower, upper)
"""
mu = np.asarray(mu, dtype=float)
sigma = np.asarray(sigma, dtype=float)
lower = float(lower)
upper = float(upper)
if lower > upper:
raise ValueError("lower must be <= upper")
out = np.empty_like(mu, dtype=float)
mask = sigma > 0
out[~mask] = np.clip(mu[~mask], lower, upper)
if np.any(mask):
m = mu[mask]
s = sigma[mask]
z_u = (upper - m) / s
z_l = (lower - m) / s
Phi_u = _norm_cdf(z_u)
Phi_l = _norm_cdf(z_l)
phi_u = _norm_pdf(z_u)
phi_l = _norm_pdf(z_l)
# E[X 1_{X<=a}] = mu*Phi(z) - sigma*phi(z), z=(a-mu)/sigma
ex_le_u = m * Phi_u - s * phi_u
ex_le_l = m * Phi_l - s * phi_l
p_le_l = Phi_l
p_gt_u = 1.0 - Phi_u
out[mask] = lower * p_le_l + (ex_le_u - ex_le_l) + upper * p_gt_u
return out
def load_data():
"""加载数据"""
df = pd.read_excel('prob/MFP Regular Sites 2019.xlsx')
df = df.drop(columns=['Unnamed: 10', 'Demand per Visit == the number of clients serviced on that visit'])
df['TotalDemand'] = df['Average Demand per Visit'] * df['Number of Visits in 2019']
return df.sort_values('TotalDemand').reset_index(drop=True)
def allocate_visits(df, k_min_real, n_total, c_opt):
"""
根据实数k_min分配访问次数
k_min=2.7: floor=2, ceil=3, 70%站点得2次, 30%站点得3次
低需求站点优先获得较低的k_min已按TotalDemand排序
"""
df = df.copy()
n = len(df)
k_floor = int(np.floor(k_min_real))
k_ceil = int(np.ceil(k_min_real))
frac = k_min_real - k_floor # 获得ceil的比例
# 分配最低次数低需求站点得floor高需求站点得ceil
n_ceil = int(round(n * frac))
n_floor = n - n_ceil
k_base = np.array([k_floor] * n_floor + [k_ceil] * n_ceil)
df['K_base'] = k_base
# 计算剩余运力
n_reserved = k_base.sum()
n_free = n_total - n_reserved
if n_free < 0:
return None
# 按需求权重分配剩余运力
df['Weight'] = df['TotalDemand'] / df['TotalDemand'].sum()
df['AllocatedVisits'] = (k_base + n_free * df['Weight'].values).round().astype(int)
df['AllocatedVisits'] = np.maximum(df['AllocatedVisits'], k_base)
# 调整总数
diff = n_total - df['AllocatedVisits'].sum()
if diff != 0:
sorted_idx = df['Weight'].sort_values(ascending=(diff < 0)).index.tolist()
for idx in sorted_idx[:abs(diff)]:
df.loc[idx, 'AllocatedVisits'] += int(np.sign(diff))
return df
def calc_effectiveness(
df,
c_opt=C_OPT,
alpha=ALPHA,
beta=BETA,
method="mc",
n_sims=N_SIMS,
seed=RANDOM_SEED,
):
"""计算有效性指标method='mc' 多次模拟取均值method='analytic' 用闭式期望)"""
d = df['Average Demand per Visit'].values
d_std = df.get('StDev(Demand per Visit)', pd.Series(0.0, index=df.index)).values
d_std = np.clip(d_std, 0, None)
k = df['AllocatedVisits'].values
D = df['TotalDemand'].values
method = (method or "mc").lower()
if method not in {"mc", "analytic"}:
raise ValueError("method must be 'mc' or 'analytic'")
if method == "analytic":
# 正态分布需求:单次期望有效服务 = E[min(max(N(d, std),0), 容量)]
eff_per_visit = expected_clipped_normal(d, d_std, lower=0.0, upper=float(c_opt))
annual_eff = k * eff_per_visit
unmet = np.maximum(0, D - annual_eff) / np.maximum(D, 1)
waste = np.maximum(0, k * c_opt - annual_eff) / np.maximum(k * c_opt, 1)
base = annual_eff / np.maximum(D, 1)
score = np.clip(base - alpha * unmet - beta * waste, 0, 1)
n = score.size
bottom_n = max(1, int(np.ceil(n * 0.10)))
total_served = np.minimum(annual_eff, D).sum()
total_demand = D.sum()
return {
'mean': float(score.mean()),
'min': float(score.min()),
'bottom10_mean': float(np.sort(score)[:bottom_n].mean()),
'gini_eff': float(gini_coefficient(score)),
'std': float(score.std()),
'total_unmet': float((D - annual_eff).clip(min=0).sum()),
'total_waste': float((k * c_opt - annual_eff).clip(min=0).sum()),
'total_served': float(total_served),
'total_demand': float(total_demand),
'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0
}
# Monte Carlo每次访问的需求 ~ Normal(mu, sigma),重复 n_sims 次取均值
n_sims = int(n_sims)
if n_sims <= 0:
raise ValueError("n_sims must be > 0")
rng = np.random.default_rng(seed)
n_sites = len(df)
annual_eff_sims = np.zeros((n_sites, n_sims), dtype=float)
for i in range(n_sites):
k_i = int(k[i])
if k_i <= 0:
continue
mu_i = float(d[i])
sigma_i = float(d_std[i])
if sigma_i <= 0:
demand = np.full((n_sims, k_i), mu_i, dtype=float)
else:
demand = rng.normal(mu_i, sigma_i, size=(n_sims, k_i))
demand = np.clip(demand, 0, None)
annual_eff_sims[i, :] = np.minimum(demand, float(c_opt)).sum(axis=1)
D2 = D.reshape(-1, 1)
cap2 = (k * c_opt).reshape(-1, 1)
unmet = np.maximum(0, D2 - annual_eff_sims) / np.maximum(D2, 1)
waste = np.maximum(0, cap2 - annual_eff_sims) / np.maximum(cap2, 1)
base = annual_eff_sims / np.maximum(D2, 1)
score_sims = np.clip(base - alpha * unmet - beta * waste, 0, 1)
# 每个站点的期望得分(跨模拟平均)
avg_score = score_sims.mean(axis=1)
bottom_n = max(1, int(np.ceil(n_sites * 0.10)))
total_served_sims = np.minimum(annual_eff_sims, D2).sum(axis=0)
total_unmet_sims = np.maximum(0, D2 - annual_eff_sims).sum(axis=0)
total_waste_sims = np.maximum(0, cap2 - annual_eff_sims).sum(axis=0)
total_demand = float(D.sum())
total_served = float(total_served_sims.mean())
return {
'mean': float(avg_score.mean()),
'min': float(avg_score.min()),
'bottom10_mean': float(np.sort(avg_score)[:bottom_n].mean()),
'gini_eff': float(gini_coefficient(avg_score)),
'std': float(avg_score.std()),
'total_unmet': float(total_unmet_sims.mean()),
'total_waste': float(total_waste_sims.mean()),
'total_served': total_served,
'total_demand': total_demand,
'serve_ratio': float(total_served / total_demand) if total_demand > 0 else 0.0
}
def analyze_kmin_range(
df,
k_range=np.arange(1.0, 10.1, 0.1),
method="mc",
n_sims=N_SIMS,
seed=RANDOM_SEED,
):
"""扫描k_min范围计算有效性"""
results = []
n_sites = len(df)
site_cols = [f"visits_{i+1:02d}" for i in range(n_sites)]
for k_min in k_range:
df_alloc = allocate_visits(df, k_min, N_TOTAL, C_OPT)
if df_alloc is None:
continue
metrics = calc_effectiveness(df_alloc, method=method, n_sims=n_sims, seed=seed)
row = {
'k_min': k_min,
'effectiveness': metrics['mean'],
'min_eff': metrics['min'],
'bottom10_eff': metrics['bottom10_mean'],
'gini_eff': metrics['gini_eff'],
'std_eff': metrics['std'],
'unmet': metrics['total_unmet'],
'waste': metrics['total_waste'],
'total_served': metrics['total_served'],
'total_demand': metrics['total_demand'],
'serve_ratio': metrics['serve_ratio']
}
# 追加每个站点在该 k_min 下的访问次数(按 df 当前排序的站点顺序)
alloc = df_alloc["AllocatedVisits"].astype(int).tolist()
row.update({col: v for col, v in zip(site_cols, alloc)})
results.append(row)
return pd.DataFrame(results)
def plot_results(results):
"""绘制k_min-有效性曲线"""
if not _HAS_MPL:
raise RuntimeError("缺少依赖: matplotlib无法绘图。请先安装 matplotlib 再运行绘图部分。")
fig, axes = plt.subplots(4, 2, figsize=(12, 13))
# 红线选点:基尼系数第一次 < 0.2 的 k_min若不存在则回退到均值有效性最优点
gini_candidates = results.loc[results["gini_eff"] < 0.2, "k_min"]
if len(gini_candidates) > 0:
selected_k = float(gini_candidates.iloc[0])
selected_label = f'First Gini<0.2: k_min={selected_k:.1f}'
else:
best_idx = results['effectiveness'].idxmax()
selected_k = float(results.loc[best_idx, 'k_min'])
selected_label = f'Best mean eff: k_min={selected_k:.1f}'
selected_idx = (results["k_min"] - selected_k).abs().idxmin()
selected_eff = float(results.loc[selected_idx, "effectiveness"])
# 1. 有效性均值
ax = axes[0, 0]
ax.plot(results['k_min'], results['effectiveness'], 'b-', lw=2)
ax.axvline(selected_k, color='r', ls='--', label=selected_label)
ax.scatter([selected_k], [selected_eff], c='r', s=100, zorder=5)
ax.set_xlabel('k_min')
ax.set_ylabel('Mean Effectiveness')
ax.set_title('Mean Effectiveness vs k_min')
ax.legend()
ax.grid(True, alpha=0.3)
# 2. 最低10%有效性均值
ax = axes[0, 1]
ax.plot(results['k_min'], results['bottom10_eff'], 'm-', lw=2)
ax.axvline(selected_k, color='r', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Bottom 10% Mean Effectiveness')
ax.set_title('Bottom 10% Mean Effectiveness vs k_min')
ax.grid(True, alpha=0.3)
# 3. 总服务客户数
ax = axes[1, 0]
ax.plot(results['k_min'], results['total_served'] / 1000, 'c-', lw=2)
ax.axhline(results['total_demand'].iloc[0] / 1000, color='gray', ls=':', label='Total Demand')
ax.axvline(selected_k, color='r', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Served Families (×1000)')
ax.set_title('Total Served vs k_min')
ax.legend()
ax.grid(True, alpha=0.3)
# 4. 最小有效性
ax = axes[1, 1]
ax.plot(results['k_min'], results['min_eff'], 'g-', lw=2)
ax.axvline(selected_k, color='r', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Min Effectiveness')
ax.set_title('Worst Site Effectiveness vs k_min')
ax.grid(True, alpha=0.3)
# 5. 未满足需求 vs 浪费
ax = axes[2, 0]
ax.plot(results['k_min'], results['unmet'] / 1000, 'r-', lw=2, label='Unmet')
ax.plot(results['k_min'], results['waste'] / 1000, 'b-', lw=2, label='Waste')
ax.axvline(selected_k, color='gray', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Families (×1000)')
ax.set_title('Unmet Demand vs Wasted Capacity')
ax.legend()
ax.grid(True, alpha=0.3)
# 6. 有效性标准差
ax = axes[2, 1]
ax.plot(results['k_min'], results['std_eff'], color='tab:orange', lw=2)
ax.axvline(selected_k, color='gray', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Std Effectiveness')
ax.set_title('Effectiveness Std vs k_min')
ax.grid(True, alpha=0.3)
# 7. 基尼系数
ax = axes[3, 0]
ax.plot(results['k_min'], results['gini_eff'], color='tab:purple', lw=2)
ax.axhline(0.2, color='gray', ls=':', lw=1)
ax.axvline(selected_k, color='r', ls='--')
ax.set_xlabel('k_min')
ax.set_ylabel('Gini Coefficient')
ax.set_title('Gini (Effectiveness) vs k_min')
ax.grid(True, alpha=0.3)
# 8. 空白占位(避免最后一格空图框太挤)
axes[3, 1].axis('off')
plt.tight_layout()
os.makedirs(OUTPUT_DIR, exist_ok=True)
plt.savefig(os.path.join(OUTPUT_DIR, 'kmin_effectiveness.png'), dpi=150)
plt.close(fig)
return selected_k, selected_eff
def main():
# 1. 加载数据
df = load_data()
print(f"站点数: {len(df)}, 总运力: {N_TOTAL}")
print(f"总需求: {df['TotalDemand'].sum():,.0f} 家庭次")
site_name_col = "Site Name" if "Site Name" in df.columns else None
sites_out = pd.DataFrame(
{
"site_idx": np.arange(1, len(df) + 1, dtype=int),
"site_name": df[site_name_col].astype(str).values if site_name_col else [f"Site_{i+1:02d}" for i in range(len(df))],
"total_demand": df["TotalDemand"].values,
}
)
# 2. 扫描k_min
print("\n扫描 k_min ∈ [1.0, 10.0]...")
print(f"使用 Monte Carlo 平均n_sims={N_SIMS}, seed={RANDOM_SEED}")
results = analyze_kmin_range(df, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED)
# 3. 绘图
best_idx = results['effectiveness'].idxmax()
best_k = results.loc[best_idx, 'k_min']
best_eff = results.loc[best_idx, 'effectiveness']
if _HAS_MPL:
plot_results(results)
else:
print("\n未检测到 matplotlib跳过绘图仍会保存CSV结果")
# 4. 输出最优结果
print(f"\n最优 k_min = {best_k:.1f}")
print(f"最优有效性 = {best_eff:.4f}")
# 5. 生成最优方案
df_opt = allocate_visits(df, best_k, N_TOTAL, C_OPT)
metrics = calc_effectiveness(df_opt, method="mc", n_sims=N_SIMS, seed=RANDOM_SEED)
print(f"\n最优方案统计:")
print(f" 有效性: {metrics['mean']:.4f} (min={metrics['min']:.4f})")
print(f" 总服务: {metrics['total_served']:,.0f} / {metrics['total_demand']:,.0f} ({metrics['serve_ratio']:.1%})")
print(f" 未满足: {metrics['total_unmet']:,.0f} 家庭次")
print(f" 浪费: {metrics['total_waste']:,.0f} 家庭次")
print(f" 访问次数: [{df_opt['AllocatedVisits'].min()}, {df_opt['AllocatedVisits'].max()}]")
# 6. 保存
os.makedirs(OUTPUT_DIR, exist_ok=True)
results.to_csv(os.path.join(OUTPUT_DIR, 'kmin_effectiveness_data.csv'), index=False)
sites_out.to_csv(os.path.join(OUTPUT_DIR, "kmin_effectiveness_sites.csv"), index=False)
print("\n已保存到 data/: kmin_effectiveness.png, kmin_effectiveness_data.csv, kmin_effectiveness_sites.csv")
if __name__ == '__main__':
main()