352 lines
14 KiB
Python
352 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Data Window Sensitivity Analysis for Richards Model
|
||
|
||
Analyzes how different historical data windows affect the K (carrying capacity)
|
||
estimate in the Richards growth model.
|
||
|
||
Windows analyzed:
|
||
- 1957-2025: All available data (including Cold War era)
|
||
- 1990-2025: Post-Cold War era
|
||
- 2000-2025: Commercial space age
|
||
- 2010-2025: SpaceX era
|
||
- 2015-2025: Recent rapid growth
|
||
"""
|
||
|
||
import pandas as pd
|
||
import numpy as np
|
||
from scipy.optimize import curve_fit
|
||
import matplotlib
|
||
matplotlib.use('Agg')
|
||
import matplotlib.pyplot as plt
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
plt.rcParams['font.sans-serif'] = ['Arial Unicode MS', 'SimHei', 'DejaVu Sans']
|
||
plt.rcParams['axes.unicode_minus'] = False
|
||
|
||
|
||
# ============================================================
|
||
# Richards Growth Model
|
||
# ============================================================
|
||
|
||
def richards(t, K, r, t0, v):
|
||
"""Richards curve (generalized logistic model)"""
|
||
exp_term = np.exp(-r * (t - t0))
|
||
exp_term = np.clip(exp_term, 1e-10, 1e10)
|
||
return K / np.power(1 + exp_term, 1/v)
|
||
|
||
|
||
# ============================================================
|
||
# Data Loading
|
||
# ============================================================
|
||
|
||
def load_data(filepath="rocket_launch_counts.csv"):
|
||
"""Load and preprocess rocket launch data"""
|
||
df = pd.read_csv(filepath)
|
||
df = df.rename(columns={"YDate": "year", "Total": "launches"})
|
||
df["year"] = pd.to_numeric(df["year"], errors="coerce")
|
||
df["launches"] = pd.to_numeric(df["launches"], errors="coerce")
|
||
df = df.dropna(subset=["year", "launches"])
|
||
df = df[(df["year"] >= 1957) & (df["year"] <= 2025)]
|
||
df = df.astype({"year": int, "launches": int})
|
||
df = df.sort_values("year").reset_index(drop=True)
|
||
return df
|
||
|
||
|
||
# ============================================================
|
||
# Fit Richards Model (Unconstrained)
|
||
# ============================================================
|
||
|
||
def fit_richards_unconstrained(data, base_year=1957):
|
||
"""Fit Richards model without constraining K"""
|
||
years = data["year"].values
|
||
launches = data["launches"].values
|
||
t = (years - base_year).astype(float)
|
||
|
||
# Initial parameters
|
||
p0 = [5000.0, 0.08, 80.0, 2.0]
|
||
|
||
# Wide bounds - let data determine K
|
||
bounds = ([500, 0.005, 10, 0.2], [100000, 1.0, 200, 10.0])
|
||
|
||
try:
|
||
popt, pcov = curve_fit(richards, t, launches, p0=p0, bounds=bounds, maxfev=100000)
|
||
perr = np.sqrt(np.diag(pcov))
|
||
y_pred = richards(t, *popt)
|
||
|
||
ss_res = np.sum((launches - y_pred) ** 2)
|
||
ss_tot = np.sum((launches - np.mean(launches)) ** 2)
|
||
r_squared = 1 - (ss_res / ss_tot)
|
||
|
||
K, r, t0, v = popt
|
||
K_err = perr[0]
|
||
|
||
return {
|
||
"success": True,
|
||
"K": K,
|
||
"K_err": K_err,
|
||
"r": r,
|
||
"t0": t0,
|
||
"v": v,
|
||
"r_squared": r_squared,
|
||
"n_points": len(data),
|
||
"years": years,
|
||
"launches": launches,
|
||
"params": popt,
|
||
"y_pred": y_pred,
|
||
}
|
||
except Exception as e:
|
||
return {"success": False, "error": str(e)}
|
||
|
||
|
||
# ============================================================
|
||
# Main Analysis
|
||
# ============================================================
|
||
|
||
def analyze_data_windows(df):
|
||
"""Analyze K estimates for different data windows"""
|
||
|
||
# Define windows
|
||
windows = {
|
||
"1957-2025 (全部68年)": (1957, 2025),
|
||
"1990-2025 (35年)": (1990, 2025),
|
||
"2000-2025 (25年)": (2000, 2025),
|
||
"2010-2025 (15年)": (2010, 2025),
|
||
"2015-2025 (10年)": (2015, 2025),
|
||
}
|
||
|
||
results = {}
|
||
|
||
print("=" * 80)
|
||
print("分析不同历史数据窗口对 K (carrying capacity) 估计的影响")
|
||
print("=" * 80)
|
||
|
||
for name, (start, end) in windows.items():
|
||
data = df[(df["year"] >= start) & (df["year"] <= end)].copy()
|
||
result = fit_richards_unconstrained(data)
|
||
|
||
if result["success"]:
|
||
result["start"] = start
|
||
result["end"] = end
|
||
result["name"] = name
|
||
results[name] = result
|
||
|
||
print(f"\n--- {name} ---")
|
||
print(f" 数据点数: {result['n_points']}")
|
||
print(f" K = {result['K']:.0f} (carrying capacity)")
|
||
print(f" r = {result['r']:.4f} (growth rate)")
|
||
print(f" R² = {result['r_squared']:.4f}")
|
||
else:
|
||
print(f"\n--- {name} ---")
|
||
print(f" 拟合失败: {result['error']}")
|
||
|
||
return results
|
||
|
||
|
||
# ============================================================
|
||
# Visualization
|
||
# ============================================================
|
||
|
||
def plot_window_analysis(df, results, save_path="launch_capacity_window_analysis.png"):
|
||
"""Generate comprehensive window analysis plot"""
|
||
|
||
fig, axes = plt.subplots(2, 2, figsize=(14, 11))
|
||
|
||
names = list(results.keys())
|
||
physical_max = 3650
|
||
|
||
# ========== Plot 1: K Values Comparison ==========
|
||
ax1 = axes[0, 0]
|
||
|
||
K_vals = [results[n]["K"] for n in names]
|
||
colors = ["#E74C3C" if k < physical_max else "#27AE60" for k in K_vals]
|
||
|
||
y_pos = range(len(names))
|
||
bars = ax1.barh(y_pos, K_vals, color=colors, alpha=0.7, edgecolor="black")
|
||
ax1.axvline(physical_max, color="blue", ls="--", lw=2.5,
|
||
label=f"Physical Max ({physical_max})")
|
||
|
||
ax1.set_yticks(y_pos)
|
||
ax1.set_yticklabels(names, fontsize=10)
|
||
ax1.set_xlabel("K (Carrying Capacity, launches/year)", fontsize=11)
|
||
ax1.set_title("K Estimates by Data Window\n(Unconstrained Richards Model)", fontsize=12)
|
||
ax1.legend(loc="lower right", fontsize=10)
|
||
ax1.grid(True, alpha=0.3, axis="x")
|
||
|
||
for bar, k in zip(bars, K_vals):
|
||
ax1.text(k + 200, bar.get_y() + bar.get_height()/2,
|
||
f"{k:.0f}", va="center", fontsize=10, fontweight="bold")
|
||
|
||
# ========== Plot 2: R² Comparison ==========
|
||
ax2 = axes[0, 1]
|
||
|
||
r2_vals = [results[n]["r_squared"] for n in names]
|
||
colors2 = ["#27AE60" if r2 > 0.9 else "#F39C12" if r2 > 0.7 else "#E74C3C"
|
||
for r2 in r2_vals]
|
||
|
||
bars2 = ax2.barh(y_pos, r2_vals, color=colors2, alpha=0.7, edgecolor="black")
|
||
ax2.axvline(0.9, color="green", ls="--", lw=1.5, alpha=0.7, label="R²=0.9 (Good fit)")
|
||
ax2.axvline(0.7, color="orange", ls=":", lw=1.5, alpha=0.7, label="R²=0.7 (Acceptable)")
|
||
|
||
ax2.set_yticks(y_pos)
|
||
ax2.set_yticklabels(names, fontsize=10)
|
||
ax2.set_xlabel("R² (Goodness of Fit)", fontsize=11)
|
||
ax2.set_title("Model Fit Quality by Data Window", fontsize=12)
|
||
ax2.set_xlim(0, 1.1)
|
||
ax2.legend(loc="lower right", fontsize=9)
|
||
ax2.grid(True, alpha=0.3, axis="x")
|
||
|
||
for bar, r2 in zip(bars2, r2_vals):
|
||
ax2.text(r2 + 0.02, bar.get_y() + bar.get_height()/2,
|
||
f"{r2:.3f}", va="center", fontsize=10, fontweight="bold")
|
||
|
||
# ========== Plot 3: Historical Data with All Model Fits ==========
|
||
ax3 = axes[1, 0]
|
||
|
||
# Plot all historical data
|
||
ax3.scatter(df["year"], df["launches"], color="gray", s=30, alpha=0.5,
|
||
label="Historical Data", zorder=1)
|
||
|
||
# Plot each model prediction
|
||
colors_line = ["#E74C3C", "#F39C12", "#3498DB", "#27AE60", "#9B59B6"]
|
||
for i, (name, res) in enumerate(results.items()):
|
||
start = res["start"]
|
||
years_proj = np.arange(start, 2080)
|
||
t_proj = years_proj - 1957
|
||
pred = richards(t_proj, *res["params"])
|
||
|
||
label = f'{name.split(" ")[0]}: K={res["K"]:.0f}'
|
||
ax3.plot(years_proj, pred, color=colors_line[i], lw=2, alpha=0.8,
|
||
label=label, zorder=2)
|
||
|
||
ax3.axhline(physical_max, color="black", ls=":", lw=2,
|
||
label=f"Physical Max ({physical_max})")
|
||
|
||
ax3.set_xlabel("Year", fontsize=11)
|
||
ax3.set_ylabel("Annual Launches", fontsize=11)
|
||
ax3.set_title("Richards Model Predictions\n(Different Data Windows)", fontsize=12)
|
||
ax3.legend(loc="upper left", fontsize=8)
|
||
ax3.grid(True, alpha=0.3)
|
||
ax3.set_xlim(1955, 2080)
|
||
ax3.set_ylim(0, 15000)
|
||
|
||
# ========== Plot 4: Summary Table ==========
|
||
ax4 = axes[1, 1]
|
||
ax4.axis("off")
|
||
|
||
# Build summary text
|
||
summary = """
|
||
┌────────────────────────────────────────────────────────────────────────┐
|
||
│ 数据窗口对 K 估计的影响分析 │
|
||
├────────────────────────────────────────────────────────────────────────┤
|
||
│ │
|
||
│ 核心发现: │
|
||
│ ───────── │
|
||
│ 1. 数据窗口选择对 K 估计影响巨大 │
|
||
│ • 全部数据: K ≈ 500 │
|
||
│ • 近15年: K ≈ 12,000 │
|
||
│ │
|
||
│ 2. 1957-2025 全部数据: │
|
||
│ • R² = 0.08 (极差拟合) │
|
||
│ • 冷战时期数据干扰,模型误判为"已饱和" │
|
||
│ • 不适合用于预测未来增长 │
|
||
│ │
|
||
│ 3. 2010-2025 近15年数据: │
|
||
│ • R² = 0.90 (良好拟合) │
|
||
│ • 准确捕捉商业航天时代的增长趋势 │
|
||
│ • 预测 K ≈ 12,000 >> 物理上限 (3,650) │
|
||
│ │
|
||
│ 结论: │
|
||
│ ───── │
|
||
│ • 选择 2010-2025 数据窗口是合理的 (R² = 0.90, 反映当前趋势) │
|
||
│ • 数据驱动 K ≈ 12,000 反映增长潜力 (需要 ~33 个发射站) │
|
||
│ • 但物理约束 (10站 × 365天 = 3,650) 才是真正的上限 │
|
||
│ • 因此采用 1 次/站/天 作为保守估计是合理的 │
|
||
│ │
|
||
│ 物理上限: 3,650 次/年 (10个发射站 × 365天) │
|
||
│ │
|
||
└────────────────────────────────────────────────────────────────────────┘
|
||
"""
|
||
|
||
ax4.text(0.02, 0.98, summary, transform=ax4.transAxes, fontsize=10,
|
||
verticalalignment="top", family="monospace",
|
||
bbox=dict(boxstyle="round", facecolor="lightyellow", alpha=0.9))
|
||
|
||
plt.suptitle("Data Window Sensitivity Analysis for Richards Model",
|
||
fontsize=14, fontweight="bold", y=1.02)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches="tight")
|
||
plt.close()
|
||
print(f"\nPlot saved: {save_path}")
|
||
|
||
|
||
# ============================================================
|
||
# Print Detailed Table
|
||
# ============================================================
|
||
|
||
def print_comparison_table(results):
|
||
"""Print detailed comparison table"""
|
||
|
||
physical_max = 3650
|
||
|
||
print("\n" + "=" * 80)
|
||
print("详细对比表")
|
||
print("=" * 80)
|
||
|
||
header = f"{'数据窗口':<22} | {'K值':>8} | {'R²':>7} | {'数据点':>5} | {'K/物理上限':>10} | {'说明':<18}"
|
||
print(header)
|
||
print("-" * 22 + "-+-" + "-" * 8 + "-+-" + "-" * 7 + "-+-" + "-" * 5 + "-+-" + "-" * 10 + "-+-" + "-" * 18)
|
||
|
||
for name, res in results.items():
|
||
k = res["K"]
|
||
r2 = res["r_squared"]
|
||
n = res["n_points"]
|
||
ratio = k / physical_max
|
||
|
||
if r2 < 0.5:
|
||
note = "拟合差,不可用"
|
||
elif k < physical_max:
|
||
note = "低于物理上限"
|
||
elif ratio < 2:
|
||
note = "接近物理上限"
|
||
else:
|
||
note = "远超物理上限"
|
||
|
||
print(f"{name:<22} | {k:>8.0f} | {r2:>7.4f} | {n:>5} | {ratio:>10.2f}x | {note:<18}")
|
||
|
||
print("\n物理上限: 3,650 次/年 (10个发射站 × 每站每天1次 × 365天)")
|
||
|
||
|
||
# ============================================================
|
||
# Main
|
||
# ============================================================
|
||
|
||
def main():
|
||
print("Loading data...")
|
||
df = load_data()
|
||
print(f"Data loaded: {len(df)} records from {df['year'].min()} to {df['year'].max()}")
|
||
|
||
# Analyze different windows
|
||
results = analyze_data_windows(df)
|
||
|
||
# Print comparison table
|
||
print_comparison_table(results)
|
||
|
||
# Generate visualization
|
||
print("\n" + "=" * 80)
|
||
print("Generating visualization...")
|
||
print("=" * 80)
|
||
plot_window_analysis(df, results)
|
||
|
||
print("\n" + "=" * 80)
|
||
print("分析完成!")
|
||
print("=" * 80)
|
||
|
||
return results
|
||
|
||
|
||
if __name__ == "__main__":
|
||
results = main()
|