Files
2026_mcm_b/p1/pareto_optimization.py
2026-01-31 18:00:43 +08:00

1028 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Moon Colony Logistics - Pareto Front & Knee Point Analysis
Multi-objective optimization for Space Elevator + Rocket combination:
- Objective 1: Minimize completion time T
- Objective 2: Minimize total energy E
Finds the Pareto front and identifies the "knee point" as the optimal trade-off.
"""
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import rcParams
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
import pandas as pd
# 设置字体
rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans', 'SimHei']
rcParams['axes.unicode_minus'] = False
# ============== 物理常数与参数 ==============
G0 = 9.81 # m/s²
OMEGA_EARTH = 7.27e-5 # rad/s
R_EARTH = 6.371e6 # m
# 任务参数
TOTAL_PAYLOAD = 100e6 # 100 million metric tons
# 太空电梯参数
NUM_ELEVATORS = 3
ELEVATOR_CAPACITY_PER_YEAR = 179000 # metric tons per elevator per year
TOTAL_ELEVATOR_CAPACITY = NUM_ELEVATORS * ELEVATOR_CAPACITY_PER_YEAR # 537,000 tons/year
ELEVATOR_SPECIFIC_ENERGY = 157.2e9 # J per metric ton (157.2 MJ/kg * 1000)
# 火箭参数
PAYLOAD_PER_LAUNCH = 125 # metric tons per launch
ISP = 450
SPECIFIC_FUEL_ENERGY = 15.5e6 # J/kg
ALPHA = 0.10
NUM_STAGES = 3
DELTA_V_BASE = 13300 # m/s
# ============== 发射场定义 ==============
@dataclass
class LaunchSite:
name: str
short_name: str
latitude: float
max_launches_per_day: int = 1
@property
def abs_latitude(self) -> float:
return abs(self.latitude)
@property
def delta_v_loss(self) -> float:
v_equator = OMEGA_EARTH * R_EARTH
v_site = OMEGA_EARTH * R_EARTH * np.cos(np.radians(self.abs_latitude))
return v_equator - v_site
@property
def total_delta_v(self) -> float:
return DELTA_V_BASE + self.delta_v_loss
LAUNCH_SITES = sorted([
LaunchSite("Kourou (French Guiana)", "Kourou", 5.2),
LaunchSite("Satish Dhawan (India)", "SDSC", 13.7),
LaunchSite("Boca Chica (Texas)", "Texas", 26.0),
LaunchSite("Cape Canaveral (Florida)", "Florida", 28.5),
LaunchSite("Vandenberg (California)", "California", 34.7),
LaunchSite("Wallops (Virginia)", "Virginia", 37.8),
LaunchSite("Taiyuan (China)", "Taiyuan", 38.8),
LaunchSite("Mahia (New Zealand)", "Mahia", 39.3),
LaunchSite("Baikonur (Kazakhstan)", "Baikonur", 45.6),
LaunchSite("Kodiak (Alaska)", "Alaska", 57.4),
], key=lambda x: x.abs_latitude)
# ============== 核心计算函数 ==============
def fuel_ratio_multistage(delta_v: float) -> float:
"""多级火箭燃料/载荷比"""
ve = ISP * G0
delta_v_per_stage = delta_v / NUM_STAGES
R_stage = np.exp(delta_v_per_stage / ve)
denominator = 1 - ALPHA * (R_stage - 1)
if denominator <= 0:
return np.inf
k_stage = (R_stage - 1) / denominator
total_fuel_ratio = 0
remaining_ratio = 1.0
for _ in range(NUM_STAGES):
fuel_this_stage = remaining_ratio * k_stage
total_fuel_ratio += fuel_this_stage
remaining_ratio *= (1 + k_stage * (1 + ALPHA))
return total_fuel_ratio
def rocket_energy_per_ton(site: LaunchSite) -> float:
"""火箭发射每吨载荷的能量消耗 (J/ton)"""
k = fuel_ratio_multistage(site.total_delta_v)
fuel_per_ton = k * 1000 # kg fuel per metric ton payload
return fuel_per_ton * SPECIFIC_FUEL_ENERGY
def calculate_scenario(completion_years: float) -> Optional[Dict]:
"""
计算给定完成年限下的最优方案(电梯优先+低纬火箭)
返回: 方案详情字典如果无法完成则返回None
"""
# 太空电梯运输量
elevator_payload = min(TOTAL_ELEVATOR_CAPACITY * completion_years, TOTAL_PAYLOAD)
elevator_energy = elevator_payload * ELEVATOR_SPECIFIC_ENERGY
# 剩余需要火箭运输的载荷
remaining_payload = TOTAL_PAYLOAD - elevator_payload
if remaining_payload <= 0:
# 完全由电梯完成
return {
'years': completion_years,
'elevator_payload': elevator_payload,
'rocket_payload': 0,
'elevator_energy_PJ': elevator_energy / 1e15,
'rocket_energy_PJ': 0,
'total_energy_PJ': elevator_energy / 1e15,
'rocket_launches': 0,
'sites_used': 0,
'elevator_fraction': 1.0,
}
# 需要火箭发射
rocket_launches_needed = int(np.ceil(remaining_payload / PAYLOAD_PER_LAUNCH))
# 按纬度优先分配火箭发射
days_available = completion_years * 365
max_launches_per_site = int(days_available)
# 检查是否能在规定时间内完成
total_rocket_capacity = len(LAUNCH_SITES) * max_launches_per_site * PAYLOAD_PER_LAUNCH
if remaining_payload > total_rocket_capacity:
return None # 无法完成
rocket_energy = 0
sites_used = 0
remaining_launches = rocket_launches_needed
for site in LAUNCH_SITES:
if remaining_launches <= 0:
break
allocated = min(remaining_launches, max_launches_per_site)
rocket_energy += rocket_energy_per_ton(site) * PAYLOAD_PER_LAUNCH * allocated
remaining_launches -= allocated
if allocated > 0:
sites_used += 1
rocket_payload = rocket_launches_needed * PAYLOAD_PER_LAUNCH
total_energy = elevator_energy + rocket_energy
return {
'years': completion_years,
'elevator_payload': elevator_payload,
'rocket_payload': rocket_payload,
'elevator_energy_PJ': elevator_energy / 1e15,
'rocket_energy_PJ': rocket_energy / 1e15,
'total_energy_PJ': total_energy / 1e15,
'rocket_launches': rocket_launches_needed,
'sites_used': sites_used,
'elevator_fraction': elevator_payload / TOTAL_PAYLOAD,
}
# ============== Pareto 前沿计算 ==============
def generate_pareto_front(
year_min: float = 100,
year_max: float = 300,
num_points: int = 500
) -> pd.DataFrame:
"""
生成 Pareto 前沿数据
对于此问题,由于"时间越长→能量越低"是单调关系,
所有可行解都在Pareto前沿上这是一个双目标单调权衡问题
"""
years_range = np.linspace(year_min, year_max, num_points)
results = []
for years in years_range:
scenario = calculate_scenario(years)
if scenario is not None:
results.append({
'years': years,
'energy_PJ': scenario['total_energy_PJ'],
'elevator_fraction': scenario['elevator_fraction'],
'sites_used': scenario['sites_used'],
'rocket_launches': scenario['rocket_launches'],
})
return pd.DataFrame(results)
# ============== 膝点检测算法 ==============
def find_knee_point_max_curvature(df: pd.DataFrame) -> int:
"""
方法1: 最大曲率法
在归一化后的 (T, E) 曲线上找曲率最大的点。
曲率 κ = |y''| / (1 + y'^2)^(3/2)
"""
# 归一化
T = df['years'].values
E = df['energy_PJ'].values
T_norm = (T - T.min()) / (T.max() - T.min())
E_norm = (E - E.min()) / (E.max() - E.min())
# 计算一阶和二阶导数(使用有限差分)
dE = np.gradient(E_norm, T_norm)
d2E = np.gradient(dE, T_norm)
# 计算曲率
curvature = np.abs(d2E) / np.power(1 + dE**2, 1.5)
# 排除边界点
curvature[:5] = 0
curvature[-5:] = 0
knee_idx = np.argmax(curvature)
return knee_idx
def find_knee_point_max_distance(df: pd.DataFrame) -> int:
"""
方法2: 最大距离法L-method / Kneedle思想
找到距离"起点-终点连线"最远的点。
这通常对应于"边际收益递减"最明显的拐点。
"""
T = df['years'].values
E = df['energy_PJ'].values
# 归一化到 [0, 1]
T_norm = (T - T.min()) / (T.max() - T.min())
E_norm = (E - E.min()) / (E.max() - E.min())
# 起点和终点
p1 = np.array([T_norm[0], E_norm[0]])
p2 = np.array([T_norm[-1], E_norm[-1]])
# 计算每个点到直线 p1-p2 的距离
line_vec = p2 - p1
line_len = np.linalg.norm(line_vec)
line_unit = line_vec / line_len
distances = []
for i in range(len(T)):
point = np.array([T_norm[i], E_norm[i]])
point_vec = point - p1
# 投影长度
proj_len = np.dot(point_vec, line_unit)
# 垂直距离
proj_point = p1 + proj_len * line_unit
dist = np.linalg.norm(point - proj_point)
distances.append(dist)
distances = np.array(distances)
knee_idx = np.argmax(distances)
return knee_idx
def find_knee_point_marginal(df: pd.DataFrame, threshold: float = 0.1) -> int:
"""
方法3: 边际收益法
找到"多投入1年时间能量下降比例"开始低于阈值的点。
即:|dE/dT| / |E| < threshold 的第一个点
"""
T = df['years'].values
E = df['energy_PJ'].values
# 计算边际能量变化率
dE_dT = np.gradient(E, T)
# 相对变化率
marginal_ratio = np.abs(dE_dT) / E
# 找到第一个低于阈值的点(跳过前面几个点)
for i in range(10, len(marginal_ratio)):
if marginal_ratio[i] < threshold:
return i
return len(df) // 2 # 默认返回中点
def find_knee_point_elbow(df: pd.DataFrame) -> int:
"""
方法4: 肘部法则Elbow Method
类似K-means的肘部法则找到曲线"弯曲"最明显的点。
使用二阶导数的符号变化或绝对值。
"""
T = df['years'].values
E = df['energy_PJ'].values
# 归一化
T_norm = (T - T.min()) / (T.max() - T.min() + 1e-10)
E_norm = (E - E.min()) / (E.max() - E.min() + 1e-10)
# 计算二阶差分
d2E = np.diff(np.diff(E_norm))
# 找到绝对值最大的点
knee_idx = np.argmax(np.abs(d2E)) + 1
return min(knee_idx, len(df) - 1)
# ============== 综合膝点分析 ==============
def analyze_knee_points(df: pd.DataFrame) -> Dict:
"""
使用多种方法找膝点,并综合给出推荐
"""
methods = {
'max_curvature': find_knee_point_max_curvature(df),
'max_distance': find_knee_point_max_distance(df),
'elbow': find_knee_point_elbow(df),
}
# 每种方法对应的年份和能量
results = {}
for method, idx in methods.items():
results[method] = {
'index': idx,
'years': df.iloc[idx]['years'],
'energy_PJ': df.iloc[idx]['energy_PJ'],
'elevator_fraction': df.iloc[idx]['elevator_fraction'],
'sites_used': df.iloc[idx]['sites_used'],
}
# 推荐:使用最大距离法(最稳健)
recommended = results['max_distance']
return {
'methods': results,
'recommended': recommended,
}
# ============== 可视化 ==============
def plot_pareto_analysis(
df: pd.DataFrame,
knee_analysis: Dict,
save_path: str = '/Volumes/Files/code/mm/20260130_b/p1/pareto_knee_analysis.png'
):
"""
绘制 Pareto 前沿与膝点分析图
"""
fig, axes = plt.subplots(2, 2, figsize=(16, 14))
# ========== 图1: Pareto 前沿(时间 vs 能量) ==========
ax1 = axes[0, 0]
ax1.plot(df['years'], df['energy_PJ'], 'b-', linewidth=2, label='Pareto Front')
ax1.fill_between(df['years'], df['energy_PJ'], alpha=0.2)
# 标记各方法找到的膝点
colors = {'max_curvature': 'red', 'max_distance': 'green', 'elbow': 'orange'}
markers = {'max_curvature': 'o', 'max_distance': 's', 'elbow': '^'}
for method, data in knee_analysis['methods'].items():
ax1.scatter(data['years'], data['energy_PJ'],
c=colors[method], marker=markers[method], s=150, zorder=5,
label=f'{method}: {data["years"]:.0f}y, {data["energy_PJ"]:.0f}PJ')
# 标记推荐点
rec = knee_analysis['recommended']
ax1.axvline(x=rec['years'], color='green', linestyle='--', alpha=0.7)
ax1.axhline(y=rec['energy_PJ'], color='green', linestyle='--', alpha=0.7)
ax1.set_xlabel('Completion Time (years)', fontsize=12)
ax1.set_ylabel('Total Energy (PJ)', fontsize=12)
ax1.set_title('Pareto Front: Time vs Energy Trade-off\n(Knee Points by Different Methods)', fontsize=13)
ax1.legend(loc='upper right', fontsize=9)
ax1.grid(True, alpha=0.3)
# ========== 图2: 归一化 Pareto 前沿 + 距离可视化 ==========
ax2 = axes[0, 1]
T = df['years'].values
E = df['energy_PJ'].values
T_norm = (T - T.min()) / (T.max() - T.min())
E_norm = (E - E.min()) / (E.max() - E.min())
ax2.plot(T_norm, E_norm, 'b-', linewidth=2, label='Normalized Pareto Front')
# 画起点-终点连线
ax2.plot([T_norm[0], T_norm[-1]], [E_norm[0], E_norm[-1]],
'r--', linewidth=1.5, label='Baseline (Start-End)')
# 标记最大距离点
knee_idx = knee_analysis['methods']['max_distance']['index']
ax2.scatter(T_norm[knee_idx], E_norm[knee_idx],
c='green', marker='s', s=200, zorder=5, label='Knee Point (Max Distance)')
# 画垂直距离线
p1 = np.array([T_norm[0], E_norm[0]])
p2 = np.array([T_norm[-1], E_norm[-1]])
line_vec = p2 - p1
line_unit = line_vec / np.linalg.norm(line_vec)
point = np.array([T_norm[knee_idx], E_norm[knee_idx]])
proj_len = np.dot(point - p1, line_unit)
proj_point = p1 + proj_len * line_unit
ax2.plot([T_norm[knee_idx], proj_point[0]], [E_norm[knee_idx], proj_point[1]],
'g-', linewidth=2, label='Max Distance')
ax2.set_xlabel('Normalized Time', fontsize=12)
ax2.set_ylabel('Normalized Energy', fontsize=12)
ax2.set_title('Knee Point Detection\n(Maximum Distance to Baseline)', fontsize=13)
ax2.legend(fontsize=9)
ax2.grid(True, alpha=0.3)
ax2.set_xlim(-0.05, 1.05)
ax2.set_ylim(-0.05, 1.05)
# ========== 图3: 边际收益分析 ==========
ax3 = axes[1, 0]
# 计算边际能量变化
dE_dT = np.gradient(E, T)
marginal_savings = -dE_dT # 每多1年节省的能量
ax3.plot(T, marginal_savings, 'purple', linewidth=2)
ax3.fill_between(T, marginal_savings, alpha=0.2, color='purple')
# 标记推荐点
ax3.axvline(x=rec['years'], color='green', linestyle='--', linewidth=2,
label=f'Recommended: {rec["years"]:.0f} years')
ax3.set_xlabel('Completion Time (years)', fontsize=12)
ax3.set_ylabel('Marginal Energy Saving (PJ/year)', fontsize=12)
ax3.set_title('Marginal Benefit Analysis\n(Energy Saved per Additional Year)', fontsize=13)
ax3.legend()
ax3.grid(True, alpha=0.3)
# ========== 图4: 方案构成随时间变化 ==========
ax4 = axes[1, 1]
ax4.fill_between(df['years'], 0, df['elevator_fraction'] * 100,
alpha=0.7, color='green', label='Space Elevator')
ax4.fill_between(df['years'], df['elevator_fraction'] * 100, 100,
alpha=0.7, color='red', label='Rockets')
# 标记推荐点
ax4.axvline(x=rec['years'], color='black', linestyle='--', linewidth=2,
label=f'Recommended: {rec["years"]:.0f}y ({rec["elevator_fraction"]*100:.0f}% elevator)')
ax4.set_xlabel('Completion Time (years)', fontsize=12)
ax4.set_ylabel('Payload Share (%)', fontsize=12)
ax4.set_title('Payload Distribution at Different Timelines', fontsize=13)
ax4.legend(loc='center right')
ax4.set_ylim(0, 100)
ax4.grid(True, alpha=0.3)
plt.suptitle('Pareto Optimization: Space Elevator + Rocket Combination\n'
f'Recommended Knee Point: {rec["years"]:.0f} years, {rec["energy_PJ"]:.0f} PJ',
fontsize=15, y=1.02)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"Pareto分析图已保存至: {save_path}")
return fig
def plot_decision_recommendation(
df: pd.DataFrame,
knee_analysis: Dict,
save_path: str = '/Volumes/Files/code/mm/20260130_b/p1/decision_recommendation.png'
):
"""
绘制决策建议图(更适合放在报告中)
"""
rec = knee_analysis['recommended']
fig, ax = plt.subplots(figsize=(12, 8))
# 主曲线
ax.plot(df['years'], df['energy_PJ'], 'b-', linewidth=3, label='Energy-Time Trade-off')
# 填充区域
ax.fill_between(df['years'], df['energy_PJ'], alpha=0.15, color='blue')
# 标记推荐点
ax.scatter(rec['years'], rec['energy_PJ'], c='red', s=300, zorder=5,
marker='*', edgecolors='black', linewidth=2)
# 标注推荐点
ax.annotate(f"RECOMMENDED\n{rec['years']:.0f} years\n{rec['energy_PJ']:.0f} PJ\n"
f"({rec['elevator_fraction']*100:.0f}% Elevator)",
xy=(rec['years'], rec['energy_PJ']),
xytext=(rec['years'] + 30, rec['energy_PJ'] + 3000),
fontsize=12, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.5', facecolor='yellow', alpha=0.8),
arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0.2',
color='black', lw=2))
# 标记几个关键时间点
key_points = [
(100, 'Minimum\nfeasible'),
(186, 'Elevator-only\nfeasible'),
(219, 'Rocket-only\nfeasible'),
]
for year, label in key_points:
if year >= df['years'].min() and year <= df['years'].max():
idx = np.argmin(np.abs(df['years'] - year))
energy = df.iloc[idx]['energy_PJ']
ax.axvline(x=year, color='gray', linestyle=':', alpha=0.7)
ax.text(year, ax.get_ylim()[1] * 0.95, label, ha='center', fontsize=9,
bbox=dict(facecolor='white', alpha=0.7))
ax.set_xlabel('Completion Time (years)', fontsize=14)
ax.set_ylabel('Total Energy Consumption (PJ)', fontsize=14)
ax.set_title('Optimal Decision Point for Moon Colony Construction\n'
'(100 Million Metric Tons Payload)', fontsize=16)
ax.grid(True, alpha=0.3)
# 添加文字说明
textstr = (f"Knee Point Analysis Result:\n"
f"• Optimal Timeline: {rec['years']:.0f} years\n"
f"• Total Energy: {rec['energy_PJ']:.0f} PJ\n"
f"• Elevator Share: {rec['elevator_fraction']*100:.1f}%\n"
f"• Rocket Sites Used: {rec['sites_used']}")
props = dict(boxstyle='round', facecolor='lightgreen', alpha=0.8)
ax.text(0.98, 0.55, textstr, transform=ax.transAxes, fontsize=11,
verticalalignment='top', horizontalalignment='right', bbox=props)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"决策建议图已保存至: {save_path}")
return fig
def print_analysis_report(df: pd.DataFrame, knee_analysis: Dict):
"""
打印详细分析报告
"""
print("\n" + "=" * 80)
print("PARETO FRONT & KNEE POINT ANALYSIS REPORT")
print("Moon Colony Logistics Optimization")
print("=" * 80)
print(f"\n1. PROBLEM DEFINITION")
print(f" - Total payload: {TOTAL_PAYLOAD/1e6:.0f} million metric tons")
print(f" - Elevator capacity: {TOTAL_ELEVATOR_CAPACITY:,} tons/year")
print(f" - Rocket capacity: {len(LAUNCH_SITES) * 365 * PAYLOAD_PER_LAUNCH:,} tons/year")
print(f"\n2. PARETO FRONT RANGE")
print(f" - Minimum feasible time: {df['years'].min():.1f} years")
print(f" - Analysis range: {df['years'].min():.0f} - {df['years'].max():.0f} years")
print(f" - Energy range: {df['energy_PJ'].min():.0f} - {df['energy_PJ'].max():.0f} PJ")
print(f"\n3. KNEE POINT DETECTION (Multiple Methods)")
print("-" * 60)
print(f" {'Method':<20} {'Years':>10} {'Energy (PJ)':>15} {'Elev. %':>10}")
print("-" * 60)
for method, data in knee_analysis['methods'].items():
print(f" {method:<20} {data['years']:>10.0f} {data['energy_PJ']:>15.0f} "
f"{data['elevator_fraction']*100:>9.1f}%")
print("-" * 60)
rec = knee_analysis['recommended']
print(f"\n4. RECOMMENDED OPTIMAL POINT (Max Distance Method)")
print(f" ┌─────────────────────────────────────────────┐")
print(f" │ Completion Time: {rec['years']:>10.0f} years │")
print(f" │ Total Energy: {rec['energy_PJ']:>10.0f} PJ │")
print(f" │ Elevator Share: {rec['elevator_fraction']*100:>10.1f} % │")
print(f" │ Rocket Sites Used: {rec['sites_used']:>10.0f}")
print(f" └─────────────────────────────────────────────┘")
print(f"\n5. INTERPRETATION")
print(f" At the recommended {rec['years']:.0f}-year timeline:")
# 计算方案详情
scenario = calculate_scenario(rec['years'])
print(f" - Space Elevator delivers: {scenario['elevator_payload']/1e6:.1f}M tons")
print(f" - Rockets deliver: {scenario['rocket_payload']/1e6:.1f}M tons")
print(f" - Total rocket launches: {scenario['rocket_launches']:,}")
# 与极端方案对比
min_time_scenario = calculate_scenario(df['years'].min() + 1)
max_time_idx = len(df) - 1
print(f"\n Compared to minimum-time ({df['years'].min():.0f}y):")
if min_time_scenario:
time_diff = rec['years'] - df['years'].min()
energy_diff = min_time_scenario['total_energy_PJ'] - rec['energy_PJ']
print(f" - Extra time: +{time_diff:.0f} years")
print(f" - Energy saved: {energy_diff:.0f} PJ ({energy_diff/min_time_scenario['total_energy_PJ']*100:.1f}%)")
print(f"\n Compared to elevator-only (~186y):")
elevator_only_energy = TOTAL_PAYLOAD * ELEVATOR_SPECIFIC_ENERGY / 1e15
energy_overhead = rec['energy_PJ'] - elevator_only_energy
time_saved = 186 - rec['years']
print(f" - Time saved: {time_saved:.0f} years")
print(f" - Extra energy: {energy_overhead:.0f} PJ ({energy_overhead/elevator_only_energy*100:.1f}%)")
print("\n" + "=" * 80)
print("CONCLUSION: The knee point represents the optimal trade-off where")
print("extending the timeline further yields diminishing energy savings.")
print("=" * 80)
# ============== 组合方案范围内的膝点分析 ==============
def analyze_combined_range():
"""
在组合方案有效范围内101-186年进行膝点分析
186年后电梯可独立完成曲线平坦无意义
"""
# 计算电梯独立完成的时间
elevator_only_years = TOTAL_PAYLOAD / TOTAL_ELEVATOR_CAPACITY
print(f"电梯独立完成需要: {elevator_only_years:.1f}")
# 在组合方案范围内生成Pareto前沿
year_min = 101
year_max = elevator_only_years # ~186年
df = generate_pareto_front(year_min=year_min, year_max=year_max, num_points=300)
return df, year_min, year_max
def plot_combined_range_analysis(
df: pd.DataFrame,
knee_analysis: Dict,
year_range: Tuple[float, float],
save_path: str = '/Volumes/Files/code/mm/20260130_b/p1/pareto_combined_range.png'
):
"""
绘制组合方案范围内的Pareto分析图
"""
fig, axes = plt.subplots(2, 2, figsize=(16, 14))
year_min, year_max = year_range
# ========== 图1: Pareto 前沿(时间 vs 能量) ==========
ax1 = axes[0, 0]
ax1.plot(df['years'], df['energy_PJ'], 'b-', linewidth=2.5, label='Pareto Front')
ax1.fill_between(df['years'], df['energy_PJ'], alpha=0.2)
# 标记各方法找到的膝点
colors = {'max_curvature': 'red', 'max_distance': 'green', 'elbow': 'orange'}
markers = {'max_curvature': 'o', 'max_distance': 's', 'elbow': '^'}
labels = {'max_curvature': 'Max Curvature', 'max_distance': 'Max Distance', 'elbow': 'Elbow'}
for method, data in knee_analysis['methods'].items():
ax1.scatter(data['years'], data['energy_PJ'],
c=colors[method], marker=markers[method], s=200, zorder=5,
edgecolors='black', linewidth=1.5,
label=f'{labels[method]}: {data["years"]:.0f}y, {data["energy_PJ"]:.0f}PJ')
# 标记推荐点
rec = knee_analysis['recommended']
ax1.axvline(x=rec['years'], color='green', linestyle='--', alpha=0.7, linewidth=2)
ax1.axhline(y=rec['energy_PJ'], color='green', linestyle='--', alpha=0.7, linewidth=2)
ax1.set_xlabel('Completion Time (years)', fontsize=12)
ax1.set_ylabel('Total Energy (PJ)', fontsize=12)
ax1.set_title(f'Pareto Front: Combined Scenario ({year_min:.0f}-{year_max:.0f} years)\n'
f'Knee Points by Different Methods', fontsize=13)
ax1.legend(loc='upper right', fontsize=10)
ax1.grid(True, alpha=0.3)
# ========== 图2: 归一化 + 距离可视化 ==========
ax2 = axes[0, 1]
T = df['years'].values
E = df['energy_PJ'].values
T_norm = (T - T.min()) / (T.max() - T.min())
E_norm = (E - E.min()) / (E.max() - E.min())
ax2.plot(T_norm, E_norm, 'b-', linewidth=2.5, label='Normalized Pareto Front')
# 画起点-终点连线
ax2.plot([T_norm[0], T_norm[-1]], [E_norm[0], E_norm[-1]],
'r--', linewidth=2, label='Baseline (Start→End)')
# 标记最大距离点
knee_idx = knee_analysis['methods']['max_distance']['index']
ax2.scatter(T_norm[knee_idx], E_norm[knee_idx],
c='green', marker='s', s=250, zorder=5, edgecolors='black', linewidth=2,
label='Knee Point')
# 画垂直距离线
p1 = np.array([T_norm[0], E_norm[0]])
p2 = np.array([T_norm[-1], E_norm[-1]])
line_vec = p2 - p1
line_unit = line_vec / np.linalg.norm(line_vec)
point = np.array([T_norm[knee_idx], E_norm[knee_idx]])
proj_len = np.dot(point - p1, line_unit)
proj_point = p1 + proj_len * line_unit
ax2.plot([T_norm[knee_idx], proj_point[0]], [E_norm[knee_idx], proj_point[1]],
'g-', linewidth=3, label=f'Max Distance')
ax2.set_xlabel('Normalized Time (0=min, 1=max)', fontsize=12)
ax2.set_ylabel('Normalized Energy (0=min, 1=max)', fontsize=12)
ax2.set_title('Knee Point Detection\n(Maximum Distance to Baseline)', fontsize=13)
ax2.legend(fontsize=10)
ax2.grid(True, alpha=0.3)
ax2.set_xlim(-0.05, 1.05)
ax2.set_ylim(-0.05, 1.05)
# ========== 图3: 曲率分析 ==========
ax3 = axes[1, 0]
# 计算曲率
dE = np.gradient(E_norm, T_norm)
d2E = np.gradient(dE, T_norm)
curvature = np.abs(d2E) / np.power(1 + dE**2, 1.5)
ax3.plot(T, curvature, 'purple', linewidth=2)
ax3.fill_between(T, curvature, alpha=0.2, color='purple')
# 标记最大曲率点
curv_idx = knee_analysis['methods']['max_curvature']['index']
ax3.scatter(T[curv_idx], curvature[curv_idx], c='red', s=200, zorder=5,
marker='o', edgecolors='black', linewidth=2,
label=f'Max Curvature: {T[curv_idx]:.0f} years')
ax3.set_xlabel('Completion Time (years)', fontsize=12)
ax3.set_ylabel('Curvature κ', fontsize=12)
ax3.set_title('Curvature Analysis\n(Higher curvature = sharper bend)', fontsize=13)
ax3.legend(fontsize=10)
ax3.grid(True, alpha=0.3)
# ========== 图4: 边际收益 + 方案构成 ==========
ax4 = axes[1, 1]
# 边际节省
dE_dT = np.gradient(E, T)
marginal_savings = -dE_dT # 每多1年节省的能量
ax4_twin = ax4.twinx()
# 边际收益曲线
line1, = ax4.plot(T, marginal_savings, 'b-', linewidth=2, label='Marginal Saving (PJ/year)')
ax4.fill_between(T, marginal_savings, alpha=0.15, color='blue')
# 电梯占比
line2, = ax4_twin.plot(df['years'], df['elevator_fraction'] * 100, 'g--', linewidth=2,
label='Elevator Share (%)')
# 标记推荐点
ax4.axvline(x=rec['years'], color='red', linestyle=':', linewidth=2,
label=f'Recommended: {rec["years"]:.0f}y')
ax4.set_xlabel('Completion Time (years)', fontsize=12)
ax4.set_ylabel('Marginal Energy Saving (PJ/year)', fontsize=12, color='blue')
ax4_twin.set_ylabel('Elevator Share (%)', fontsize=12, color='green')
ax4.set_title('Marginal Benefit & Payload Distribution', fontsize=13)
# 合并图例
lines = [line1, line2]
labels = [l.get_label() for l in lines]
ax4.legend(lines, labels, loc='upper right', fontsize=10)
ax4.grid(True, alpha=0.3)
plt.suptitle(f'Pareto Optimization: Combined Scenario (Elevator + Rockets)\n'
f'Analysis Range: {year_min:.0f} - {year_max:.0f} years | '
f'Recommended: {rec["years"]:.0f} years, {rec["energy_PJ"]:.0f} PJ',
fontsize=15, y=1.02)
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"组合方案Pareto分析图已保存至: {save_path}")
return fig
def plot_combined_decision(
df: pd.DataFrame,
knee_analysis: Dict,
year_range: Tuple[float, float],
save_path: str = '/Volumes/Files/code/mm/20260130_b/p1/combined_decision.png'
):
"""
绘制组合方案决策图
"""
rec = knee_analysis['recommended']
year_min, year_max = year_range
fig, ax = plt.subplots(figsize=(14, 9))
T = df['years'].values
E = df['energy_PJ'].values
# 主曲线
ax.plot(T, E, 'b-', linewidth=3, label='Energy-Time Trade-off')
ax.fill_between(T, E, alpha=0.15, color='blue')
# 标记三种方法的膝点
methods_info = knee_analysis['methods']
# 最大距离法(推荐)
ax.scatter(methods_info['max_distance']['years'], methods_info['max_distance']['energy_PJ'],
c='green', s=400, zorder=5, marker='*', edgecolors='black', linewidth=2)
# 最大曲率法
ax.scatter(methods_info['max_curvature']['years'], methods_info['max_curvature']['energy_PJ'],
c='red', s=200, zorder=5, marker='o', edgecolors='black', linewidth=1.5)
# 肘部法
ax.scatter(methods_info['elbow']['years'], methods_info['elbow']['energy_PJ'],
c='orange', s=200, zorder=5, marker='^', edgecolors='black', linewidth=1.5)
# 标注推荐点
ax.annotate(f"★ RECOMMENDED (Max Distance)\n"
f"{rec['years']:.0f} years | {rec['energy_PJ']:.0f} PJ\n"
f"Elevator: {rec['elevator_fraction']*100:.0f}%",
xy=(rec['years'], rec['energy_PJ']),
xytext=(rec['years'] + 15, rec['energy_PJ'] + 2500),
fontsize=12, fontweight='bold',
bbox=dict(boxstyle='round,pad=0.5', facecolor='lightgreen', alpha=0.9),
arrowprops=dict(arrowstyle='->', connectionstyle='arc3,rad=0.2',
color='green', lw=2))
# 标注最大曲率点(如果不同)
if abs(methods_info['max_curvature']['years'] - rec['years']) > 5:
mc = methods_info['max_curvature']
ax.annotate(f"● Max Curvature\n{mc['years']:.0f}y | {mc['energy_PJ']:.0f}PJ",
xy=(mc['years'], mc['energy_PJ']),
xytext=(mc['years'] - 20, mc['energy_PJ'] - 2000),
fontsize=10,
bbox=dict(boxstyle='round,pad=0.3', facecolor='lightyellow', alpha=0.8),
arrowprops=dict(arrowstyle='->', color='red', lw=1.5))
# 标记端点
ax.scatter(T[0], E[0], c='red', s=150, marker='D', edgecolors='black', zorder=5)
ax.annotate(f'Fastest\n{T[0]:.0f}y, {E[0]:.0f}PJ',
xy=(T[0], E[0]), xytext=(T[0]+5, E[0]-1500),
fontsize=10, ha='left')
ax.scatter(T[-1], E[-1], c='green', s=150, marker='D', edgecolors='black', zorder=5)
ax.annotate(f'Lowest Energy\n{T[-1]:.0f}y, {E[-1]:.0f}PJ',
xy=(T[-1], E[-1]), xytext=(T[-1]-25, E[-1]+1500),
fontsize=10, ha='right')
ax.set_xlabel('Completion Time (years)', fontsize=14)
ax.set_ylabel('Total Energy Consumption (PJ)', fontsize=14)
ax.set_title(f'Optimal Decision Point for Combined Scenario\n'
f'(Elevator + Rockets, {year_min:.0f}-{year_max:.0f} years)', fontsize=16)
ax.grid(True, alpha=0.3)
# 图例
from matplotlib.lines import Line2D
legend_elements = [
Line2D([0], [0], marker='*', color='w', markerfacecolor='green', markersize=20,
markeredgecolor='black', label='Max Distance (Recommended)'),
Line2D([0], [0], marker='o', color='w', markerfacecolor='red', markersize=12,
markeredgecolor='black', label='Max Curvature'),
Line2D([0], [0], marker='^', color='w', markerfacecolor='orange', markersize=12,
markeredgecolor='black', label='Elbow Method'),
]
ax.legend(handles=legend_elements, loc='upper right', fontsize=11)
# 右侧信息框
scenario = calculate_scenario(rec['years'])
textstr = (f"Knee Point Analysis\n"
f"─────────────────\n"
f"Optimal Time: {rec['years']:.0f} years\n"
f"Total Energy: {rec['energy_PJ']:.0f} PJ\n"
f"─────────────────\n"
f"Elevator: {scenario['elevator_payload']/1e6:.1f}M tons\n"
f"Rockets: {scenario['rocket_payload']/1e6:.1f}M tons\n"
f"Launches: {scenario['rocket_launches']:,}\n"
f"Sites Used: {scenario['sites_used']}")
props = dict(boxstyle='round', facecolor='white', alpha=0.9, edgecolor='gray')
ax.text(0.02, 0.35, textstr, transform=ax.transAxes, fontsize=11,
verticalalignment='top', horizontalalignment='left', bbox=props,
family='monospace')
plt.tight_layout()
plt.savefig(save_path, dpi=150, bbox_inches='tight')
print(f"组合方案决策图已保存至: {save_path}")
return fig
def print_combined_range_report(df: pd.DataFrame, knee_analysis: Dict, year_range: Tuple[float, float]):
"""
打印组合方案范围内的分析报告
"""
year_min, year_max = year_range
print("\n" + "=" * 80)
print("COMBINED SCENARIO PARETO ANALYSIS (Elevator + Rockets)")
print(f"Analysis Range: {year_min:.0f} - {year_max:.0f} years")
print("=" * 80)
print(f"\n1. RANGE DEFINITION")
print(f" - Minimum time (all resources): {year_min:.0f} years")
print(f" - Elevator-only time: {year_max:.0f} years")
print(f" - This range requires BOTH elevator and rockets")
print(f"\n2. PARETO FRONT STATISTICS")
print(f" - Energy at fastest ({year_min:.0f}y): {df['energy_PJ'].max():.0f} PJ")
print(f" - Energy at slowest ({year_max:.0f}y): {df['energy_PJ'].min():.0f} PJ")
print(f" - Energy reduction: {df['energy_PJ'].max() - df['energy_PJ'].min():.0f} PJ "
f"({(1 - df['energy_PJ'].min()/df['energy_PJ'].max())*100:.1f}%)")
print(f"\n3. KNEE POINT DETECTION RESULTS")
print("-" * 70)
print(f" {'Method':<20} {'Years':>8} {'Energy(PJ)':>12} {'Elev%':>8} {'Rocket%':>8}")
print("-" * 70)
for method, data in knee_analysis['methods'].items():
rocket_pct = (1 - data['elevator_fraction']) * 100
print(f" {method:<20} {data['years']:>8.0f} {data['energy_PJ']:>12.0f} "
f"{data['elevator_fraction']*100:>7.1f}% {rocket_pct:>7.1f}%")
print("-" * 70)
rec = knee_analysis['recommended']
scenario = calculate_scenario(rec['years'])
print(f"\n4. RECOMMENDED OPTIMAL POINT")
print(f" ╔═══════════════════════════════════════════════════════╗")
print(f" ║ Method: Max Distance (most robust) ║")
print(f" ║ Completion Time: {rec['years']:>6.0f} years ║")
print(f" ║ Total Energy: {rec['energy_PJ']:>6.0f} PJ ║")
print(f" ║ Elevator Payload: {scenario['elevator_payload']/1e6:>6.1f} M tons ({rec['elevator_fraction']*100:.0f}%) ║")
print(f" ║ Rocket Payload: {scenario['rocket_payload']/1e6:>6.1f} M tons ({(1-rec['elevator_fraction'])*100:.0f}%) ║")
print(f" ║ Total Launches: {scenario['rocket_launches']:>6,}")
print(f" ║ Sites Required: {scenario['sites_used']:>6}")
print(f" ╚═══════════════════════════════════════════════════════╝")
print(f"\n5. TRADE-OFF ANALYSIS")
# 与最快方案对比
fast_scenario = calculate_scenario(year_min + 0.5)
if fast_scenario:
time_gain = rec['years'] - year_min
energy_save = fast_scenario['total_energy_PJ'] - rec['energy_PJ']
print(f"\n vs. Fastest ({year_min:.0f}y):")
print(f" - Extra time: +{time_gain:.0f} years")
print(f" - Energy saved: {energy_save:.0f} PJ ({energy_save/fast_scenario['total_energy_PJ']*100:.1f}%)")
print(f" - Efficiency: {energy_save/time_gain:.1f} PJ saved per extra year")
# 与纯电梯对比
elev_scenario = calculate_scenario(year_max - 0.5)
if elev_scenario:
time_save = year_max - rec['years']
energy_cost = rec['energy_PJ'] - elev_scenario['total_energy_PJ']
print(f"\n vs. Elevator-only ({year_max:.0f}y):")
print(f" - Time saved: {time_save:.0f} years")
print(f" - Extra energy: {energy_cost:.0f} PJ ({energy_cost/elev_scenario['total_energy_PJ']*100:.1f}%)")
print(f" - Cost: {energy_cost/time_save:.1f} PJ per year saved")
print("\n" + "=" * 80)
print("INTERPRETATION:")
print(f"The knee point at {rec['years']:.0f} years represents the optimal trade-off")
print("where further extending the timeline yields diminishing returns,")
print("while shortening it incurs disproportionately higher energy costs.")
print("=" * 80)
# ============== 主程序 ==============
if __name__ == "__main__":
print("=" * 80)
print("PARETO FRONT & KNEE POINT OPTIMIZATION")
print("Space Elevator + Rocket Combination for Moon Colony")
print("=" * 80)
# ===== 分析1: 组合方案范围 (101-186年) =====
print("\n" + "=" * 80)
print("ANALYSIS 1: Combined Scenario Range (Elevator + Rockets Required)")
print("=" * 80)
df_combined, year_min, year_max = analyze_combined_range()
knee_combined = analyze_knee_points(df_combined)
print_combined_range_report(df_combined, knee_combined, (year_min, year_max))
plot_combined_range_analysis(df_combined, knee_combined, (year_min, year_max))
plot_combined_decision(df_combined, knee_combined, (year_min, year_max))
# ===== 分析2: 完整范围 (100-300年) =====
print("\n" + "=" * 80)
print("ANALYSIS 2: Full Range (100-300 years)")
print("=" * 80)
df_full = generate_pareto_front(year_min=100, year_max=300, num_points=500)
knee_full = analyze_knee_points(df_full)
print_analysis_report(df_full, knee_full)
plot_pareto_analysis(df_full, knee_full)
plot_decision_recommendation(df_full, knee_full)
# 保存数据
df_combined.to_csv('/Volumes/Files/code/mm/20260130_b/p1/pareto_combined_range.csv', index=False)
df_full.to_csv('/Volumes/Files/code/mm/20260130_b/p1/pareto_front_data.csv', index=False)
print("\n数据已保存至 CSV 文件")
print("\n" + "=" * 80)
print("分析完成!")
print("=" * 80)