786 lines
29 KiB
Python
786 lines
29 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
火箭发射数上限预测分析
|
|||
|
|
对比 Logistic、Gompertz、Richards 三种增长模型
|
|||
|
|
分析不同历史数据窗口对预测的影响
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import pandas as pd
|
|||
|
|
import numpy as np
|
|||
|
|
from scipy.optimize import curve_fit
|
|||
|
|
from scipy.stats import pearsonr
|
|||
|
|
import matplotlib.pyplot as plt
|
|||
|
|
import warnings
|
|||
|
|
warnings.filterwarnings('ignore')
|
|||
|
|
|
|||
|
|
plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "SimHei", "DejaVu Sans"]
|
|||
|
|
plt.rcParams["axes.unicode_minus"] = False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 1. 增长模型定义
|
|||
|
|
# ============================================================
|
|||
|
|
|
|||
|
|
def logistic(t, K, r, t0):
|
|||
|
|
"""
|
|||
|
|
Logistic 增长模型(对称S曲线)
|
|||
|
|
N(t) = K / (1 + exp(-r*(t - t0)))
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
K : 环境容量/饱和上限
|
|||
|
|
r : 增长率
|
|||
|
|
t0 : 拐点时间(增长最快的时刻,此时 N = K/2)
|
|||
|
|
"""
|
|||
|
|
return K / (1 + np.exp(-r * (t - t0)))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def gompertz(t, K, b, c):
|
|||
|
|
"""
|
|||
|
|
Gompertz 增长模型(非对称S曲线,早期增长较快)
|
|||
|
|
N(t) = K * exp(-b * exp(-c*t))
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
K : 饱和上限
|
|||
|
|
b : 位移参数
|
|||
|
|
c : 增长率参数
|
|||
|
|
拐点在 N = K/e ≈ 0.368K 处
|
|||
|
|
"""
|
|||
|
|
return K * np.exp(-b * np.exp(-c * t))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def richards(t, K, r, t0, v):
|
|||
|
|
"""
|
|||
|
|
Richards 曲线(广义 Logistic,可调节对称性)
|
|||
|
|
N(t) = K / (1 + exp(-r*(t - t0)))^(1/v)
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
K : 饱和上限
|
|||
|
|
r : 增长率
|
|||
|
|
t0 : 拐点时间
|
|||
|
|
v : 形状参数(v=1 时退化为 Logistic)
|
|||
|
|
"""
|
|||
|
|
exp_term = np.exp(-r * (t - t0))
|
|||
|
|
# 避免数值溢出
|
|||
|
|
exp_term = np.clip(exp_term, 1e-10, 1e10)
|
|||
|
|
return K / np.power(1 + exp_term, 1/v)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 2. 数据加载与预处理
|
|||
|
|
# ============================================================
|
|||
|
|
|
|||
|
|
def load_data(filepath="rocket_launch_counts.csv"):
|
|||
|
|
"""加载并清洗火箭发射数据"""
|
|||
|
|
df = pd.read_csv(filepath)
|
|||
|
|
df = df.rename(columns={"YDate": "year", "Total": "launches"})
|
|||
|
|
df["year"] = pd.to_numeric(df["year"], errors="coerce")
|
|||
|
|
df["launches"] = pd.to_numeric(df["launches"], errors="coerce")
|
|||
|
|
df = df.dropna(subset=["year", "launches"])
|
|||
|
|
# 只保留完整年份数据(截至2025年,2026年数据不完整)
|
|||
|
|
df = df[(df["year"] >= 1957) & (df["year"] <= 2025)]
|
|||
|
|
df = df.astype({"year": int, "launches": int})
|
|||
|
|
df = df.sort_values("year").reset_index(drop=True)
|
|||
|
|
return df
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 3. 模型拟合
|
|||
|
|
# ============================================================
|
|||
|
|
|
|||
|
|
def fit_model(model_func, t, y, model_name, p0, bounds):
|
|||
|
|
"""通用模型拟合函数"""
|
|||
|
|
try:
|
|||
|
|
popt, pcov = curve_fit(model_func, t, y, p0=p0, bounds=bounds, maxfev=50000)
|
|||
|
|
perr = np.sqrt(np.diag(pcov))
|
|||
|
|
y_pred = model_func(t, *popt)
|
|||
|
|
|
|||
|
|
# 计算拟合优度
|
|||
|
|
ss_res = np.sum((y - y_pred) ** 2)
|
|||
|
|
ss_tot = np.sum((y - np.mean(y)) ** 2)
|
|||
|
|
r_squared = 1 - (ss_res / ss_tot)
|
|||
|
|
rmse = np.sqrt(np.mean((y - y_pred) ** 2))
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"success": True,
|
|||
|
|
"params": popt,
|
|||
|
|
"param_err": perr,
|
|||
|
|
"r_squared": r_squared,
|
|||
|
|
"rmse": rmse,
|
|||
|
|
"y_pred": y_pred
|
|||
|
|
}
|
|||
|
|
except Exception as e:
|
|||
|
|
return {
|
|||
|
|
"success": False,
|
|||
|
|
"error": str(e)
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def fit_all_models(data, base_year=1957):
|
|||
|
|
"""对数据拟合所有三种模型"""
|
|||
|
|
years = data["year"].values
|
|||
|
|
launches = data["launches"].values
|
|||
|
|
t = (years - base_year).astype(float)
|
|||
|
|
|
|||
|
|
results = {}
|
|||
|
|
|
|||
|
|
# Logistic 模型 - 提高上限边界以避免撞边界
|
|||
|
|
p0_log = [2000.0, 0.08, 70.0]
|
|||
|
|
bounds_log = ([300, 0.01, 0], [500000, 1.0, 300])
|
|||
|
|
res = fit_model(logistic, t, launches, "Logistic", p0_log, bounds_log)
|
|||
|
|
if res["success"]:
|
|||
|
|
K, r, t0 = res["params"]
|
|||
|
|
res["K"] = K
|
|||
|
|
res["inflection_year"] = base_year + t0
|
|||
|
|
res["inflection_value"] = K / 2
|
|||
|
|
results["Logistic"] = res
|
|||
|
|
|
|||
|
|
# Gompertz 模型
|
|||
|
|
p0_gom = [2000.0, 5.0, 0.03]
|
|||
|
|
bounds_gom = ([300, 0.5, 0.005], [500000, 50.0, 0.5])
|
|||
|
|
res = fit_model(gompertz, t, launches, "Gompertz", p0_gom, bounds_gom)
|
|||
|
|
if res["success"]:
|
|||
|
|
K, b, c = res["params"]
|
|||
|
|
res["K"] = K
|
|||
|
|
# Gompertz 拐点:t = ln(b)/c, 值为 K/e
|
|||
|
|
res["inflection_year"] = base_year + np.log(b) / c
|
|||
|
|
res["inflection_value"] = K / np.e
|
|||
|
|
results["Gompertz"] = res
|
|||
|
|
|
|||
|
|
# Richards 模型
|
|||
|
|
p0_ric = [2000.0, 0.08, 70.0, 2.0]
|
|||
|
|
bounds_ric = ([300, 0.01, 0, 0.1], [500000, 1.0, 300, 10.0])
|
|||
|
|
res = fit_model(richards, t, launches, "Richards", p0_ric, bounds_ric)
|
|||
|
|
if res["success"]:
|
|||
|
|
K, r, t0, v = res["params"]
|
|||
|
|
res["K"] = K
|
|||
|
|
res["inflection_year"] = base_year + t0
|
|||
|
|
res["inflection_value"] = K / (v + 1) ** (1/v)
|
|||
|
|
res["v"] = v
|
|||
|
|
results["Richards"] = res
|
|||
|
|
|
|||
|
|
return results, years, launches, t, base_year
|
|||
|
|
|
|||
|
|
|
|||
|
|
def predict_future(results, base_year, years_future):
|
|||
|
|
"""生成未来预测"""
|
|||
|
|
t_future = years_future - base_year
|
|||
|
|
predictions = {}
|
|||
|
|
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if not res["success"]:
|
|||
|
|
continue
|
|||
|
|
|
|||
|
|
params = res["params"]
|
|||
|
|
if model_name == "Logistic":
|
|||
|
|
pred = logistic(t_future, *params)
|
|||
|
|
elif model_name == "Gompertz":
|
|||
|
|
pred = gompertz(t_future, *params)
|
|||
|
|
elif model_name == "Richards":
|
|||
|
|
pred = richards(t_future, *params)
|
|||
|
|
|
|||
|
|
predictions[model_name] = np.maximum(pred, 0)
|
|||
|
|
|
|||
|
|
return predictions
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 4. 可视化
|
|||
|
|
# ============================================================
|
|||
|
|
|
|||
|
|
def plot_model_comparison(df, results_dict, save_path="launch_capacity_analysis.png"):
|
|||
|
|
"""绘制模型对比图"""
|
|||
|
|
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
|
|||
|
|
|
|||
|
|
# 颜色方案
|
|||
|
|
colors = {
|
|||
|
|
"Logistic": "#E74C3C",
|
|||
|
|
"Gompertz": "#3498DB",
|
|||
|
|
"Richards": "#27AE60"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 子图1: 历史数据 + 三种模型拟合(全部数据)
|
|||
|
|
ax1 = axes[0, 0]
|
|||
|
|
results, years, launches, t, base_year = results_dict["全部数据"]
|
|||
|
|
ax1.scatter(years, launches, color="gray", s=30, alpha=0.7, label="历史数据", zorder=3)
|
|||
|
|
|
|||
|
|
years_curve = np.arange(years.min(), 2101)
|
|||
|
|
t_curve = years_curve - base_year
|
|||
|
|
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if not res["success"]:
|
|||
|
|
continue
|
|||
|
|
params = res["params"]
|
|||
|
|
if model_name == "Logistic":
|
|||
|
|
pred = logistic(t_curve, *params)
|
|||
|
|
elif model_name == "Gompertz":
|
|||
|
|
pred = gompertz(t_curve, *params)
|
|||
|
|
elif model_name == "Richards":
|
|||
|
|
pred = richards(t_curve, *params)
|
|||
|
|
|
|||
|
|
ax1.plot(years_curve, pred, color=colors[model_name], lw=2,
|
|||
|
|
label=f"{model_name} (K={res['K']:.0f}, R²={res['r_squared']:.3f})")
|
|||
|
|
|
|||
|
|
ax1.axvline(2025, color="gray", ls="--", lw=1, alpha=0.5)
|
|||
|
|
ax1.set_xlabel("年份", fontsize=11)
|
|||
|
|
ax1.set_ylabel("年发射次数", fontsize=11)
|
|||
|
|
ax1.set_title("全部历史数据 (1957-2025) 模型拟合", fontsize=12)
|
|||
|
|
ax1.legend(loc="upper left", fontsize=9)
|
|||
|
|
ax1.grid(True, alpha=0.3)
|
|||
|
|
ax1.set_xlim(1955, 2105)
|
|||
|
|
ax1.set_ylim(0, None)
|
|||
|
|
|
|||
|
|
# 子图2: 近15年数据拟合
|
|||
|
|
ax2 = axes[0, 1]
|
|||
|
|
results, years, launches, t, base_year = results_dict["近15年"]
|
|||
|
|
ax2.scatter(years, launches, color="gray", s=40, alpha=0.8, label="历史数据", zorder=3)
|
|||
|
|
|
|||
|
|
years_curve = np.arange(2010, 2101)
|
|||
|
|
t_curve = years_curve - base_year
|
|||
|
|
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if not res["success"]:
|
|||
|
|
continue
|
|||
|
|
params = res["params"]
|
|||
|
|
if model_name == "Logistic":
|
|||
|
|
pred = logistic(t_curve, *params)
|
|||
|
|
elif model_name == "Gompertz":
|
|||
|
|
pred = gompertz(t_curve, *params)
|
|||
|
|
elif model_name == "Richards":
|
|||
|
|
pred = richards(t_curve, *params)
|
|||
|
|
|
|||
|
|
ax2.plot(years_curve, pred, color=colors[model_name], lw=2,
|
|||
|
|
label=f"{model_name} (K={res['K']:.0f})")
|
|||
|
|
|
|||
|
|
ax2.axvline(2025, color="gray", ls="--", lw=1, alpha=0.5)
|
|||
|
|
ax2.set_xlabel("年份", fontsize=11)
|
|||
|
|
ax2.set_ylabel("年发射次数", fontsize=11)
|
|||
|
|
ax2.set_title("近15年数据 (2010-2025) 模型拟合", fontsize=12)
|
|||
|
|
ax2.legend(loc="upper left", fontsize=9)
|
|||
|
|
ax2.grid(True, alpha=0.3)
|
|||
|
|
ax2.set_xlim(2008, 2105)
|
|||
|
|
ax2.set_ylim(0, None)
|
|||
|
|
|
|||
|
|
# 子图3: 只展示 Richards 模型的K值(因为其他模型撞边界)
|
|||
|
|
ax3 = axes[1, 0]
|
|||
|
|
windows = ["全部数据", "近25年", "近15年", "近10年"]
|
|||
|
|
x = np.arange(len(windows))
|
|||
|
|
|
|||
|
|
k_values = []
|
|||
|
|
for window in windows:
|
|||
|
|
if window in results_dict:
|
|||
|
|
res = results_dict[window][0].get("Richards", {})
|
|||
|
|
k_values.append(res.get("K", 0) if res.get("success", False) else 0)
|
|||
|
|
else:
|
|||
|
|
k_values.append(0)
|
|||
|
|
|
|||
|
|
bars = ax3.bar(x, k_values, 0.6, color=colors["Richards"], alpha=0.8, edgecolor='black')
|
|||
|
|
|
|||
|
|
# 添加数值标签
|
|||
|
|
for bar, k in zip(bars, k_values):
|
|||
|
|
if k > 0:
|
|||
|
|
ax3.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 500,
|
|||
|
|
f'{k:.0f}', ha='center', va='bottom', fontsize=10, fontweight='bold')
|
|||
|
|
|
|||
|
|
ax3.set_xlabel("数据窗口", fontsize=11)
|
|||
|
|
ax3.set_ylabel("预测饱和上限 K (次/年)", fontsize=11)
|
|||
|
|
ax3.set_title("Richards 模型: 不同数据窗口的饱和上限预测\n(Logistic/Gompertz 撞边界,不适用)", fontsize=11)
|
|||
|
|
ax3.set_xticks(x)
|
|||
|
|
ax3.set_xticklabels(windows)
|
|||
|
|
ax3.grid(True, alpha=0.3, axis='y')
|
|||
|
|
ax3.set_ylim(0, max(k_values) * 1.15 if k_values else 10000)
|
|||
|
|
|
|||
|
|
# 子图4: Richards 模型拐点年份
|
|||
|
|
ax4 = axes[1, 1]
|
|||
|
|
inflection_years = []
|
|||
|
|
inflection_values = []
|
|||
|
|
for window in windows:
|
|||
|
|
if window in results_dict:
|
|||
|
|
res = results_dict[window][0].get("Richards", {})
|
|||
|
|
if res.get("success", False):
|
|||
|
|
inflection_years.append(res.get("inflection_year", 0))
|
|||
|
|
inflection_values.append(res.get("inflection_value", 0))
|
|||
|
|
else:
|
|||
|
|
inflection_years.append(0)
|
|||
|
|
inflection_values.append(0)
|
|||
|
|
else:
|
|||
|
|
inflection_years.append(0)
|
|||
|
|
inflection_values.append(0)
|
|||
|
|
|
|||
|
|
bars = ax4.bar(x, inflection_years, 0.6, color=colors["Richards"], alpha=0.8, edgecolor='black')
|
|||
|
|
|
|||
|
|
# 添加标签(拐点年份和对应发射数)
|
|||
|
|
for bar, year, val in zip(bars, inflection_years, inflection_values):
|
|||
|
|
if year > 0:
|
|||
|
|
ax4.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5,
|
|||
|
|
f'{year:.0f}\n({val:.0f}次/年)', ha='center', va='bottom', fontsize=9)
|
|||
|
|
|
|||
|
|
ax4.axhline(2025, color="red", ls="--", lw=2, alpha=0.8, label="当前年份 (2025)")
|
|||
|
|
ax4.set_xlabel("数据窗口", fontsize=11)
|
|||
|
|
ax4.set_ylabel("拐点年份", fontsize=11)
|
|||
|
|
ax4.set_title("Richards 模型: 预测拐点年份\n(增长最快的时刻)", fontsize=11)
|
|||
|
|
ax4.set_xticks(x)
|
|||
|
|
ax4.set_xticklabels(windows)
|
|||
|
|
ax4.legend(loc="lower right", fontsize=9)
|
|||
|
|
ax4.grid(True, alpha=0.3, axis='y')
|
|||
|
|
valid_years = [y for y in inflection_years if y > 0]
|
|||
|
|
if valid_years:
|
|||
|
|
ax4.set_ylim(2020, max(valid_years) + 15)
|
|||
|
|
|
|||
|
|
plt.suptitle("火箭年发射数上限预测 - 多模型对比分析", fontsize=14, fontweight='bold', y=1.02)
|
|||
|
|
plt.tight_layout()
|
|||
|
|
plt.savefig(save_path, dpi=150, bbox_inches="tight")
|
|||
|
|
plt.close()
|
|||
|
|
print(f"图表已保存: {save_path}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def plot_physical_vs_model(results_dict, physical_limits, save_path="launch_capacity_physical.png"):
|
|||
|
|
"""绘制物理约束与模型预测对比图"""
|
|||
|
|
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
|
|||
|
|
|
|||
|
|
# 左图:物理约束与模型预测K值对比
|
|||
|
|
ax1 = axes[0]
|
|||
|
|
|
|||
|
|
# 物理约束
|
|||
|
|
phys_scenarios = list(physical_limits.keys())
|
|||
|
|
phys_values = list(physical_limits.values())
|
|||
|
|
x_phys = np.arange(len(phys_scenarios))
|
|||
|
|
bars1 = ax1.barh(x_phys, phys_values, 0.6, color='#3498DB', alpha=0.7, label='物理约束估计')
|
|||
|
|
|
|||
|
|
# 添加数值标签
|
|||
|
|
for bar, val in zip(bars1, phys_values):
|
|||
|
|
ax1.text(val + 50, bar.get_y() + bar.get_height()/2,
|
|||
|
|
f'{val:,}', va='center', fontsize=10)
|
|||
|
|
|
|||
|
|
# 添加模型预测(Richards)
|
|||
|
|
richards_k = []
|
|||
|
|
for window in ["近25年", "近15年", "近10年"]:
|
|||
|
|
if window in results_dict:
|
|||
|
|
res = results_dict[window][0].get("Richards", {})
|
|||
|
|
if res.get("success", False):
|
|||
|
|
richards_k.append(res["K"])
|
|||
|
|
|
|||
|
|
if richards_k:
|
|||
|
|
avg_richards = np.mean(richards_k)
|
|||
|
|
ax1.axvline(avg_richards, color='#E74C3C', ls='--', lw=2,
|
|||
|
|
label=f'Richards模型平均 K={avg_richards:,.0f}')
|
|||
|
|
|
|||
|
|
ax1.set_yticks(x_phys)
|
|||
|
|
ax1.set_yticklabels(phys_scenarios)
|
|||
|
|
ax1.set_xlabel("年发射上限 (次/年)", fontsize=11)
|
|||
|
|
ax1.set_title("物理约束 vs 模型预测 (10个发射场)", fontsize=12)
|
|||
|
|
ax1.legend(loc="lower right", fontsize=9)
|
|||
|
|
ax1.grid(True, alpha=0.3, axis='x')
|
|||
|
|
ax1.set_xlim(0, max(phys_values) * 1.3)
|
|||
|
|
|
|||
|
|
# 右图:带物理约束的时间预测
|
|||
|
|
ax2 = axes[1]
|
|||
|
|
|
|||
|
|
# 获取近15年Richards预测
|
|||
|
|
if "近15年" in results_dict:
|
|||
|
|
results, years, launches, t, base_year = results_dict["近15年"]
|
|||
|
|
res = results.get("Richards", {})
|
|||
|
|
|
|||
|
|
if res["success"]:
|
|||
|
|
years_future = np.arange(2010, 2101)
|
|||
|
|
t_future = years_future - base_year
|
|||
|
|
pred = richards(t_future, *res["params"])
|
|||
|
|
|
|||
|
|
ax2.scatter(years, launches, color="gray", s=40, alpha=0.7, label="历史数据", zorder=3)
|
|||
|
|
ax2.plot(years_future, pred, color='#27AE60', lw=2, label=f"Richards (K={res['K']:,.0f})")
|
|||
|
|
|
|||
|
|
# 添加物理约束线
|
|||
|
|
phys_colors = ['#E74C3C', '#F39C12', '#3498DB', '#9B59B6', '#1ABC9C']
|
|||
|
|
for i, (scenario, limit) in enumerate(physical_limits.items()):
|
|||
|
|
ax2.axhline(limit, color=phys_colors[i % len(phys_colors)],
|
|||
|
|
ls='--', lw=1.5, alpha=0.7, label=f'{scenario}: {limit:,}')
|
|||
|
|
|
|||
|
|
ax2.axvline(2025, color="gray", ls=":", lw=1, alpha=0.5)
|
|||
|
|
ax2.set_xlabel("年份", fontsize=11)
|
|||
|
|
ax2.set_ylabel("年发射次数", fontsize=11)
|
|||
|
|
ax2.set_title("Richards预测 + 物理约束线", fontsize=12)
|
|||
|
|
ax2.legend(loc="upper left", fontsize=8)
|
|||
|
|
ax2.grid(True, alpha=0.3)
|
|||
|
|
ax2.set_xlim(2008, 2105)
|
|||
|
|
ax2.set_ylim(0, max(physical_limits.values()) * 1.2)
|
|||
|
|
|
|||
|
|
plt.suptitle("物理约束分析 (10个发射场)", fontsize=14, fontweight='bold', y=1.02)
|
|||
|
|
plt.tight_layout()
|
|||
|
|
plt.savefig(save_path, dpi=150, bbox_inches="tight")
|
|||
|
|
plt.close()
|
|||
|
|
print(f"图表已保存: {save_path}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def plot_future_predictions(results_dict, save_path="launch_capacity_predictions.png"):
|
|||
|
|
"""绘制未来预测图"""
|
|||
|
|
fig, axes = plt.subplots(1, 2, figsize=(16, 6))
|
|||
|
|
|
|||
|
|
colors = {
|
|||
|
|
"Logistic": "#E74C3C",
|
|||
|
|
"Gompertz": "#3498DB",
|
|||
|
|
"Richards": "#27AE60"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 左图:基于全部数据的预测
|
|||
|
|
ax1 = axes[0]
|
|||
|
|
results, years, launches, t, base_year = results_dict["全部数据"]
|
|||
|
|
years_future = np.arange(1957, 2151)
|
|||
|
|
predictions = predict_future(results, base_year, years_future)
|
|||
|
|
|
|||
|
|
ax1.scatter(years, launches, color="gray", s=20, alpha=0.6, label="历史数据", zorder=3)
|
|||
|
|
for model_name, pred in predictions.items():
|
|||
|
|
ax1.plot(years_future, pred, color=colors[model_name], lw=2, label=model_name)
|
|||
|
|
|
|||
|
|
ax1.axvline(2025, color="gray", ls="--", lw=1, alpha=0.5)
|
|||
|
|
ax1.axvspan(2025, 2150, alpha=0.05, color="blue")
|
|||
|
|
ax1.set_xlabel("年份", fontsize=11)
|
|||
|
|
ax1.set_ylabel("年发射次数", fontsize=11)
|
|||
|
|
ax1.set_title("基于全部历史数据的预测 (至2150年)", fontsize=12)
|
|||
|
|
ax1.legend(loc="upper left", fontsize=10)
|
|||
|
|
ax1.grid(True, alpha=0.3)
|
|||
|
|
ax1.set_xlim(1955, 2155)
|
|||
|
|
ax1.set_ylim(0, None)
|
|||
|
|
|
|||
|
|
# 右图:基于近15年数据的预测
|
|||
|
|
ax2 = axes[1]
|
|||
|
|
results, years, launches, t, base_year = results_dict["近15年"]
|
|||
|
|
years_future = np.arange(2010, 2151)
|
|||
|
|
predictions = predict_future(results, base_year, years_future)
|
|||
|
|
|
|||
|
|
ax2.scatter(years, launches, color="gray", s=40, alpha=0.7, label="历史数据", zorder=3)
|
|||
|
|
for model_name, pred in predictions.items():
|
|||
|
|
ax2.plot(years_future, pred, color=colors[model_name], lw=2, label=model_name)
|
|||
|
|
|
|||
|
|
ax2.axvline(2025, color="gray", ls="--", lw=1, alpha=0.5)
|
|||
|
|
ax2.axvspan(2025, 2150, alpha=0.05, color="blue")
|
|||
|
|
ax2.set_xlabel("年份", fontsize=11)
|
|||
|
|
ax2.set_ylabel("年发射次数", fontsize=11)
|
|||
|
|
ax2.set_title("基于近15年数据的预测 (至2150年)", fontsize=12)
|
|||
|
|
ax2.legend(loc="upper left", fontsize=10)
|
|||
|
|
ax2.grid(True, alpha=0.3)
|
|||
|
|
ax2.set_xlim(2008, 2155)
|
|||
|
|
ax2.set_ylim(0, None)
|
|||
|
|
|
|||
|
|
plt.suptitle("火箭年发射数长期预测", fontsize=14, fontweight='bold', y=1.02)
|
|||
|
|
plt.tight_layout()
|
|||
|
|
plt.savefig(save_path, dpi=150, bbox_inches="tight")
|
|||
|
|
plt.close()
|
|||
|
|
print(f"图表已保存: {save_path}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ============================================================
|
|||
|
|
# 5. 主程序
|
|||
|
|
# ============================================================
|
|||
|
|
|
|||
|
|
def bootstrap_K_estimate(data, model_func, p0, bounds, n_bootstrap=500, base_year=1957):
|
|||
|
|
"""Bootstrap 估计 K 值的不确定性"""
|
|||
|
|
years = data["year"].values
|
|||
|
|
launches = data["launches"].values
|
|||
|
|
t = (years - base_year).astype(float)
|
|||
|
|
n = len(t)
|
|||
|
|
|
|||
|
|
K_samples = []
|
|||
|
|
for _ in range(n_bootstrap):
|
|||
|
|
# 重采样
|
|||
|
|
idx = np.random.choice(n, n, replace=True)
|
|||
|
|
t_boot = t[idx]
|
|||
|
|
y_boot = launches[idx]
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
popt, _ = curve_fit(model_func, t_boot, y_boot, p0=p0, bounds=bounds, maxfev=10000)
|
|||
|
|
K_samples.append(popt[0]) # K 是第一个参数
|
|||
|
|
except:
|
|||
|
|
pass
|
|||
|
|
|
|||
|
|
if len(K_samples) > 10:
|
|||
|
|
return {
|
|||
|
|
"mean": np.mean(K_samples),
|
|||
|
|
"median": np.median(K_samples),
|
|||
|
|
"std": np.std(K_samples),
|
|||
|
|
"ci_5": np.percentile(K_samples, 5),
|
|||
|
|
"ci_95": np.percentile(K_samples, 95),
|
|||
|
|
"samples": K_samples
|
|||
|
|
}
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def calculate_aic_bic(y_true, y_pred, n_params):
|
|||
|
|
"""计算 AIC 和 BIC"""
|
|||
|
|
n = len(y_true)
|
|||
|
|
ss_res = np.sum((y_true - y_pred) ** 2)
|
|||
|
|
mse = ss_res / n
|
|||
|
|
|
|||
|
|
# Log-likelihood (假设高斯误差)
|
|||
|
|
log_likelihood = -n/2 * (np.log(2 * np.pi) + np.log(mse) + 1)
|
|||
|
|
|
|||
|
|
aic = 2 * n_params - 2 * log_likelihood
|
|||
|
|
bic = n_params * np.log(n) - 2 * log_likelihood
|
|||
|
|
|
|||
|
|
return aic, bic
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
print("=" * 70)
|
|||
|
|
print("火箭年发射数上限预测分析")
|
|||
|
|
print("模型: Logistic, Gompertz, Richards")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
# 加载数据
|
|||
|
|
df = load_data()
|
|||
|
|
print(f"\n数据范围: {df['year'].min()} - {df['year'].max()}")
|
|||
|
|
print(f"数据点数: {len(df)}")
|
|||
|
|
print(f"最近5年发射数:")
|
|||
|
|
for _, row in df.tail(5).iterrows():
|
|||
|
|
print(f" {int(row['year'])}: {int(row['launches'])} 次")
|
|||
|
|
|
|||
|
|
# 计算增长率
|
|||
|
|
recent = df[df["year"] >= 2020].copy()
|
|||
|
|
if len(recent) > 1:
|
|||
|
|
growth_rates = recent["launches"].pct_change().dropna() * 100
|
|||
|
|
print(f"\n2020年以来年均增长率: {growth_rates.mean():.1f}%")
|
|||
|
|
|
|||
|
|
# 定义不同的数据窗口
|
|||
|
|
end_year = 2025
|
|||
|
|
windows = {
|
|||
|
|
"全部数据": df[df["year"] <= end_year].copy(),
|
|||
|
|
"近25年": df[(df["year"] >= 2000) & (df["year"] <= end_year)].copy(),
|
|||
|
|
"近15年": df[(df["year"] >= 2010) & (df["year"] <= end_year)].copy(),
|
|||
|
|
"近10年": df[(df["year"] >= 2015) & (df["year"] <= end_year)].copy(),
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
# 存储所有结果
|
|||
|
|
all_results = {}
|
|||
|
|
|
|||
|
|
# 对每个窗口进行拟合
|
|||
|
|
for window_name, data in windows.items():
|
|||
|
|
print(f"\n{'='*50}")
|
|||
|
|
print(f"数据窗口: {window_name} ({data['year'].min()}-{data['year'].max()}, n={len(data)})")
|
|||
|
|
print("=" * 50)
|
|||
|
|
|
|||
|
|
results, years, launches, t, base_year = fit_all_models(data)
|
|||
|
|
all_results[window_name] = (results, years, launches, t, base_year)
|
|||
|
|
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if res["success"]:
|
|||
|
|
print(f"\n{model_name} 模型:")
|
|||
|
|
print(f" 饱和上限 K = {res['K']:.0f} 次/年")
|
|||
|
|
print(f" 拐点年份 = {res['inflection_year']:.1f}")
|
|||
|
|
print(f" 拐点处发射数 = {res['inflection_value']:.0f} 次/年")
|
|||
|
|
print(f" R² (拟合优度) = {res['r_squared']:.4f}")
|
|||
|
|
print(f" RMSE = {res['rmse']:.2f}")
|
|||
|
|
if model_name == "Richards":
|
|||
|
|
print(f" 形状参数 v = {res.get('v', 'N/A'):.3f}")
|
|||
|
|
else:
|
|||
|
|
print(f"\n{model_name} 模型: 拟合失败 - {res.get('error', 'Unknown')}")
|
|||
|
|
|
|||
|
|
# 生成未来预测表
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("未来预测 (基于近15年数据)")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
results, years, launches, t, base_year = all_results["近15年"]
|
|||
|
|
future_years = np.array([2030, 2040, 2050, 2075, 2100])
|
|||
|
|
predictions = predict_future(results, base_year, future_years)
|
|||
|
|
|
|||
|
|
print(f"\n{'年份':<10}", end="")
|
|||
|
|
for model in ["Logistic", "Gompertz", "Richards"]:
|
|||
|
|
print(f"{model:<15}", end="")
|
|||
|
|
print()
|
|||
|
|
print("-" * 55)
|
|||
|
|
|
|||
|
|
for i, year in enumerate(future_years):
|
|||
|
|
print(f"{year:<10}", end="")
|
|||
|
|
for model in ["Logistic", "Gompertz", "Richards"]:
|
|||
|
|
if model in predictions:
|
|||
|
|
print(f"{predictions[model][i]:<15.0f}", end="")
|
|||
|
|
else:
|
|||
|
|
print(f"{'N/A':<15}", end="")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# 饱和上限汇总
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("饱和上限 K 汇总 (次/年)")
|
|||
|
|
print("=" * 70)
|
|||
|
|
print(f"\n{'数据窗口':<20}", end="")
|
|||
|
|
for model in ["Logistic", "Gompertz", "Richards"]:
|
|||
|
|
print(f"{model:<15}", end="")
|
|||
|
|
print()
|
|||
|
|
print("-" * 65)
|
|||
|
|
|
|||
|
|
for window_name in ["全部数据", "近25年", "近15年", "近10年"]:
|
|||
|
|
results = all_results[window_name][0]
|
|||
|
|
print(f"{window_name:<20}", end="")
|
|||
|
|
for model in ["Logistic", "Gompertz", "Richards"]:
|
|||
|
|
if model in results and results[model]["success"]:
|
|||
|
|
print(f"{results[model]['K']:<15.0f}", end="")
|
|||
|
|
else:
|
|||
|
|
print(f"{'N/A':<15}", end="")
|
|||
|
|
print()
|
|||
|
|
|
|||
|
|
# 物理约束分析
|
|||
|
|
physical_limits = analyze_physical_constraints(n_sites=10)
|
|||
|
|
|
|||
|
|
# 绘制图表
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("生成图表...")
|
|||
|
|
plot_model_comparison(df, all_results)
|
|||
|
|
plot_future_predictions(all_results)
|
|||
|
|
plot_physical_vs_model(all_results, physical_limits)
|
|||
|
|
|
|||
|
|
# 保存详细结果到CSV
|
|||
|
|
save_results_csv(all_results)
|
|||
|
|
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("分析完成!")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
# Bootstrap 不确定性分析
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("Bootstrap 不确定性分析 (基于近15年数据, 500次重采样)")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
data_15 = windows["近15年"]
|
|||
|
|
base_year = 1957
|
|||
|
|
|
|||
|
|
# Richards 模型 Bootstrap
|
|||
|
|
p0_ric = [2000.0, 0.08, 70.0, 2.0]
|
|||
|
|
bounds_ric = ([300, 0.01, 0, 0.1], [500000, 1.0, 300, 10.0])
|
|||
|
|
|
|||
|
|
boot_result = bootstrap_K_estimate(data_15, richards, p0_ric, bounds_ric, n_bootstrap=500, base_year=base_year)
|
|||
|
|
if boot_result:
|
|||
|
|
print(f"\nRichards 模型 K 值 Bootstrap 估计:")
|
|||
|
|
print(f" 均值: {boot_result['mean']:.0f} 次/年")
|
|||
|
|
print(f" 中位数: {boot_result['median']:.0f} 次/年")
|
|||
|
|
print(f" 标准差: {boot_result['std']:.0f}")
|
|||
|
|
print(f" 90% 置信区间: [{boot_result['ci_5']:.0f}, {boot_result['ci_95']:.0f}] 次/年")
|
|||
|
|
|
|||
|
|
# AIC/BIC 模型选择
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("模型选择 (AIC/BIC, 基于近15年数据)")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
results_15, years_15, launches_15, t_15, _ = all_results["近15年"]
|
|||
|
|
print(f"\n{'模型':<12} {'AIC':<12} {'BIC':<12} {'参数数':<8}")
|
|||
|
|
print("-" * 44)
|
|||
|
|
|
|||
|
|
for model_name, res in results_15.items():
|
|||
|
|
if res["success"]:
|
|||
|
|
n_params = len(res["params"])
|
|||
|
|
aic, bic = calculate_aic_bic(launches_15, res["y_pred"], n_params)
|
|||
|
|
print(f"{model_name:<12} {aic:<12.1f} {bic:<12.1f} {n_params:<8}")
|
|||
|
|
|
|||
|
|
# 结论
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print("📊 分析结论与建议")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
# 收集有效的K值(排除撞边界的)
|
|||
|
|
valid_k_values = []
|
|||
|
|
for window_name, (results, _, _, _, _) in all_results.items():
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if res["success"] and res["K"] < 100000: # 排除撞边界的
|
|||
|
|
valid_k_values.append((window_name, model_name, res["K"]))
|
|||
|
|
|
|||
|
|
print("\n1. 模型适用性分析:")
|
|||
|
|
print("-" * 50)
|
|||
|
|
print(" • Logistic/Gompertz: 数据处于快速增长期,无法确定上限")
|
|||
|
|
print(" (模型撞到边界,说明增长远未饱和)")
|
|||
|
|
print(" • Richards 曲线: 额外的形状参数使其能给出更合理估计")
|
|||
|
|
print(" (推荐用于当前阶段的上限预测)")
|
|||
|
|
|
|||
|
|
print("\n2. 合理的上限估计 (Richards 模型):")
|
|||
|
|
print("-" * 50)
|
|||
|
|
for window_name, model_name, k in valid_k_values:
|
|||
|
|
if model_name == "Richards" and window_name != "全部数据":
|
|||
|
|
print(f" • {window_name}: K ≈ {k:.0f} 次/年")
|
|||
|
|
|
|||
|
|
# Richards 结果的平均值
|
|||
|
|
richards_k = [k for w, m, k in valid_k_values if m == "Richards" and w != "全部数据"]
|
|||
|
|
if richards_k:
|
|||
|
|
print(f"\n 📌 综合估计: {int(np.mean(richards_k)):,} - {int(np.max(richards_k)):,} 次/年")
|
|||
|
|
|
|||
|
|
print("\n3. 物理约束参考 (基于10个主要发射场):")
|
|||
|
|
print("-" * 50)
|
|||
|
|
print(" • 主要发射场数量: 10个")
|
|||
|
|
print(" • 假设每发射场年最大容量:")
|
|||
|
|
print(" - 保守估计: 50次/年 → 总上限 500 次/年")
|
|||
|
|
print(" - 中等估计: 100次/年 → 总上限 1,000 次/年")
|
|||
|
|
print(" - 乐观估计: 200次/年 → 总上限 2,000 次/年")
|
|||
|
|
print(" - 极限估计 (SpaceX级别): 400次/年 → 总上限 4,000 次/年")
|
|||
|
|
print(" • 2025年实际数据: 329次 (10发射场平均 33次/场)")
|
|||
|
|
print(" • SpaceX Starship 单型号目标: 1,000+ 次/年")
|
|||
|
|
|
|||
|
|
print("\n4. 预测不确定性:")
|
|||
|
|
print("-" * 50)
|
|||
|
|
print(" • 当前处于指数增长早期,上限估计高度不确定")
|
|||
|
|
print(" • 建议每年用新数据更新预测")
|
|||
|
|
print(" • 需关注技术突破(可重复使用、星舰等)对上限的影响")
|
|||
|
|
|
|||
|
|
if boot_result:
|
|||
|
|
print(f"\n Bootstrap 90%置信区间: {boot_result['ci_5']:.0f} - {boot_result['ci_95']:.0f} 次/年")
|
|||
|
|
|
|||
|
|
|
|||
|
|
def analyze_physical_constraints(n_sites=10):
|
|||
|
|
"""
|
|||
|
|
基于发射场物理约束的上限分析
|
|||
|
|
|
|||
|
|
参数:
|
|||
|
|
n_sites: 发射场数量
|
|||
|
|
"""
|
|||
|
|
print("\n" + "=" * 70)
|
|||
|
|
print(f"物理约束分析 (基于 {n_sites} 个发射场)")
|
|||
|
|
print("=" * 70)
|
|||
|
|
|
|||
|
|
# 不同情景下的单发射场年容量
|
|||
|
|
scenarios = {
|
|||
|
|
"保守 (当前技术)": 50,
|
|||
|
|
"中等 (技术进步)": 100,
|
|||
|
|
"乐观 (快速周转)": 200,
|
|||
|
|
"极限 (SpaceX级)": 400,
|
|||
|
|
"理论极限 (每天1发)": 365,
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
print(f"\n单发射场年发射容量假设:")
|
|||
|
|
print("-" * 50)
|
|||
|
|
|
|||
|
|
results = {}
|
|||
|
|
for scenario, capacity_per_site in scenarios.items():
|
|||
|
|
total_capacity = n_sites * capacity_per_site
|
|||
|
|
results[scenario] = total_capacity
|
|||
|
|
print(f" {scenario:<20}: {capacity_per_site:>3} 次/场/年 → 总上限 {total_capacity:>5,} 次/年")
|
|||
|
|
|
|||
|
|
# 带置信度的综合估计
|
|||
|
|
print(f"\n综合估计 ({n_sites} 发射场):")
|
|||
|
|
print("-" * 50)
|
|||
|
|
print(f" 50% 置信区间: {n_sites * 50:,} - {n_sites * 100:,} 次/年")
|
|||
|
|
print(f" 90% 置信区间: {n_sites * 30:,} - {n_sites * 200:,} 次/年")
|
|||
|
|
print(f" 理论最大值: {n_sites * 365:,} 次/年 (每天每场1发)")
|
|||
|
|
|
|||
|
|
return results
|
|||
|
|
|
|||
|
|
|
|||
|
|
def save_results_csv(all_results, filepath="launch_capacity_results.csv"):
|
|||
|
|
"""保存详细结果到CSV"""
|
|||
|
|
rows = []
|
|||
|
|
for window_name, (results, _, _, _, _) in all_results.items():
|
|||
|
|
for model_name, res in results.items():
|
|||
|
|
if res["success"]:
|
|||
|
|
row = {
|
|||
|
|
"数据窗口": window_name,
|
|||
|
|
"模型": model_name,
|
|||
|
|
"饱和上限K": res["K"],
|
|||
|
|
"拐点年份": res["inflection_year"],
|
|||
|
|
"拐点发射数": res["inflection_value"],
|
|||
|
|
"R²": res["r_squared"],
|
|||
|
|
"RMSE": res["rmse"]
|
|||
|
|
}
|
|||
|
|
rows.append(row)
|
|||
|
|
|
|||
|
|
df_results = pd.DataFrame(rows)
|
|||
|
|
df_results.to_csv(filepath, index=False, encoding="utf-8-sig")
|
|||
|
|
print(f"详细结果已保存: {filepath}")
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|