Files
2026_mcm_b/p2/monte_carlo_simulation.py

970 lines
38 KiB
Python
Raw Permalink Normal View History

2026-01-31 18:00:43 +08:00
"""
任务二蒙特卡洛模拟 - 非完美运行条件下的月球殖民地物资运输分析
Monte Carlo Simulation for Moon Colony Logistics Under Imperfect Conditions
模拟故障类型
1. 缆索摆动 (Tether Swaying) - 导致释放精度偏差需要额外轨道修正ΔV
2. 火箭发射失败 (Rocket Failure) - 发射失败导致载荷损失
3. 电梯故障 (Elevator Breakdown) - 电梯停机维护
4. 天气取消 (Weather Cancellation) - 不利天气导致发射取消
三种方案对比
- 方案A (成本优先): 纯空间电梯
- 方案B (时间优先): 混合方案电梯+火箭全开约101年
- 方案C (平衡方案): 混合方案电梯优先+低纬火箭约139年膝点
"""
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from matplotlib import rcParams
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Optional
import pandas as pd
from scipy import stats
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
# 设置字体
rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans', 'SimHei']
rcParams['axes.unicode_minus'] = False
# 随机种子(可复现)
np.random.seed(42)
# ============== 物理常数 ==============
G0 = 9.81 # m/s²
OMEGA_EARTH = 7.27e-5 # rad/s
R_EARTH = 6.371e6 # m
# ============== 任务参数 ==============
TOTAL_PAYLOAD = 100e6 # 100 million metric tons
# ============== 太空电梯参数 ==============
NUM_ELEVATORS = 3
ELEVATOR_CAPACITY_PER_YEAR = 179000 # metric tons per elevator per year
TOTAL_ELEVATOR_CAPACITY = NUM_ELEVATORS * ELEVATOR_CAPACITY_PER_YEAR # 537,000 tons/year
# 电梯比能量 (MJ/kg → J/ton)
ELEVATOR_SPECIFIC_ENERGY = 157.2e9 # J per metric ton (157.2 MJ/kg * 1000 kg)
# 电梯释放速度 (用于计算摆动影响)
ELEVATOR_RELEASE_VELOCITY = 7270 # m/s at 100,000 km (大于逃逸速度)
# ============== 火箭参数 ==============
PAYLOAD_PER_LAUNCH = 125 # metric tons per launch
ISP = 450 # 比冲 (秒)
SPECIFIC_FUEL_ENERGY = 15.5e6 # J/kg
ALPHA = 0.10 # 结构系数
NUM_STAGES = 3
DELTA_V_BASE = 13300 # m/s (赤道发射到月球)
# ============== 故障率参数(基于历史数据和合理假设)==============
@dataclass
class FailureParameters:
"""故障率参数"""
# 火箭发射失败率 (SpaceX Falcon 9: ~2-3%, 假设2050年技术进步)
rocket_failure_rate: float = 0.02 # 2%
# 电梯故障率 (假设技术,年故障次数期望)
elevator_failure_rate_per_year: float = 2.0 # 每部电梯每年平均故障2次
elevator_downtime_days_mean: float = 14 # 平均停机14天
elevator_downtime_days_std: float = 7 # 停机时间标准差
# 天气取消率 (分发射场,基于地理位置)
weather_cancel_rates: Dict[str, float] = field(default_factory=lambda: {
'Kourou': 0.10, # 法属圭亚那,热带,雨季影响
'SDSC': 0.15, # 印度,季风影响
'Texas': 0.12, # 德州,风暴
'Florida': 0.25, # 佛罗里达,雷暴频繁
'California': 0.08, # 加州,天气较好
'Virginia': 0.18, # 弗吉尼亚,东海岸天气
'Taiyuan': 0.10, # 太原,内陆干燥
'Mahia': 0.20, # 新西兰,海洋性天气
'Baikonur': 0.12, # 哈萨克斯坦,大陆性
'Alaska': 0.30, # 阿拉斯加,极端天气
})
# 缆索摆动参数 (释放角度偏差)
tether_sway_std_deg: float = 0.5 # 摆动角度标准差 (度)
# 火箭失败后载荷是否损失
payload_lost_on_failure: bool = True
# 火箭失败后重发延迟 (天)
rocket_failure_delay_days: float = 30
# ============== 发射场定义 ==============
@dataclass
class LaunchSite:
name: str
short_name: str
latitude: float
max_launches_per_day: int = 1
@property
def abs_latitude(self) -> float:
return abs(self.latitude)
@property
def rotation_velocity(self) -> float:
return OMEGA_EARTH * R_EARTH * np.cos(np.radians(self.abs_latitude))
@property
def delta_v_loss(self) -> float:
v_equator = OMEGA_EARTH * R_EARTH
return v_equator - self.rotation_velocity
@property
def total_delta_v(self) -> float:
return DELTA_V_BASE + self.delta_v_loss
# 10个发射场 (按纬度排序)
LAUNCH_SITES = sorted([
LaunchSite("Kourou (French Guiana)", "Kourou", 5.2),
LaunchSite("Satish Dhawan (India)", "SDSC", 13.7),
LaunchSite("Boca Chica (Texas)", "Texas", 26.0),
LaunchSite("Cape Canaveral (Florida)", "Florida", 28.5),
LaunchSite("Vandenberg (California)", "California", 34.7),
LaunchSite("Wallops (Virginia)", "Virginia", 37.8),
LaunchSite("Taiyuan (China)", "Taiyuan", 38.8),
LaunchSite("Mahia (New Zealand)", "Mahia", 39.3),
LaunchSite("Baikonur (Kazakhstan)", "Baikonur", 45.6),
LaunchSite("Kodiak (Alaska)", "Alaska", 57.4),
], key=lambda x: x.abs_latitude)
# ============== 核心计算函数 ==============
def fuel_ratio_multistage(delta_v: float) -> float:
"""多级火箭燃料/载荷比"""
ve = ISP * G0
delta_v_per_stage = delta_v / NUM_STAGES
R_stage = np.exp(delta_v_per_stage / ve)
denominator = 1 - ALPHA * (R_stage - 1)
if denominator <= 0:
return np.inf
k_stage = (R_stage - 1) / denominator
total_fuel_ratio = 0
remaining_ratio = 1.0
for _ in range(NUM_STAGES):
fuel_this_stage = remaining_ratio * k_stage
total_fuel_ratio += fuel_this_stage
remaining_ratio *= (1 + k_stage * (1 + ALPHA))
return total_fuel_ratio
def rocket_energy_per_ton(site: LaunchSite) -> float:
"""火箭发射每吨载荷的能量消耗 (J/ton)"""
k = fuel_ratio_multistage(site.total_delta_v)
fuel_per_ton = k * 1000 # kg fuel per metric ton payload
return fuel_per_ton * SPECIFIC_FUEL_ENERGY
def calculate_tether_sway_energy_penalty(sway_angle_deg: float) -> float:
"""
计算缆索摆动导致的能量惩罚
物理模型
- 摆动导致释放速度方向偏差
- 需要额外ΔV进行轨道修正
- ΔV_correction 2 * v_orbit * sin(θ/2) 对于小角度
参数:
sway_angle_deg: 摆动角度 ()
返回:
额外能量消耗比例 (相对于正常能耗的比例)
"""
sway_angle_rad = np.radians(abs(sway_angle_deg))
# 轨道修正ΔV (小角度近似)
delta_v_correction = ELEVATOR_RELEASE_VELOCITY * sway_angle_rad
# 计算额外燃料需求 (使用火箭方程)
ve = ISP * G0
# 单级修正,燃料比
if delta_v_correction > 0:
mass_ratio = np.exp(delta_v_correction / ve)
fuel_ratio = mass_ratio - 1
# 额外能量 = 额外燃料 * 燃料比能量
extra_energy_per_kg = fuel_ratio * SPECIFIC_FUEL_ENERGY
# 转换为相对于电梯基础能耗的比例
return extra_energy_per_kg / (ELEVATOR_SPECIFIC_ENERGY / 1000) # ELEVATOR_SPECIFIC_ENERGY是J/ton
return 0
# ============== 蒙特卡洛模拟类 ==============
@dataclass
class YearlyResult:
"""单年模拟结果"""
year: int
elevator_payload: float
elevator_energy: float
elevator_downtime_days: float
elevator_sway_energy_penalty: float
rocket_payload: float
rocket_energy: float
rocket_launches_attempted: int
rocket_launches_failed: int
rocket_weather_cancelled: int
@property
def total_payload(self) -> float:
return self.elevator_payload + self.rocket_payload
@property
def total_energy(self) -> float:
return self.elevator_energy + self.elevator_sway_energy_penalty + self.rocket_energy
@dataclass
class SimulationResult:
"""单次模拟结果"""
scenario_name: str
completion_years: float
total_energy_pj: float
elevator_payload: float
rocket_payload: float
total_rocket_failures: int
total_weather_cancellations: int
total_elevator_downtime_days: float
total_sway_energy_penalty_pj: float
yearly_results: List[YearlyResult] = field(default_factory=list)
class MonteCarloSimulator:
"""蒙特卡洛模拟器"""
def __init__(self, params: FailureParameters = None):
self.params = params or FailureParameters()
def simulate_elevator_year(
self,
remaining_payload: float,
year: int
) -> Tuple[float, float, float, float]:
"""
模拟电梯一年的运行
返回: (运输载荷, 基础能量, 停机天数, 摆动能量惩罚)
"""
total_downtime_days = 0
total_sway_penalty = 0
# 模拟每部电梯
for elevator_id in range(NUM_ELEVATORS):
# 模拟故障事件 (泊松过程)
num_failures = np.random.poisson(self.params.elevator_failure_rate_per_year)
for _ in range(num_failures):
# 停机时间 (对数正态分布,确保正值)
downtime = max(1, np.random.normal(
self.params.elevator_downtime_days_mean,
self.params.elevator_downtime_days_std
))
total_downtime_days += downtime
# 计算有效运行天数
effective_days = max(0, 365 * NUM_ELEVATORS - total_downtime_days)
effective_capacity_ratio = effective_days / (365 * NUM_ELEVATORS)
# 实际运输量
max_payload = TOTAL_ELEVATOR_CAPACITY * effective_capacity_ratio
actual_payload = min(max_payload, remaining_payload)
# 基础能量
base_energy = actual_payload * ELEVATOR_SPECIFIC_ENERGY
# 模拟每次运输的缆索摆动
# 假设每天运输一批次
num_trips = int(actual_payload / (TOTAL_ELEVATOR_CAPACITY / 365)) if TOTAL_ELEVATOR_CAPACITY > 0 else 0
for _ in range(max(1, num_trips)):
# 摆动角度 (正态分布)
sway_angle = np.random.normal(0, self.params.tether_sway_std_deg)
penalty_ratio = calculate_tether_sway_energy_penalty(sway_angle)
total_sway_penalty += base_energy * penalty_ratio / num_trips if num_trips > 0 else 0
return actual_payload, base_energy, total_downtime_days, total_sway_penalty
def simulate_rocket_year(
self,
remaining_payload: float,
target_launches: int,
sites_to_use: List[LaunchSite],
year: int
) -> Tuple[float, float, int, int, int]:
"""
模拟火箭一年的发射
返回: (运输载荷, 能量消耗, 尝试发射次数, 失败次数, 天气取消次数)
"""
if target_launches <= 0 or remaining_payload <= 0:
return 0, 0, 0, 0, 0
total_payload = 0
total_energy = 0
total_attempts = 0
total_failures = 0
total_weather_cancel = 0
# 按纬度优先分配发射
launches_per_site = target_launches // len(sites_to_use) if len(sites_to_use) > 0 else 0
extra_launches = target_launches % len(sites_to_use) if len(sites_to_use) > 0 else 0
for i, site in enumerate(sites_to_use):
site_launches = launches_per_site + (1 if i < extra_launches else 0)
for launch_idx in range(site_launches):
if total_payload >= remaining_payload:
break
total_attempts += 1
# 天气取消检查
weather_rate = self.params.weather_cancel_rates.get(site.short_name, 0.15)
if np.random.random() < weather_rate:
total_weather_cancel += 1
continue
# 发射失败检查
if np.random.random() < self.params.rocket_failure_rate:
total_failures += 1
if self.params.payload_lost_on_failure:
# 载荷损失,但仍消耗能量
total_energy += rocket_energy_per_ton(site) * PAYLOAD_PER_LAUNCH
continue
# 成功发射
payload = min(PAYLOAD_PER_LAUNCH, remaining_payload - total_payload)
total_payload += payload
total_energy += rocket_energy_per_ton(site) * payload
return total_payload, total_energy, total_attempts, total_failures, total_weather_cancel
def simulate_scenario(
self,
scenario_name: str,
target_years: float,
use_elevator: bool = True,
use_rockets: bool = True,
max_years: int = 500
) -> SimulationResult:
"""
模拟单个场景
参数:
scenario_name: 方案名称
target_years: 目标完成年限
use_elevator: 是否使用电梯
use_rockets: 是否使用火箭
max_years: 最大模拟年限
"""
remaining_payload = TOTAL_PAYLOAD
total_energy = 0
total_elevator_payload = 0
total_rocket_payload = 0
total_rocket_failures = 0
total_weather_cancellations = 0
total_elevator_downtime = 0
total_sway_penalty = 0
yearly_results = []
# 计算每年需要的火箭发射量(如果需要)
if use_rockets and use_elevator:
# 混合方案:计算火箭需要补充的量
elevator_total_capacity = TOTAL_ELEVATOR_CAPACITY * target_years
rocket_needed = max(0, TOTAL_PAYLOAD - elevator_total_capacity)
rocket_launches_per_year = int(np.ceil(rocket_needed / PAYLOAD_PER_LAUNCH / target_years))
elif use_rockets and not use_elevator:
# 纯火箭
rocket_launches_per_year = int(np.ceil(TOTAL_PAYLOAD / PAYLOAD_PER_LAUNCH / target_years))
else:
rocket_launches_per_year = 0
year = 0
while remaining_payload > 0 and year < max_years:
year += 1
# 电梯运输
elev_payload, elev_energy, elev_downtime, sway_penalty = 0, 0, 0, 0
if use_elevator:
elev_payload, elev_energy, elev_downtime, sway_penalty = \
self.simulate_elevator_year(remaining_payload, year)
remaining_payload -= elev_payload
total_elevator_payload += elev_payload
total_energy += elev_energy + sway_penalty
total_elevator_downtime += elev_downtime
total_sway_penalty += sway_penalty
# 火箭运输
rock_payload, rock_energy, attempts, failures, weather = 0, 0, 0, 0, 0
if use_rockets and remaining_payload > 0:
rock_payload, rock_energy, attempts, failures, weather = \
self.simulate_rocket_year(
remaining_payload,
rocket_launches_per_year,
LAUNCH_SITES,
year
)
remaining_payload -= rock_payload
total_rocket_payload += rock_payload
total_energy += rock_energy
total_rocket_failures += failures
total_weather_cancellations += weather
yearly_results.append(YearlyResult(
year=year,
elevator_payload=elev_payload,
elevator_energy=elev_energy,
elevator_downtime_days=elev_downtime,
elevator_sway_energy_penalty=sway_penalty,
rocket_payload=rock_payload,
rocket_energy=rock_energy,
rocket_launches_attempted=attempts,
rocket_launches_failed=failures,
rocket_weather_cancelled=weather
))
return SimulationResult(
scenario_name=scenario_name,
completion_years=year,
total_energy_pj=total_energy / 1e15,
elevator_payload=total_elevator_payload,
rocket_payload=total_rocket_payload,
total_rocket_failures=total_rocket_failures,
total_weather_cancellations=total_weather_cancellations,
total_elevator_downtime_days=total_elevator_downtime,
total_sway_energy_penalty_pj=total_sway_penalty / 1e15,
yearly_results=yearly_results
)
def run_monte_carlo(
self,
scenario_name: str,
target_years: float,
use_elevator: bool,
use_rockets: bool,
n_simulations: int = 1000,
show_progress: bool = True
) -> List[SimulationResult]:
"""运行蒙特卡洛模拟"""
results = []
iterator = range(n_simulations)
if show_progress:
iterator = tqdm(iterator, desc=f"Simulating {scenario_name[:30]}", unit="sim")
for i in iterator:
result = self.simulate_scenario(
scenario_name=scenario_name,
target_years=target_years,
use_elevator=use_elevator,
use_rockets=use_rockets
)
results.append(result)
return results
# ============== 三种方案定义 ==============
def define_scenarios() -> Dict[str, Dict]:
"""
定义三种方案
基于任务一的分析结果:
- 纯电梯: ~18615720 PJ
- 混合最短时间: ~101
- 混合膝点: ~139
"""
# 计算关键时间点
elevator_only_years = TOTAL_PAYLOAD / TOTAL_ELEVATOR_CAPACITY # ~186年
rocket_capacity_per_year = len(LAUNCH_SITES) * 365 * PAYLOAD_PER_LAUNCH
combined_min_years = TOTAL_PAYLOAD / (TOTAL_ELEVATOR_CAPACITY + rocket_capacity_per_year) # ~101年
balanced_years = 139 # 膝点
return {
'Scenario_A': {
'name': 'Cost Priority (Elevator Only)',
'name_cn': '方案A: 成本优先(纯电梯)',
'target_years': elevator_only_years,
'use_elevator': True,
'use_rockets': False,
'description': '仅使用太空电梯,能耗最低但耗时最长'
},
'Scenario_B': {
'name': 'Time Priority (Combined - Max Speed)',
'name_cn': '方案B: 时间优先(混合-全速)',
'target_years': combined_min_years * 1.05, # 加5%余量
'use_elevator': True,
'use_rockets': True,
'description': '电梯+火箭全开,时间最短但能耗较高'
},
'Scenario_C': {
'name': 'Balanced (Combined - Knee Point)',
'name_cn': '方案C: 综合平衡(膝点方案)',
'target_years': balanced_years,
'use_elevator': True,
'use_rockets': True,
'description': '电梯优先+低纬火箭补充,成本-时间平衡'
}
}
# ============== 可视化函数 ==============
def plot_monte_carlo_results(
all_results: Dict[str, List[SimulationResult]],
save_dir: str = '/Volumes/Files/code/mm/20260130_b/p2'
):
"""绘制蒙特卡洛模拟结果"""
scenarios = define_scenarios()
# ========== 图1: 完成年份分布 ==========
fig1, axes1 = plt.subplots(1, 3, figsize=(15, 5))
colors = ['green', 'blue', 'purple']
for idx, (key, results) in enumerate(all_results.items()):
ax = axes1[idx]
years = [r.completion_years for r in results]
ax.hist(years, bins=30, color=colors[idx], alpha=0.7, edgecolor='black')
ax.axvline(np.mean(years), color='red', linestyle='--', linewidth=2,
label=f'Mean: {np.mean(years):.1f}')
ax.axvline(np.percentile(years, 5), color='orange', linestyle=':', linewidth=1.5,
label=f'5%: {np.percentile(years, 5):.1f}')
ax.axvline(np.percentile(years, 95), color='orange', linestyle=':', linewidth=1.5,
label=f'95%: {np.percentile(years, 95):.1f}')
ax.set_xlabel('Completion Years', fontsize=11)
ax.set_ylabel('Frequency', fontsize=11)
ax.set_title(f"{scenarios[key]['name']}\nCompletion Time Distribution", fontsize=12)
ax.legend(fontsize=9)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{save_dir}/completion_time_distribution.png', dpi=150, bbox_inches='tight')
print(f"完成年份分布图已保存")
# ========== 图2: 能量消耗分布 ==========
fig2, axes2 = plt.subplots(1, 3, figsize=(15, 5))
for idx, (key, results) in enumerate(all_results.items()):
ax = axes2[idx]
energy = [r.total_energy_pj for r in results]
ax.hist(energy, bins=30, color=colors[idx], alpha=0.7, edgecolor='black')
ax.axvline(np.mean(energy), color='red', linestyle='--', linewidth=2,
label=f'Mean: {np.mean(energy):.0f} PJ')
ax.axvline(np.percentile(energy, 5), color='orange', linestyle=':', linewidth=1.5,
label=f'5%: {np.percentile(energy, 5):.0f}')
ax.axvline(np.percentile(energy, 95), color='orange', linestyle=':', linewidth=1.5,
label=f'95%: {np.percentile(energy, 95):.0f}')
ax.set_xlabel('Total Energy (PJ)', fontsize=11)
ax.set_ylabel('Frequency', fontsize=11)
ax.set_title(f"{scenarios[key]['name']}\nEnergy Distribution", fontsize=12)
ax.legend(fontsize=9)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{save_dir}/energy_distribution.png', dpi=150, bbox_inches='tight')
print(f"能量分布图已保存")
# ========== 图3: 箱线图对比 ==========
fig3, axes3 = plt.subplots(1, 2, figsize=(14, 6))
# 完成年份箱线图
ax3a = axes3[0]
years_data = [[r.completion_years for r in results] for results in all_results.values()]
bp1 = ax3a.boxplot(years_data, labels=['A: Elevator\nOnly', 'B: Time\nPriority', 'C: Balanced'],
patch_artist=True)
for patch, color in zip(bp1['boxes'], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ax3a.set_ylabel('Completion Years', fontsize=12)
ax3a.set_title('Completion Time Comparison (Monte Carlo)', fontsize=13)
ax3a.grid(True, alpha=0.3)
# 能量消耗箱线图
ax3b = axes3[1]
energy_data = [[r.total_energy_pj for r in results] for results in all_results.values()]
bp2 = ax3b.boxplot(energy_data, labels=['A: Elevator\nOnly', 'B: Time\nPriority', 'C: Balanced'],
patch_artist=True)
for patch, color in zip(bp2['boxes'], colors):
patch.set_facecolor(color)
patch.set_alpha(0.7)
ax3b.set_ylabel('Total Energy (PJ)', fontsize=12)
ax3b.set_title('Energy Consumption Comparison (Monte Carlo)', fontsize=13)
ax3b.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{save_dir}/boxplot_comparison.png', dpi=150, bbox_inches='tight')
print(f"箱线图对比已保存")
# ========== 图4: 故障影响分析 ==========
fig4, axes4 = plt.subplots(2, 2, figsize=(14, 12))
# 火箭失败统计
ax4a = axes4[0, 0]
for idx, (key, results) in enumerate(all_results.items()):
failures = [r.total_rocket_failures for r in results]
if max(failures) > 0:
ax4a.hist(failures, bins=20, color=colors[idx], alpha=0.5,
label=scenarios[key]['name'], edgecolor='black')
ax4a.set_xlabel('Total Rocket Failures', fontsize=11)
ax4a.set_ylabel('Frequency', fontsize=11)
ax4a.set_title('Rocket Launch Failures Distribution', fontsize=12)
ax4a.legend()
ax4a.grid(True, alpha=0.3)
# 天气取消统计
ax4b = axes4[0, 1]
for idx, (key, results) in enumerate(all_results.items()):
weather = [r.total_weather_cancellations for r in results]
if max(weather) > 0:
ax4b.hist(weather, bins=20, color=colors[idx], alpha=0.5,
label=scenarios[key]['name'], edgecolor='black')
ax4b.set_xlabel('Total Weather Cancellations', fontsize=11)
ax4b.set_ylabel('Frequency', fontsize=11)
ax4b.set_title('Weather Cancellation Distribution', fontsize=12)
ax4b.legend()
ax4b.grid(True, alpha=0.3)
# 电梯停机时间统计
ax4c = axes4[1, 0]
for idx, (key, results) in enumerate(all_results.items()):
downtime = [r.total_elevator_downtime_days for r in results]
if max(downtime) > 0:
ax4c.hist(downtime, bins=20, color=colors[idx], alpha=0.5,
label=scenarios[key]['name'], edgecolor='black')
ax4c.set_xlabel('Total Elevator Downtime (days)', fontsize=11)
ax4c.set_ylabel('Frequency', fontsize=11)
ax4c.set_title('Elevator Downtime Distribution', fontsize=12)
ax4c.legend()
ax4c.grid(True, alpha=0.3)
# 缆索摆动能量惩罚
ax4d = axes4[1, 1]
for idx, (key, results) in enumerate(all_results.items()):
sway = [r.total_sway_energy_penalty_pj for r in results]
if max(sway) > 0:
ax4d.hist(sway, bins=20, color=colors[idx], alpha=0.5,
label=scenarios[key]['name'], edgecolor='black')
ax4d.set_xlabel('Tether Sway Energy Penalty (PJ)', fontsize=11)
ax4d.set_ylabel('Frequency', fontsize=11)
ax4d.set_title('Tether Sway Energy Penalty Distribution', fontsize=12)
ax4d.legend()
ax4d.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{save_dir}/failure_analysis.png', dpi=150, bbox_inches='tight')
print(f"故障分析图已保存")
# ========== 图5: 敏感性分析 - 龙卷风图 ==========
fig5, ax5 = plt.subplots(figsize=(12, 8))
# 计算各因素对完成时间的影响以方案C为例
results_c = all_results['Scenario_C']
# 计算相关性
years = np.array([r.completion_years for r in results_c])
failures = np.array([r.total_rocket_failures for r in results_c])
weather = np.array([r.total_weather_cancellations for r in results_c])
downtime = np.array([r.total_elevator_downtime_days for r in results_c])
sway = np.array([r.total_sway_energy_penalty_pj for r in results_c])
# 计算Spearman相关系数
correlations = {
'Rocket Failures': stats.spearmanr(failures, years)[0] if failures.std() > 0 else 0,
'Weather Cancellations': stats.spearmanr(weather, years)[0] if weather.std() > 0 else 0,
'Elevator Downtime': stats.spearmanr(downtime, years)[0] if downtime.std() > 0 else 0,
'Tether Sway Penalty': stats.spearmanr(sway, years)[0] if sway.std() > 0 else 0,
}
# 排序
sorted_corr = sorted(correlations.items(), key=lambda x: abs(x[1]), reverse=True)
labels = [x[0] for x in sorted_corr]
values = [x[1] for x in sorted_corr]
colors_bar = ['red' if v > 0 else 'blue' for v in values]
y_pos = np.arange(len(labels))
ax5.barh(y_pos, values, color=colors_bar, alpha=0.7)
ax5.set_yticks(y_pos)
ax5.set_yticklabels(labels)
ax5.set_xlabel('Spearman Correlation with Completion Time', fontsize=12)
ax5.set_title('Sensitivity Analysis: Factors Affecting Completion Time\n(Scenario C: Balanced)', fontsize=13)
ax5.axvline(x=0, color='black', linestyle='-', linewidth=0.5)
ax5.grid(True, alpha=0.3, axis='x')
# 添加数值标签
for i, v in enumerate(values):
ax5.text(v + 0.02 if v > 0 else v - 0.02, i, f'{v:.3f}',
va='center', ha='left' if v > 0 else 'right', fontsize=10)
plt.tight_layout()
plt.savefig(f'{save_dir}/sensitivity_tornado.png', dpi=150, bbox_inches='tight')
print(f"敏感性龙卷风图已保存")
# ========== 图6: 综合对比图 ==========
fig6, axes6 = plt.subplots(2, 2, figsize=(14, 12))
# 左上:完成时间 vs 能量散点图
ax6a = axes6[0, 0]
for idx, (key, results) in enumerate(all_results.items()):
years = [r.completion_years for r in results]
energy = [r.total_energy_pj for r in results]
ax6a.scatter(years, energy, c=colors[idx], alpha=0.3, s=20,
label=scenarios[key]['name'])
# 添加均值点
ax6a.scatter(np.mean(years), np.mean(energy), c=colors[idx],
s=200, marker='*', edgecolors='black', linewidth=2)
ax6a.set_xlabel('Completion Years', fontsize=11)
ax6a.set_ylabel('Total Energy (PJ)', fontsize=11)
ax6a.set_title('Years vs Energy Trade-off\n(Stars = Mean)', fontsize=12)
ax6a.legend()
ax6a.grid(True, alpha=0.3)
# 右上:完成时间累积分布
ax6b = axes6[0, 1]
for idx, (key, results) in enumerate(all_results.items()):
years = sorted([r.completion_years for r in results])
cdf = np.arange(1, len(years) + 1) / len(years)
ax6b.plot(years, cdf, color=colors[idx], linewidth=2, label=scenarios[key]['name'])
ax6b.set_xlabel('Completion Years', fontsize=11)
ax6b.set_ylabel('Cumulative Probability', fontsize=11)
ax6b.set_title('Completion Time CDF', fontsize=12)
ax6b.legend()
ax6b.grid(True, alpha=0.3)
# 左下:各方案统计摘要表格
ax6c = axes6[1, 0]
ax6c.axis('off')
summary_data = []
for key, results in all_results.items():
years = [r.completion_years for r in results]
energy = [r.total_energy_pj for r in results]
summary_data.append([
scenarios[key]['name'][:20],
f"{np.mean(years):.1f}",
f"{np.std(years):.1f}",
f"[{np.percentile(years, 5):.0f}, {np.percentile(years, 95):.0f}]",
f"{np.mean(energy):.0f}",
f"{np.std(energy):.0f}",
])
table = ax6c.table(
cellText=summary_data,
colLabels=['Scenario', 'Years\n(Mean)', 'Years\n(Std)', '90% CI', 'Energy\n(PJ Mean)', 'Energy\n(PJ Std)'],
loc='center',
cellLoc='center'
)
table.auto_set_font_size(False)
table.set_fontsize(10)
table.scale(1.2, 2)
ax6c.set_title('Statistical Summary', fontsize=12, pad=20)
# 右下:相对于理想情况的延迟分布
ax6d = axes6[1, 1]
# 计算相对延迟
for idx, (key, results) in enumerate(all_results.items()):
target = scenarios[key]['target_years']
delays = [(r.completion_years - target) / target * 100 for r in results]
ax6d.hist(delays, bins=25, color=colors[idx], alpha=0.5,
label=scenarios[key]['name'], edgecolor='black')
ax6d.axvline(x=0, color='black', linestyle='--', linewidth=2, label='Target (0% delay)')
ax6d.set_xlabel('Delay Relative to Target (%)', fontsize=11)
ax6d.set_ylabel('Frequency', fontsize=11)
ax6d.set_title('Delay Distribution (Due to Failures)', fontsize=12)
ax6d.legend()
ax6d.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(f'{save_dir}/comprehensive_comparison.png', dpi=150, bbox_inches='tight')
print(f"综合对比图已保存")
return fig1, fig2, fig3, fig4, fig5, fig6
def generate_statistics_report(
all_results: Dict[str, List[SimulationResult]],
params: FailureParameters
) -> str:
"""生成统计报告"""
scenarios = define_scenarios()
report = []
report.append("=" * 80)
report.append("MONTE CARLO SIMULATION RESULTS - STATISTICAL SUMMARY")
report.append("=" * 80)
report.append("\n## 故障参数设置")
report.append(f"- 火箭发射失败率: {params.rocket_failure_rate * 100:.1f}%")
report.append(f"- 电梯年故障次数: {params.elevator_failure_rate_per_year:.1f} 次/部/年")
report.append(f"- 电梯平均停机时间: {params.elevator_downtime_days_mean:.0f} ± {params.elevator_downtime_days_std:.0f}")
report.append(f"- 缆索摆动角度标准差: {params.tether_sway_std_deg:.2f}°")
report.append(f"- 天气取消率范围: {min(params.weather_cancel_rates.values())*100:.0f}% - {max(params.weather_cancel_rates.values())*100:.0f}%")
for key, results in all_results.items():
scenario = scenarios[key]
years = [r.completion_years for r in results]
energy = [r.total_energy_pj for r in results]
failures = [r.total_rocket_failures for r in results]
weather = [r.total_weather_cancellations for r in results]
downtime = [r.total_elevator_downtime_days for r in results]
sway = [r.total_sway_energy_penalty_pj for r in results]
report.append(f"\n{'=' * 80}")
report.append(f"## {scenario['name_cn']}")
report.append(f" {scenario['description']}")
report.append(f"{'=' * 80}")
report.append(f"\n### 完成时间统计")
report.append(f" 目标年限: {scenario['target_years']:.1f}")
report.append(f" 均值: {np.mean(years):.1f}")
report.append(f" 标准差: {np.std(years):.1f}")
report.append(f" 90% 置信区间: [{np.percentile(years, 5):.1f}, {np.percentile(years, 95):.1f}] 年")
report.append(f" 最短: {np.min(years):.1f} 年, 最长: {np.max(years):.1f}")
report.append(f" 相对目标延迟: {(np.mean(years) - scenario['target_years']) / scenario['target_years'] * 100:.1f}%")
report.append(f"\n### 能量消耗统计")
report.append(f" 均值: {np.mean(energy):.0f} PJ")
report.append(f" 标准差: {np.std(energy):.0f} PJ")
report.append(f" 90% 置信区间: [{np.percentile(energy, 5):.0f}, {np.percentile(energy, 95):.0f}] PJ")
if scenario['use_rockets']:
report.append(f"\n### 火箭故障统计")
report.append(f" 平均失败次数: {np.mean(failures):.1f}")
report.append(f" 平均天气取消: {np.mean(weather):.1f}")
if scenario['use_elevator']:
report.append(f"\n### 电梯故障统计")
report.append(f" 平均总停机时间: {np.mean(downtime):.0f} 天 ({np.mean(downtime)/365:.1f} 年)")
report.append(f" 平均摆动能量惩罚: {np.mean(sway):.1f} PJ ({np.mean(sway)/np.mean(energy)*100:.2f}%)")
# 方案对比
report.append(f"\n{'=' * 80}")
report.append("## 三方案对比总结")
report.append(f"{'=' * 80}")
report.append(f"\n{'方案':<25} {'完成年限':<15} {'总能耗(PJ)':<15} {'延迟率':<12}")
report.append("-" * 70)
for key, results in all_results.items():
scenario = scenarios[key]
years = [r.completion_years for r in results]
energy = [r.total_energy_pj for r in results]
delay = (np.mean(years) - scenario['target_years']) / scenario['target_years'] * 100
report.append(f"{scenario['name']:<25} {np.mean(years):.1f} ± {np.std(years):.1f} {np.mean(energy):.0f} ± {np.std(energy):.0f} {delay:+.1f}%")
return "\n".join(report)
# ============== 主程序 ==============
def run_full_simulation(n_simulations: int = 1000):
"""运行完整的蒙特卡洛模拟"""
import sys
print("=" * 80)
print("任务二:蒙特卡洛模拟 - 非完美运行条件分析")
print("=" * 80)
sys.stdout.flush()
# 初始化参数
params = FailureParameters()
simulator = MonteCarloSimulator(params)
scenarios = define_scenarios()
print(f"\n模拟次数: {n_simulations}")
print(f"\n故障参数:")
print(f" - 火箭失败率: {params.rocket_failure_rate * 100:.1f}%")
print(f" - 电梯年故障次数: {params.elevator_failure_rate_per_year:.1f}")
print(f" - 缆索摆动标准差: {params.tether_sway_std_deg:.2f}°")
# 运行三种方案的模拟
all_results = {}
for key, scenario in scenarios.items():
print(f"\n{'='*60}")
print(f"正在模拟 {scenario['name_cn']}...")
print(f"目标年限: {scenario['target_years']:.1f}")
print(f"{'='*60}")
results = simulator.run_monte_carlo(
scenario_name=scenario['name'],
target_years=scenario['target_years'],
use_elevator=scenario['use_elevator'],
use_rockets=scenario['use_rockets'],
n_simulations=n_simulations,
show_progress=True
)
all_results[key] = results
# 打印简要统计
years = [r.completion_years for r in results]
energy = [r.total_energy_pj for r in results]
print(f"\n ✓ 完成年限: {np.mean(years):.1f} ± {np.std(years):.1f}")
print(f" ✓ 总能耗: {np.mean(energy):.0f} ± {np.std(energy):.0f} PJ")
# 生成可视化
print("\n正在生成可视化图表...")
plot_monte_carlo_results(all_results)
# 生成统计报告
print("\n正在生成统计报告...")
report = generate_statistics_report(all_results, params)
print(report)
# 保存报告
with open('/Volumes/Files/code/mm/20260130_b/p2/simulation_report.txt', 'w', encoding='utf-8') as f:
f.write(report)
print(f"\n统计报告已保存至: p2/simulation_report.txt")
# 保存结果数据
summary_data = []
for key, results in all_results.items():
scenario = scenarios[key]
for r in results:
summary_data.append({
'scenario': key,
'scenario_name': scenario['name'],
'target_years': scenario['target_years'],
'completion_years': r.completion_years,
'total_energy_pj': r.total_energy_pj,
'elevator_payload': r.elevator_payload,
'rocket_payload': r.rocket_payload,
'rocket_failures': r.total_rocket_failures,
'weather_cancellations': r.total_weather_cancellations,
'elevator_downtime_days': r.total_elevator_downtime_days,
'sway_energy_penalty_pj': r.total_sway_energy_penalty_pj
})
df = pd.DataFrame(summary_data)
df.to_csv('/Volumes/Files/code/mm/20260130_b/p2/simulation_results.csv', index=False)
print(f"模拟数据已保存至: p2/simulation_results.csv")
return all_results, params
if __name__ == "__main__":
all_results, params = run_full_simulation(n_simulations=1000)