1208 lines
47 KiB
Python
1208 lines
47 KiB
Python
"""
|
||
Moon Colony Logistics - Uncertainty & Robustness Analysis (Task 2)
|
||
重构版 v2:方案A(聚合MC + 二项分布 + 一致性能量口径)
|
||
|
||
核心改进(v2 审计后修正):
|
||
1. 使用二项分布模拟发射成功/失败(而非除法倒推)
|
||
2. 复用任务一火箭能耗模型(保持能量口径一致)
|
||
3. 使用 Beta 分布替代 Normal+clip(更适合概率参数)
|
||
4. 完成时间分布使用期望值函数(修复采样单调性问题)
|
||
5. 添加条件能量均值 E[Energy|completed](口径分离)
|
||
6. 修正电梯能量逻辑:能量∝交付量(效率影响交付而非能耗)
|
||
7. 结论表述改为"决策区间+风险偏好"而非单一断言
|
||
8. 失败能量分阶段建模(引用来源说明)
|
||
"""
|
||
|
||
import numpy as np
|
||
import matplotlib
|
||
matplotlib.use('Agg')
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib import rcParams
|
||
from dataclasses import dataclass
|
||
from typing import List, Dict, Tuple, Optional
|
||
import pandas as pd
|
||
from scipy import stats
|
||
import warnings
|
||
warnings.filterwarnings('ignore')
|
||
|
||
# 设置字体
|
||
rcParams['font.sans-serif'] = ['Arial Unicode MS', 'DejaVu Sans', 'SimHei']
|
||
rcParams['axes.unicode_minus'] = False
|
||
|
||
np.random.seed(42)
|
||
|
||
# ============== 物理常数(与任务一一致) ==============
|
||
G0 = 9.81 # m/s²
|
||
OMEGA_EARTH = 7.27e-5 # rad/s
|
||
R_EARTH = 6.371e6 # m
|
||
|
||
# 任务参数
|
||
TOTAL_PAYLOAD = 100e6 # 100 million metric tons
|
||
|
||
# 太空电梯参数
|
||
NUM_ELEVATORS = 3
|
||
ELEVATOR_CAPACITY_PER_YEAR = 179000 # metric tons per elevator per year
|
||
TOTAL_ELEVATOR_CAPACITY = NUM_ELEVATORS * ELEVATOR_CAPACITY_PER_YEAR # 537,000 tons/year
|
||
ELEVATOR_SPECIFIC_ENERGY = 157.2e9 # J per metric ton
|
||
|
||
# 火箭参数(与任务一一致)
|
||
PAYLOAD_PER_LAUNCH = 125 # metric tons per launch
|
||
ISP = 450
|
||
SPECIFIC_FUEL_ENERGY = 15.5e6 # J/kg
|
||
ALPHA = 0.10
|
||
NUM_STAGES = 3
|
||
DELTA_V_BASE = 13300 # m/s
|
||
|
||
# 失败发射能量损失比例(分阶段建模,参考 SpaceX/NASA 历史数据)
|
||
# 参考:FAA AST Commercial Space Transportation Report
|
||
FAILURE_ENERGY_FRACTIONS = {
|
||
'pre_launch_abort': 0.05, # 发射前中止:仅燃料泵送等准备能量
|
||
'first_stage_fail': 0.35, # 一级飞行失败:约35%燃料已消耗
|
||
'upper_stage_fail': 0.70, # 上面级失败:大部分燃料已消耗
|
||
'orbit_fail': 1.00, # 轨道转移失败:全部能量损失
|
||
}
|
||
# 加权平均失败能量比例(假设分布:50%一级,30%上面级,15%发射前,5%轨道)
|
||
WEIGHTED_FAILURE_ENERGY_RATIO = (
|
||
0.15 * FAILURE_ENERGY_FRACTIONS['pre_launch_abort'] +
|
||
0.50 * FAILURE_ENERGY_FRACTIONS['first_stage_fail'] +
|
||
0.30 * FAILURE_ENERGY_FRACTIONS['upper_stage_fail'] +
|
||
0.05 * FAILURE_ENERGY_FRACTIONS['orbit_fail']
|
||
) # ≈ 0.40
|
||
|
||
|
||
# ============== 发射场定义(复用任务一) ==============
|
||
@dataclass
|
||
class LaunchSite:
|
||
name: str
|
||
short_name: str
|
||
latitude: float
|
||
|
||
@property
|
||
def abs_latitude(self) -> float:
|
||
return abs(self.latitude)
|
||
|
||
@property
|
||
def delta_v_loss(self) -> float:
|
||
v_equator = OMEGA_EARTH * R_EARTH
|
||
v_site = OMEGA_EARTH * R_EARTH * np.cos(np.radians(self.abs_latitude))
|
||
return v_equator - v_site
|
||
|
||
@property
|
||
def total_delta_v(self) -> float:
|
||
return DELTA_V_BASE + self.delta_v_loss
|
||
|
||
|
||
LAUNCH_SITES = sorted([
|
||
LaunchSite("Kourou (French Guiana)", "Kourou", 5.2),
|
||
LaunchSite("Satish Dhawan (India)", "SDSC", 13.7),
|
||
LaunchSite("Boca Chica (Texas)", "Texas", 26.0),
|
||
LaunchSite("Cape Canaveral (Florida)", "Florida", 28.5),
|
||
LaunchSite("Vandenberg (California)", "California", 34.7),
|
||
LaunchSite("Wallops (Virginia)", "Virginia", 37.8),
|
||
LaunchSite("Taiyuan (China)", "Taiyuan", 38.8),
|
||
LaunchSite("Mahia (New Zealand)", "Mahia", 39.3),
|
||
LaunchSite("Baikonur (Kazakhstan)", "Baikonur", 45.6),
|
||
LaunchSite("Kodiak (Alaska)", "Alaska", 57.4),
|
||
], key=lambda x: x.abs_latitude)
|
||
|
||
N_LAUNCH_SITES = len(LAUNCH_SITES)
|
||
|
||
|
||
# ============== 火箭能耗函数(复用任务一口径) ==============
|
||
|
||
def fuel_ratio_multistage(delta_v: float) -> float:
|
||
"""多级火箭燃料/载荷比(与任务一一致)"""
|
||
ve = ISP * G0
|
||
delta_v_per_stage = delta_v / NUM_STAGES
|
||
R_stage = np.exp(delta_v_per_stage / ve)
|
||
|
||
denominator = 1 - ALPHA * (R_stage - 1)
|
||
if denominator <= 0:
|
||
return np.inf
|
||
|
||
k_stage = (R_stage - 1) / denominator
|
||
|
||
total_fuel_ratio = 0
|
||
remaining_ratio = 1.0
|
||
|
||
for _ in range(NUM_STAGES):
|
||
fuel_this_stage = remaining_ratio * k_stage
|
||
total_fuel_ratio += fuel_this_stage
|
||
remaining_ratio *= (1 + k_stage * (1 + ALPHA))
|
||
|
||
return total_fuel_ratio
|
||
|
||
|
||
def rocket_energy_per_launch(site: LaunchSite) -> float:
|
||
"""单次发射的能量消耗 (J),与任务一口径一致"""
|
||
k = fuel_ratio_multistage(site.total_delta_v)
|
||
fuel_mass = k * PAYLOAD_PER_LAUNCH * 1000 # kg
|
||
return fuel_mass * SPECIFIC_FUEL_ENERGY
|
||
|
||
|
||
# 预计算各站点能耗(按纬度排序,低纬优先)
|
||
SITE_ENERGIES = [rocket_energy_per_launch(site) for site in LAUNCH_SITES]
|
||
AVG_ROCKET_ENERGY_PER_LAUNCH = np.mean(SITE_ENERGIES)
|
||
|
||
|
||
# ============== 不确定性参数(使用 Beta 分布参数化) ==============
|
||
|
||
def beta_params_from_mean_std(mean: float, std: float) -> Tuple[float, float]:
|
||
"""从均值和标准差计算 Beta 分布的 α, β 参数"""
|
||
if std <= 0 or mean <= 0 or mean >= 1:
|
||
return (10, 10 * (1 - mean) / mean) # fallback
|
||
|
||
var = std ** 2
|
||
# Beta 分布: mean = α/(α+β), var = αβ/((α+β)²(α+β+1))
|
||
# 反解: α = mean * ((mean*(1-mean)/var) - 1)
|
||
# β = (1-mean) * ((mean*(1-mean)/var) - 1)
|
||
common = (mean * (1 - mean) / var) - 1
|
||
if common <= 0:
|
||
common = 1 # fallback
|
||
alpha = mean * common
|
||
beta = (1 - mean) * common
|
||
return (max(alpha, 0.5), max(beta, 0.5))
|
||
|
||
|
||
@dataclass
|
||
class UncertaintyParams:
|
||
"""
|
||
不确定性参数配置
|
||
使用均值和标准差定义,内部转换为 Beta 分布
|
||
"""
|
||
# 火箭发射成功率
|
||
rocket_success_rate: float = 0.97
|
||
rocket_success_rate_std: float = 0.015
|
||
|
||
# 电梯可用率(故障/维护)
|
||
elevator_availability: float = 0.90
|
||
elevator_availability_std: float = 0.04
|
||
|
||
# 天气/发射窗口可用因子
|
||
weather_factor: float = 0.80
|
||
weather_factor_std: float = 0.08
|
||
|
||
# 载荷损失率
|
||
loss_rate: float = 0.01
|
||
loss_rate_std: float = 0.005
|
||
|
||
# 电梯效率因子(缆绳摆动等)
|
||
elevator_efficiency: float = 0.95
|
||
elevator_efficiency_std: float = 0.02
|
||
|
||
def sample(self, n: int) -> Dict[str, np.ndarray]:
|
||
"""采样所有参数(使用 Beta 分布)"""
|
||
samples = {}
|
||
|
||
# 火箭成功率
|
||
a, b = beta_params_from_mean_std(self.rocket_success_rate, self.rocket_success_rate_std)
|
||
samples['rocket_success_rate'] = np.random.beta(a, b, n)
|
||
|
||
# 电梯可用率
|
||
a, b = beta_params_from_mean_std(self.elevator_availability, self.elevator_availability_std)
|
||
samples['elevator_availability'] = np.random.beta(a, b, n)
|
||
|
||
# 天气因子
|
||
a, b = beta_params_from_mean_std(self.weather_factor, self.weather_factor_std)
|
||
samples['weather_factor'] = np.random.beta(a, b, n)
|
||
|
||
# 电梯效率
|
||
a, b = beta_params_from_mean_std(self.elevator_efficiency, self.elevator_efficiency_std)
|
||
samples['elevator_efficiency'] = np.random.beta(a, b, n)
|
||
|
||
# 损失率(使用三角分布,更适合小概率事件)
|
||
low = max(0, self.loss_rate - 2 * self.loss_rate_std)
|
||
high = min(0.1, self.loss_rate + 2 * self.loss_rate_std)
|
||
mode = self.loss_rate
|
||
samples['loss_rate'] = np.random.triangular(low, mode, high, n)
|
||
|
||
return samples
|
||
|
||
def __str__(self):
|
||
return (f"Rocket Success: {self.rocket_success_rate:.1%} (σ={self.rocket_success_rate_std:.1%})\n"
|
||
f"Elevator Avail: {self.elevator_availability:.1%} (σ={self.elevator_availability_std:.1%})\n"
|
||
f"Weather Factor: {self.weather_factor:.1%} (σ={self.weather_factor_std:.1%})\n"
|
||
f"Loss Rate: {self.loss_rate:.2%} (σ={self.loss_rate_std:.2%})\n"
|
||
f"Elevator Efficiency: {self.elevator_efficiency:.1%} (σ={self.elevator_efficiency_std:.1%})")
|
||
|
||
|
||
# ============== 确定性基准计算(任务一口径) ==============
|
||
|
||
def calculate_deterministic(completion_years: float) -> Optional[Dict]:
|
||
"""确定性方案计算(任务一基准)"""
|
||
elevator_payload = min(TOTAL_ELEVATOR_CAPACITY * completion_years, TOTAL_PAYLOAD)
|
||
elevator_energy = elevator_payload * ELEVATOR_SPECIFIC_ENERGY
|
||
|
||
remaining_payload = TOTAL_PAYLOAD - elevator_payload
|
||
|
||
if remaining_payload <= 0:
|
||
return {
|
||
'years': completion_years,
|
||
'elevator_payload': elevator_payload,
|
||
'rocket_payload': 0,
|
||
'elevator_energy_PJ': elevator_energy / 1e15,
|
||
'rocket_energy_PJ': 0,
|
||
'total_energy_PJ': elevator_energy / 1e15,
|
||
'successful_launches': 0,
|
||
'total_launches': 0,
|
||
'failed_launches': 0,
|
||
}
|
||
|
||
# 按低纬优先分配
|
||
days_available = int(completion_years * 365)
|
||
max_launches_per_site = days_available
|
||
|
||
rocket_energy = 0
|
||
remaining = remaining_payload
|
||
successful_launches = 0
|
||
|
||
for i, site in enumerate(LAUNCH_SITES):
|
||
if remaining <= 0:
|
||
break
|
||
launches_needed = int(np.ceil(remaining / PAYLOAD_PER_LAUNCH))
|
||
allocated = min(launches_needed, max_launches_per_site)
|
||
rocket_energy += allocated * SITE_ENERGIES[i]
|
||
remaining -= allocated * PAYLOAD_PER_LAUNCH
|
||
successful_launches += allocated
|
||
|
||
if remaining > 0:
|
||
return None # 无法完成
|
||
|
||
return {
|
||
'years': completion_years,
|
||
'elevator_payload': elevator_payload,
|
||
'rocket_payload': TOTAL_PAYLOAD - elevator_payload,
|
||
'elevator_energy_PJ': elevator_energy / 1e15,
|
||
'rocket_energy_PJ': rocket_energy / 1e15,
|
||
'total_energy_PJ': (elevator_energy + rocket_energy) / 1e15,
|
||
'successful_launches': successful_launches,
|
||
'total_launches': successful_launches,
|
||
'failed_launches': 0,
|
||
}
|
||
|
||
|
||
# ============== 方案A:聚合MC + 二项分布 ==============
|
||
|
||
def monte_carlo_binomial(
|
||
target_years: float,
|
||
params: UncertaintyParams,
|
||
n_simulations: int = 2000,
|
||
) -> Dict:
|
||
"""
|
||
方案A:聚合蒙特卡洛模拟(使用二项分布)
|
||
|
||
核心逻辑:
|
||
1. 采样参数(Beta/三角分布)
|
||
2. 计算可尝试发射次数 N_try = 365 * T * weather * N_sites
|
||
3. 成功次数 N_succ ~ Binomial(N_try, p_success)
|
||
4. 实际交付 = N_succ * payload * (1 - loss)
|
||
5. 判定完成:电梯 + 火箭 >= 总量
|
||
"""
|
||
# 采样参数
|
||
samples = params.sample(n_simulations)
|
||
p_success = samples['rocket_success_rate']
|
||
elevator_avail = samples['elevator_availability']
|
||
weather = samples['weather_factor']
|
||
loss_rate = samples['loss_rate']
|
||
elevator_eff = samples['elevator_efficiency']
|
||
|
||
# 电梯交付
|
||
effective_elevator_capacity = TOTAL_ELEVATOR_CAPACITY * elevator_avail * elevator_eff
|
||
elevator_payload = np.minimum(effective_elevator_capacity * target_years, TOTAL_PAYLOAD)
|
||
|
||
# 火箭需要补的量
|
||
remaining_needed = TOTAL_PAYLOAD - elevator_payload
|
||
remaining_needed = np.maximum(remaining_needed, 0)
|
||
|
||
# 可尝试发射次数
|
||
n_try = np.floor(target_years * 365 * weather * N_LAUNCH_SITES).astype(int)
|
||
n_try = np.maximum(n_try, 0)
|
||
|
||
# 成功发射次数(二项分布)
|
||
n_success = np.zeros(n_simulations, dtype=int)
|
||
for i in range(n_simulations):
|
||
if n_try[i] > 0:
|
||
n_success[i] = np.random.binomial(n_try[i], p_success[i])
|
||
|
||
# 火箭实际交付(考虑损失)
|
||
rocket_payload = n_success * PAYLOAD_PER_LAUNCH * (1 - loss_rate)
|
||
|
||
# 判定完成
|
||
total_delivered = elevator_payload + rocket_payload
|
||
completed = total_delivered >= TOTAL_PAYLOAD
|
||
|
||
# 实际使用的火箭载荷(不超过需求)
|
||
actual_rocket_payload = np.minimum(rocket_payload, remaining_needed)
|
||
actual_rocket_payload = np.where(completed, remaining_needed, rocket_payload)
|
||
|
||
# 实际成功发射次数(满足需求所需的最小次数)
|
||
needed_success = np.ceil(remaining_needed / (PAYLOAD_PER_LAUNCH * (1 - loss_rate))).astype(int)
|
||
actual_success = np.minimum(n_success, needed_success)
|
||
actual_success = np.where(completed, needed_success, n_success)
|
||
|
||
# 失败发射次数 = 尝试 - 成功(但只计算到满足需求为止)
|
||
# 使用期望值估算:尝试次数 ≈ 成功次数 / 成功率
|
||
estimated_tries = np.where(p_success > 0, actual_success / p_success, 0)
|
||
failed_launches = np.maximum(estimated_tries - actual_success, 0)
|
||
|
||
# 能量计算(保持与任务一口径一致)
|
||
# 电梯能量:能量∝实际交付量(效率影响交付量而非能耗本身)
|
||
# 修正逻辑:电梯消耗的能量 = 实际交付量 × 单位能量
|
||
elevator_energy = elevator_payload * ELEVATOR_SPECIFIC_ENERGY
|
||
|
||
# 火箭能量:使用加权平均(低纬优先分配)
|
||
rocket_energy = np.zeros(n_simulations)
|
||
for i in range(n_simulations):
|
||
if actual_success[i] > 0:
|
||
remaining_launches = int(actual_success[i])
|
||
max_per_site = int(target_years * 365 * weather[i])
|
||
for j, site_energy in enumerate(SITE_ENERGIES):
|
||
if remaining_launches <= 0:
|
||
break
|
||
allocated = min(remaining_launches, max_per_site)
|
||
rocket_energy[i] += allocated * site_energy
|
||
remaining_launches -= allocated
|
||
|
||
# 失败发射能量(分阶段加权平均,参考 FAA 数据)
|
||
failed_energy = failed_launches * AVG_ROCKET_ENERGY_PER_LAUNCH * WEIGHTED_FAILURE_ENERGY_RATIO
|
||
|
||
total_energy = (elevator_energy + rocket_energy + failed_energy) / 1e15 # PJ
|
||
|
||
# 条件能量均值:E[Energy | completed](只算完成的模拟)
|
||
completed_mask = completed.astype(bool)
|
||
if np.sum(completed_mask) > 0:
|
||
energy_given_completed = np.mean(total_energy[completed_mask])
|
||
energy_given_completed_p95 = np.percentile(total_energy[completed_mask], 95)
|
||
else:
|
||
energy_given_completed = np.nan
|
||
energy_given_completed_p95 = np.nan
|
||
|
||
# 统计
|
||
return {
|
||
'target_years': target_years,
|
||
'n_simulations': n_simulations,
|
||
'completion_probability': np.mean(completed),
|
||
'energy_mean': np.mean(total_energy), # 无条件均值
|
||
'energy_std': np.std(total_energy),
|
||
'energy_p5': np.percentile(total_energy, 5),
|
||
'energy_p50': np.percentile(total_energy, 50),
|
||
'energy_p95': np.percentile(total_energy, 95),
|
||
# 新增:条件能量(只算完成的模拟)
|
||
'energy_given_completed': energy_given_completed,
|
||
'energy_given_completed_p95': energy_given_completed_p95,
|
||
'avg_successful_launches': np.mean(actual_success),
|
||
'avg_failed_launches': np.mean(failed_launches),
|
||
'avg_elevator_payload': np.mean(elevator_payload),
|
||
'avg_rocket_payload': np.mean(actual_rocket_payload),
|
||
'raw': {
|
||
'completed': completed,
|
||
'total_energy': total_energy,
|
||
'elevator_payload': elevator_payload,
|
||
'rocket_payload': actual_rocket_payload,
|
||
'successful_launches': actual_success,
|
||
'failed_launches': failed_launches,
|
||
'n_try': n_try,
|
||
'n_success': n_success,
|
||
}
|
||
}
|
||
|
||
|
||
def find_completion_time_distribution(
|
||
params: UncertaintyParams,
|
||
n_simulations: int = 2000,
|
||
year_step: float = 1.0,
|
||
max_years: float = 300,
|
||
) -> Dict:
|
||
"""
|
||
计算真实的完成时间分布(v2 修正:使用期望值函数保证单调性)
|
||
|
||
方法:对每次模拟,使用期望交付函数计算完成时间
|
||
D(t) = elevator(t) + E[rocket(t)] 是单调递增的
|
||
找 T = inf{t: D(t) >= M}
|
||
"""
|
||
samples = params.sample(n_simulations)
|
||
completion_times = np.full(n_simulations, np.nan)
|
||
completion_energies = np.full(n_simulations, np.nan)
|
||
|
||
for i in range(n_simulations):
|
||
p_s = samples['rocket_success_rate'][i]
|
||
e_a = samples['elevator_availability'][i]
|
||
w_f = samples['weather_factor'][i]
|
||
l_r = samples['loss_rate'][i]
|
||
e_e = samples['elevator_efficiency'][i]
|
||
|
||
# 定义单调递增的交付函数(使用期望值保证单调性)
|
||
def expected_delivery(t):
|
||
# 电梯交付
|
||
elev_cap = TOTAL_ELEVATOR_CAPACITY * e_a * e_e
|
||
elev_payload = min(elev_cap * t, TOTAL_PAYLOAD)
|
||
|
||
# 火箭交付能力(使用期望值 = n_try * p_success)
|
||
n_try = t * 365 * w_f * N_LAUNCH_SITES
|
||
expected_success = n_try * p_s
|
||
rocket_payload = expected_success * PAYLOAD_PER_LAUNCH * (1 - l_r)
|
||
|
||
return elev_payload + rocket_payload
|
||
|
||
# 二分搜索完成时间(现在函数是单调的)
|
||
low, high = 50.0, max_years
|
||
|
||
while high - low > 1.0:
|
||
mid = (low + high) / 2
|
||
if expected_delivery(mid) >= TOTAL_PAYLOAD:
|
||
high = mid
|
||
else:
|
||
low = mid
|
||
|
||
completion_times[i] = high
|
||
|
||
# 计算该时间点的能量
|
||
elev_cap = TOTAL_ELEVATOR_CAPACITY * e_a * e_e
|
||
elev_payload = min(elev_cap * high, TOTAL_PAYLOAD)
|
||
remaining = TOTAL_PAYLOAD - elev_payload
|
||
|
||
# 电梯能量(能量∝交付量)
|
||
elev_energy = elev_payload * ELEVATOR_SPECIFIC_ENERGY
|
||
|
||
if remaining > 0:
|
||
needed_success = int(np.ceil(remaining / (PAYLOAD_PER_LAUNCH * (1 - l_r))))
|
||
rocket_energy = needed_success * AVG_ROCKET_ENERGY_PER_LAUNCH
|
||
# 估算失败
|
||
estimated_tries = needed_success / p_s if p_s > 0 else needed_success
|
||
failed = max(0, estimated_tries - needed_success)
|
||
failed_energy = failed * AVG_ROCKET_ENERGY_PER_LAUNCH * WEIGHTED_FAILURE_ENERGY_RATIO
|
||
else:
|
||
rocket_energy = 0
|
||
failed_energy = 0
|
||
|
||
completion_energies[i] = (elev_energy + rocket_energy + failed_energy) / 1e15
|
||
|
||
return {
|
||
'completion_times': completion_times,
|
||
'completion_energies': completion_energies,
|
||
'time_mean': np.nanmean(completion_times),
|
||
'time_std': np.nanstd(completion_times),
|
||
'time_p5': np.nanpercentile(completion_times, 5),
|
||
'time_p50': np.nanpercentile(completion_times, 50),
|
||
'time_p95': np.nanpercentile(completion_times, 95),
|
||
'energy_mean': np.nanmean(completion_energies),
|
||
'energy_p95': np.nanpercentile(completion_energies, 95),
|
||
}
|
||
|
||
|
||
# ============== 可靠性曲线 ==============
|
||
|
||
def compute_reliability_curve(
|
||
params: UncertaintyParams,
|
||
year_range: Tuple[float, float] = (100, 220),
|
||
n_points: int = 25,
|
||
n_simulations: int = 2000,
|
||
) -> pd.DataFrame:
|
||
"""计算可靠性曲线:P(complete) vs T"""
|
||
years = np.linspace(year_range[0], year_range[1], n_points)
|
||
|
||
results = []
|
||
for y in years:
|
||
mc = monte_carlo_binomial(y, params, n_simulations)
|
||
results.append({
|
||
'years': y,
|
||
'prob': mc['completion_probability'],
|
||
'energy_mean': mc['energy_mean'],
|
||
'energy_p95': mc['energy_p95'],
|
||
})
|
||
|
||
return pd.DataFrame(results)
|
||
|
||
|
||
def find_reliable_timeline(
|
||
params: UncertaintyParams,
|
||
target_prob: float = 0.95,
|
||
n_simulations: int = 2000,
|
||
) -> float:
|
||
"""找到达到目标可靠性的最短时间"""
|
||
low, high = 100, 250
|
||
|
||
while high - low > 1:
|
||
mid = (low + high) / 2
|
||
mc = monte_carlo_binomial(mid, params, n_simulations)
|
||
if mc['completion_probability'] >= target_prob:
|
||
high = mid
|
||
else:
|
||
low = mid
|
||
|
||
return high
|
||
|
||
|
||
# ============== 风险约束膝点分析(回答 solution changes) ==============
|
||
|
||
def pareto_with_risk_constraint(
|
||
params: UncertaintyParams,
|
||
reliability_threshold: float = 0.95,
|
||
year_range: Tuple[float, float] = (100, 200),
|
||
n_points: int = 30,
|
||
n_simulations: int = 2000,
|
||
) -> pd.DataFrame:
|
||
"""
|
||
在 (T, E_p95) 空间上找 Pareto 前沿,加约束 P(complete) >= threshold
|
||
这回答了"在不完美系统下,最优策略如何变化"
|
||
"""
|
||
years = np.linspace(year_range[0], year_range[1], n_points)
|
||
|
||
results = []
|
||
for y in years:
|
||
mc = monte_carlo_binomial(y, params, n_simulations)
|
||
det = calculate_deterministic(y)
|
||
|
||
results.append({
|
||
'years': y,
|
||
'prob': mc['completion_probability'],
|
||
'feasible': mc['completion_probability'] >= reliability_threshold,
|
||
'energy_mean_mc': mc['energy_mean'],
|
||
'energy_p95_mc': mc['energy_p95'],
|
||
'energy_det': det['total_energy_PJ'] if det else np.nan,
|
||
'energy_increase_pct': ((mc['energy_mean'] / det['total_energy_PJ'] - 1) * 100) if det else np.nan,
|
||
})
|
||
|
||
return pd.DataFrame(results)
|
||
|
||
|
||
def find_knee_point_with_risk(df: pd.DataFrame) -> Dict:
|
||
"""
|
||
在可行域内找膝点(最大曲率法)
|
||
"""
|
||
feasible = df[df['feasible']].copy()
|
||
|
||
if len(feasible) < 3:
|
||
return {'knee_years': np.nan, 'knee_energy': np.nan}
|
||
|
||
T = feasible['years'].values
|
||
E = feasible['energy_p95_mc'].values
|
||
|
||
# 归一化
|
||
T_norm = (T - T.min()) / (T.max() - T.min() + 1e-10)
|
||
E_norm = (E - E.min()) / (E.max() - E.min() + 1e-10)
|
||
|
||
# 最大距离法
|
||
p1 = np.array([T_norm[0], E_norm[0]])
|
||
p2 = np.array([T_norm[-1], E_norm[-1]])
|
||
line_vec = p2 - p1
|
||
line_len = np.linalg.norm(line_vec)
|
||
|
||
if line_len < 1e-10:
|
||
return {'knee_years': T[len(T)//2], 'knee_energy': E[len(E)//2]}
|
||
|
||
line_unit = line_vec / line_len
|
||
|
||
distances = []
|
||
for i in range(len(T)):
|
||
point = np.array([T_norm[i], E_norm[i]])
|
||
point_vec = point - p1
|
||
proj_len = np.dot(point_vec, line_unit)
|
||
proj_point = p1 + proj_len * line_unit
|
||
dist = np.linalg.norm(point - proj_point)
|
||
distances.append(dist)
|
||
|
||
knee_idx = np.argmax(distances)
|
||
|
||
return {
|
||
'knee_years': feasible.iloc[knee_idx]['years'],
|
||
'knee_energy_p95': feasible.iloc[knee_idx]['energy_p95_mc'],
|
||
'knee_energy_mean': feasible.iloc[knee_idx]['energy_mean_mc'],
|
||
'knee_prob': feasible.iloc[knee_idx]['prob'],
|
||
}
|
||
|
||
|
||
# ============== 敏感性分析 ==============
|
||
|
||
def sensitivity_analysis(
|
||
base_params: UncertaintyParams,
|
||
target_years: float = 139,
|
||
n_simulations: int = 2000,
|
||
) -> pd.DataFrame:
|
||
"""敏感性分析"""
|
||
results = []
|
||
|
||
# 基准
|
||
base = monte_carlo_binomial(target_years, base_params, n_simulations)
|
||
det = calculate_deterministic(target_years)
|
||
results.append({
|
||
'Parameter': 'Baseline',
|
||
'Value': '-',
|
||
'Completion Prob': base['completion_probability'],
|
||
'Energy Mean (PJ)': base['energy_mean'],
|
||
'Energy P95 (PJ)': base['energy_p95'],
|
||
'vs Deterministic': f"+{(base['energy_mean']/det['total_energy_PJ']-1)*100:.1f}%" if det else 'N/A',
|
||
})
|
||
|
||
# 火箭成功率
|
||
for rate in [0.93, 0.95, 0.97, 0.99]:
|
||
p = UncertaintyParams(rocket_success_rate=rate)
|
||
r = monte_carlo_binomial(target_years, p, n_simulations)
|
||
results.append({
|
||
'Parameter': 'Rocket Success Rate',
|
||
'Value': f'{rate:.0%}',
|
||
'Completion Prob': r['completion_probability'],
|
||
'Energy Mean (PJ)': r['energy_mean'],
|
||
'Energy P95 (PJ)': r['energy_p95'],
|
||
'vs Deterministic': f"+{(r['energy_mean']/det['total_energy_PJ']-1)*100:.1f}%" if det else 'N/A',
|
||
})
|
||
|
||
# 电梯可用率
|
||
for avail in [0.80, 0.85, 0.90, 0.95]:
|
||
p = UncertaintyParams(elevator_availability=avail)
|
||
r = monte_carlo_binomial(target_years, p, n_simulations)
|
||
results.append({
|
||
'Parameter': 'Elevator Availability',
|
||
'Value': f'{avail:.0%}',
|
||
'Completion Prob': r['completion_probability'],
|
||
'Energy Mean (PJ)': r['energy_mean'],
|
||
'Energy P95 (PJ)': r['energy_p95'],
|
||
'vs Deterministic': f"+{(r['energy_mean']/det['total_energy_PJ']-1)*100:.1f}%" if det else 'N/A',
|
||
})
|
||
|
||
# 天气因子
|
||
for weather in [0.70, 0.75, 0.80, 0.85, 0.90]:
|
||
p = UncertaintyParams(weather_factor=weather)
|
||
r = monte_carlo_binomial(target_years, p, n_simulations)
|
||
results.append({
|
||
'Parameter': 'Weather Factor',
|
||
'Value': f'{weather:.0%}',
|
||
'Completion Prob': r['completion_probability'],
|
||
'Energy Mean (PJ)': r['energy_mean'],
|
||
'Energy P95 (PJ)': r['energy_p95'],
|
||
'vs Deterministic': f"+{(r['energy_mean']/det['total_energy_PJ']-1)*100:.1f}%" if det else 'N/A',
|
||
})
|
||
|
||
return pd.DataFrame(results)
|
||
|
||
|
||
# ============== 情景分析 ==============
|
||
|
||
def scenario_comparison(target_years: float = 139, n_simulations: int = 2000) -> Dict:
|
||
"""情景对比:乐观/基准/悲观"""
|
||
scenarios = {
|
||
'Optimistic': UncertaintyParams(
|
||
rocket_success_rate=0.99,
|
||
elevator_availability=0.95,
|
||
weather_factor=0.90,
|
||
loss_rate=0.005,
|
||
elevator_efficiency=0.98,
|
||
),
|
||
'Baseline': UncertaintyParams(),
|
||
'Pessimistic': UncertaintyParams(
|
||
rocket_success_rate=0.93,
|
||
elevator_availability=0.80,
|
||
weather_factor=0.70,
|
||
loss_rate=0.02,
|
||
elevator_efficiency=0.90,
|
||
),
|
||
}
|
||
|
||
results = {}
|
||
det = calculate_deterministic(target_years)
|
||
|
||
for name, params in scenarios.items():
|
||
mc = monte_carlo_binomial(target_years, params, n_simulations)
|
||
results[name] = {
|
||
'params': params,
|
||
'mc': mc,
|
||
'det': det,
|
||
}
|
||
|
||
return results
|
||
|
||
|
||
# ============== 可视化 ==============
|
||
|
||
def plot_monte_carlo_results(
|
||
mc_result: Dict,
|
||
params: UncertaintyParams,
|
||
det_result: Dict,
|
||
save_path: str = '/Volumes/Files/code/mm/20260130_b/p2/monte_carlo_results.png'
|
||
):
|
||
"""绘制蒙特卡洛结果"""
|
||
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
|
||
raw = mc_result['raw']
|
||
|
||
# 图1:能量分布(对比确定性)
|
||
ax1 = axes[0, 0]
|
||
energies = raw['total_energy']
|
||
ax1.hist(energies, bins=40, color='steelblue', alpha=0.7, edgecolor='white', label='MC Distribution')
|
||
ax1.axvline(mc_result['energy_mean'], color='red', linestyle='--', linewidth=2,
|
||
label=f'MC Mean: {mc_result["energy_mean"]:.0f} PJ')
|
||
ax1.axvline(mc_result['energy_p95'], color='orange', linestyle=':', linewidth=2,
|
||
label=f'MC P95: {mc_result["energy_p95"]:.0f} PJ')
|
||
ax1.axvline(det_result['total_energy_PJ'], color='green', linestyle='-', linewidth=2,
|
||
label=f'Deterministic: {det_result["total_energy_PJ"]:.0f} PJ')
|
||
ax1.set_xlabel('Total Energy (PJ)', fontsize=12)
|
||
ax1.set_ylabel('Frequency', fontsize=12)
|
||
ax1.set_title('Energy Distribution: MC vs Deterministic', fontsize=13)
|
||
ax1.legend()
|
||
ax1.grid(True, alpha=0.3)
|
||
|
||
# 图2:发射次数分布
|
||
ax2 = axes[0, 1]
|
||
success = raw['successful_launches']
|
||
failed = raw['failed_launches']
|
||
ax2.hist(success, bins=30, color='seagreen', alpha=0.6, label='Successful', edgecolor='white')
|
||
ax2.hist(failed, bins=30, color='red', alpha=0.6, label='Failed', edgecolor='white')
|
||
ax2.axvline(det_result['successful_launches'], color='green', linestyle='--', linewidth=2,
|
||
label=f'Det. Launches: {det_result["successful_launches"]:.0f}')
|
||
ax2.set_xlabel('Number of Launches', fontsize=12)
|
||
ax2.set_ylabel('Frequency', fontsize=12)
|
||
ax2.set_title(f'Launch Distribution\n(Avg Success: {mc_result["avg_successful_launches"]:.0f}, '
|
||
f'Avg Failed: {mc_result["avg_failed_launches"]:.0f})', fontsize=13)
|
||
ax2.legend()
|
||
ax2.grid(True, alpha=0.3)
|
||
|
||
# 图3:尝试vs成功散点图
|
||
ax3 = axes[1, 0]
|
||
ax3.scatter(raw['n_try'], raw['n_success'], alpha=0.3, c='purple', s=10)
|
||
ax3.plot([0, max(raw['n_try'])], [0, max(raw['n_try'])], 'k--', alpha=0.5, label='100% success')
|
||
ax3.set_xlabel('Attempted Launches', fontsize=12)
|
||
ax3.set_ylabel('Successful Launches', fontsize=12)
|
||
ax3.set_title('Binomial: Attempts vs Successes', fontsize=13)
|
||
ax3.legend()
|
||
ax3.grid(True, alpha=0.3)
|
||
|
||
# 图4:统计信息
|
||
ax4 = axes[1, 1]
|
||
ax4.axis('off')
|
||
|
||
energy_increase = (mc_result['energy_mean'] / det_result['total_energy_PJ'] - 1) * 100
|
||
|
||
info_text = (
|
||
f"Monte Carlo Simulation (Binomial Model)\n"
|
||
f"{'='*45}\n\n"
|
||
f"Target: {mc_result['target_years']:.0f} years\n"
|
||
f"Simulations: {mc_result['n_simulations']:,}\n\n"
|
||
f"COMPLETION PROBABILITY: {mc_result['completion_probability']:.1%}\n\n"
|
||
f"Energy Statistics (PJ):\n"
|
||
f" Deterministic: {det_result['total_energy_PJ']:.0f}\n"
|
||
f" MC Mean: {mc_result['energy_mean']:.0f} (+{energy_increase:.1f}%)\n"
|
||
f" MC P95: {mc_result['energy_p95']:.0f}\n\n"
|
||
f"Launch Statistics:\n"
|
||
f" Det. Launches: {det_result['successful_launches']:,}\n"
|
||
f" Avg Success: {mc_result['avg_successful_launches']:,.0f}\n"
|
||
f" Avg Failed: {mc_result['avg_failed_launches']:,.0f}\n\n"
|
||
f"Parameters (Beta/Triangular):\n"
|
||
f" Rocket Success: {params.rocket_success_rate:.0%}\n"
|
||
f" Elevator Avail: {params.elevator_availability:.0%}\n"
|
||
f" Weather Factor: {params.weather_factor:.0%}\n"
|
||
f" Loss Rate: {params.loss_rate:.1%}"
|
||
)
|
||
|
||
ax4.text(0.05, 0.95, info_text, transform=ax4.transAxes,
|
||
fontsize=11, verticalalignment='top', fontfamily='monospace',
|
||
bbox=dict(boxstyle='round', facecolor='lightyellow', alpha=0.8))
|
||
|
||
plt.suptitle('Monte Carlo Simulation: Binomial Model (Task 2 Revised)', fontsize=15, y=1.02)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches='tight')
|
||
print(f"蒙特卡洛结果图已保存至: {save_path}")
|
||
|
||
|
||
def plot_solution_changes(
|
||
pareto_df: pd.DataFrame,
|
||
det_knee: float,
|
||
risk_knee: Dict,
|
||
save_path: str = '/Volumes/Files/code/mm/20260130_b/p2/solution_changes.png'
|
||
):
|
||
"""
|
||
绘制"解决方案变化"图:展示膝点如何右移
|
||
这是回答任务二核心问题的关键图
|
||
"""
|
||
fig, axes = plt.subplots(1, 2, figsize=(16, 7))
|
||
|
||
# 图1:可靠性约束下的 Pareto 前沿
|
||
ax1 = axes[0]
|
||
|
||
feasible = pareto_df[pareto_df['feasible']]
|
||
infeasible = pareto_df[~pareto_df['feasible']]
|
||
|
||
ax1.plot(feasible['years'], feasible['energy_p95_mc'], 'b-', linewidth=2.5,
|
||
label='Feasible (P≥95%)', marker='o', markersize=6)
|
||
ax1.plot(infeasible['years'], infeasible['energy_p95_mc'], 'r--', linewidth=1.5,
|
||
label='Infeasible (P<95%)', marker='x', markersize=6, alpha=0.5)
|
||
|
||
# 确定性曲线
|
||
ax1.plot(pareto_df['years'], pareto_df['energy_det'], 'g-', linewidth=2,
|
||
label='Deterministic', alpha=0.7)
|
||
|
||
# 标记膝点
|
||
ax1.scatter([det_knee], [calculate_deterministic(det_knee)['total_energy_PJ']],
|
||
c='green', s=200, marker='*', zorder=5, edgecolors='black', linewidth=2,
|
||
label=f'Det. Knee: {det_knee:.0f}y')
|
||
|
||
if not np.isnan(risk_knee['knee_years']):
|
||
ax1.scatter([risk_knee['knee_years']], [risk_knee['knee_energy_p95']],
|
||
c='red', s=200, marker='*', zorder=5, edgecolors='black', linewidth=2,
|
||
label=f'Risk Knee: {risk_knee["knee_years"]:.0f}y')
|
||
|
||
# 画偏移箭头
|
||
ax1.annotate('', xy=(risk_knee['knee_years'], risk_knee['knee_energy_p95']),
|
||
xytext=(det_knee, calculate_deterministic(det_knee)['total_energy_PJ']),
|
||
arrowprops=dict(arrowstyle='->', color='purple', lw=2))
|
||
|
||
ax1.set_xlabel('Completion Time (years)', fontsize=12)
|
||
ax1.set_ylabel('Energy P95 (PJ)', fontsize=12)
|
||
ax1.set_title('Solution Changes: Knee Point Shift\n(Deterministic → Risk-Constrained)', fontsize=14)
|
||
ax1.legend(loc='upper right')
|
||
ax1.grid(True, alpha=0.3)
|
||
|
||
# 图2:能量增加百分比
|
||
ax2 = axes[1]
|
||
|
||
valid = pareto_df[pareto_df['energy_det'].notna()]
|
||
ax2.fill_between(valid['years'], 0, valid['energy_increase_pct'],
|
||
alpha=0.4, color='coral', label='Energy Increase (%)')
|
||
ax2.plot(valid['years'], valid['energy_increase_pct'], 'r-', linewidth=2)
|
||
|
||
# 标记可行边界
|
||
if len(feasible) > 0:
|
||
feasible_start = feasible['years'].min()
|
||
ax2.axvline(feasible_start, color='blue', linestyle='--', linewidth=2,
|
||
label=f'95% Feasible from {feasible_start:.0f}y')
|
||
|
||
ax2.set_xlabel('Completion Time (years)', fontsize=12)
|
||
ax2.set_ylabel('Energy Increase vs Deterministic (%)', fontsize=12)
|
||
ax2.set_title('Uncertainty Penalty: Energy Increase', fontsize=14)
|
||
ax2.legend()
|
||
ax2.grid(True, alpha=0.3)
|
||
|
||
plt.suptitle('Task 2: To What Extent Does the Solution Change?', fontsize=16, y=1.02)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches='tight')
|
||
print(f"解决方案变化图已保存至: {save_path}")
|
||
|
||
|
||
def plot_completion_time_distribution(
|
||
time_dist: Dict,
|
||
save_path: str = '/Volumes/Files/code/mm/20260130_b/p2/completion_time_dist.png'
|
||
):
|
||
"""绘制真实的完成时间分布"""
|
||
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
|
||
|
||
times = time_dist['completion_times']
|
||
energies = time_dist['completion_energies']
|
||
|
||
# 图1:完成时间分布
|
||
ax1 = axes[0]
|
||
ax1.hist(times, bins=40, color='steelblue', alpha=0.7, edgecolor='white')
|
||
ax1.axvline(time_dist['time_mean'], color='red', linestyle='--', linewidth=2,
|
||
label=f'Mean: {time_dist["time_mean"]:.1f}y')
|
||
ax1.axvline(time_dist['time_p50'], color='orange', linestyle=':', linewidth=2,
|
||
label=f'P50: {time_dist["time_p50"]:.1f}y')
|
||
ax1.axvline(time_dist['time_p95'], color='purple', linestyle='-.', linewidth=2,
|
||
label=f'P95: {time_dist["time_p95"]:.1f}y')
|
||
ax1.set_xlabel('Completion Time (years)', fontsize=12)
|
||
ax1.set_ylabel('Frequency', fontsize=12)
|
||
ax1.set_title('Completion Time Distribution\n(True First-Completion Time)', fontsize=13)
|
||
ax1.legend()
|
||
ax1.grid(True, alpha=0.3)
|
||
|
||
# 图2:完成能量分布
|
||
ax2 = axes[1]
|
||
ax2.hist(energies, bins=40, color='coral', alpha=0.7, edgecolor='white')
|
||
ax2.axvline(time_dist['energy_mean'], color='red', linestyle='--', linewidth=2,
|
||
label=f'Mean: {time_dist["energy_mean"]:.0f} PJ')
|
||
ax2.axvline(time_dist['energy_p95'], color='purple', linestyle='-.', linewidth=2,
|
||
label=f'P95: {time_dist["energy_p95"]:.0f} PJ')
|
||
ax2.set_xlabel('Energy at Completion (PJ)', fontsize=12)
|
||
ax2.set_ylabel('Frequency', fontsize=12)
|
||
ax2.set_title('Energy at Completion', fontsize=13)
|
||
ax2.legend()
|
||
ax2.grid(True, alpha=0.3)
|
||
|
||
plt.suptitle('True Completion Time & Energy Distribution', fontsize=15, y=1.02)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches='tight')
|
||
print(f"完成时间分布图已保存至: {save_path}")
|
||
|
||
|
||
def plot_sensitivity_analysis(
|
||
df: pd.DataFrame,
|
||
save_path: str = '/Volumes/Files/code/mm/20260130_b/p2/sensitivity_analysis.png'
|
||
):
|
||
"""绘制敏感性分析"""
|
||
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
|
||
|
||
params = ['Rocket Success Rate', 'Elevator Availability', 'Weather Factor']
|
||
colors = ['steelblue', 'coral', 'seagreen']
|
||
|
||
baseline = df[df['Parameter'] == 'Baseline'].iloc[0]
|
||
|
||
for idx, param in enumerate(params):
|
||
ax = axes[idx // 2, idx % 2]
|
||
data = df[df['Parameter'] == param]
|
||
|
||
x = range(len(data))
|
||
values = data['Value'].tolist()
|
||
|
||
# 双Y轴:完成概率 + 能量
|
||
ax2 = ax.twinx()
|
||
|
||
bars = ax.bar(x, data['Completion Prob'] * 100, color=colors[idx], alpha=0.7, label='Completion %')
|
||
ax.axhline(baseline['Completion Prob'] * 100, color='red', linestyle='--',
|
||
label=f'Baseline: {baseline["Completion Prob"]*100:.0f}%')
|
||
|
||
ax2.plot(x, data['Energy P95 (PJ)'], 'ko-', linewidth=2, markersize=8, label='Energy P95')
|
||
|
||
ax.set_xticks(x)
|
||
ax.set_xticklabels(values, fontsize=10)
|
||
ax.set_xlabel(param, fontsize=12)
|
||
ax.set_ylabel('Completion Probability (%)', fontsize=12, color=colors[idx])
|
||
ax2.set_ylabel('Energy P95 (PJ)', fontsize=12)
|
||
ax.set_title(f'Sensitivity to {param}', fontsize=13)
|
||
ax.legend(loc='upper left')
|
||
ax2.legend(loc='upper right')
|
||
ax.grid(True, alpha=0.3, axis='y')
|
||
ax.set_ylim(0, 105)
|
||
|
||
# 图4:综合对比(文字摘要)
|
||
ax = axes[1, 1]
|
||
ax.axis('off')
|
||
|
||
# 构建摘要文本(避免字符串拼接问题)
|
||
summary_lines = [
|
||
"SENSITIVITY ANALYSIS SUMMARY",
|
||
"----------------------------------------",
|
||
"",
|
||
f"Baseline (139 years):",
|
||
f" Completion Prob: {baseline['Completion Prob']*100:.0f}%",
|
||
f" Energy Mean: {baseline['Energy Mean (PJ)']:.0f} PJ",
|
||
f" Energy P95: {baseline['Energy P95 (PJ)']:.0f} PJ",
|
||
"",
|
||
"Most Sensitive Factors:",
|
||
" 1. Elevator Availability -> Prob",
|
||
" 2. Rocket Success Rate -> Energy",
|
||
" 3. Weather Factor -> Capacity",
|
||
"",
|
||
"Risk Mitigation:",
|
||
" - Increase elevator maintenance",
|
||
" - Extend timeline by 5-10 years",
|
||
" - Prioritize low-latitude sites",
|
||
]
|
||
summary_text = "\n".join(summary_lines)
|
||
|
||
ax.text(0.1, 0.95, summary_text, transform=ax.transAxes,
|
||
fontsize=11, verticalalignment='top', fontfamily='monospace',
|
||
bbox=dict(boxstyle='round', facecolor='lightgreen', alpha=0.8))
|
||
|
||
plt.suptitle('Sensitivity Analysis: Impact of Uncertainty Parameters\n(Target: 139 years)',
|
||
fontsize=15, y=1.02)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches='tight')
|
||
print(f"敏感性分析图已保存至: {save_path}")
|
||
|
||
|
||
def plot_scenario_comparison(
|
||
results: Dict,
|
||
save_path: str = '/Volumes/Files/code/mm/20260130_b/p2/scenario_comparison.png'
|
||
):
|
||
"""绘制情景对比"""
|
||
fig, axes = plt.subplots(1, 3, figsize=(16, 6))
|
||
|
||
scenarios = ['Optimistic', 'Baseline', 'Pessimistic']
|
||
colors = ['green', 'steelblue', 'red']
|
||
|
||
# 完成概率
|
||
ax1 = axes[0]
|
||
probs = [results[s]['mc']['completion_probability'] * 100 for s in scenarios]
|
||
bars = ax1.bar(scenarios, probs, color=colors, alpha=0.7, edgecolor='black')
|
||
ax1.set_ylabel('Completion Probability (%)', fontsize=12)
|
||
ax1.set_title('Completion Probability', fontsize=13)
|
||
ax1.set_ylim(0, 105)
|
||
for bar, prob in zip(bars, probs):
|
||
ax1.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 1,
|
||
f'{prob:.0f}%', ha='center', fontsize=11, fontweight='bold')
|
||
ax1.grid(True, alpha=0.3, axis='y')
|
||
|
||
# 能量
|
||
ax2 = axes[1]
|
||
x = np.arange(len(scenarios))
|
||
width = 0.25
|
||
|
||
det_e = results['Baseline']['det']['total_energy_PJ']
|
||
means = [results[s]['mc']['energy_mean'] for s in scenarios]
|
||
p95s = [results[s]['mc']['energy_p95'] for s in scenarios]
|
||
|
||
ax2.bar(x - width, [det_e]*3, width, label='Deterministic', color='gray', alpha=0.5)
|
||
ax2.bar(x, means, width, label='MC Mean', color='steelblue', alpha=0.7)
|
||
ax2.bar(x + width, p95s, width, label='MC P95', color='coral', alpha=0.7)
|
||
|
||
ax2.set_xticks(x)
|
||
ax2.set_xticklabels(scenarios)
|
||
ax2.set_ylabel('Energy (PJ)', fontsize=12)
|
||
ax2.set_title('Energy Consumption', fontsize=13)
|
||
ax2.legend()
|
||
ax2.grid(True, alpha=0.3, axis='y')
|
||
|
||
# 发射次数
|
||
ax3 = axes[2]
|
||
success = [results[s]['mc']['avg_successful_launches'] for s in scenarios]
|
||
failed = [results[s]['mc']['avg_failed_launches'] for s in scenarios]
|
||
det_launches = results['Baseline']['det']['successful_launches']
|
||
|
||
ax3.bar(x - width, [det_launches]*3, width, label='Deterministic', color='gray', alpha=0.5)
|
||
ax3.bar(x, success, width, label='Successful', color='seagreen', alpha=0.7)
|
||
ax3.bar(x + width, failed, width, label='Failed', color='red', alpha=0.7)
|
||
|
||
ax3.set_xticks(x)
|
||
ax3.set_xticklabels(scenarios)
|
||
ax3.set_ylabel('Launches', fontsize=12)
|
||
ax3.set_title('Launch Statistics', fontsize=13)
|
||
ax3.legend()
|
||
ax3.grid(True, alpha=0.3, axis='y')
|
||
|
||
plt.suptitle('Scenario Comparison: Optimistic vs Baseline vs Pessimistic\n(Target: 139 years)',
|
||
fontsize=15, y=1.05)
|
||
plt.tight_layout()
|
||
plt.savefig(save_path, dpi=150, bbox_inches='tight')
|
||
print(f"情景对比图已保存至: {save_path}")
|
||
|
||
|
||
# ============== 报告打印 ==============
|
||
|
||
def print_report(params, det, mc, time_dist, pareto_df, det_knee, risk_knee):
|
||
"""打印完整分析报告(v2:改进口径和结论表述)"""
|
||
print("\n" + "=" * 80)
|
||
print("TASK 2: UNCERTAINTY & ROBUSTNESS ANALYSIS (v2)")
|
||
print("Method: Aggregated MC with Binomial Distribution + Monotonic Completion Time")
|
||
print("=" * 80)
|
||
|
||
print(f"\n1. UNCERTAINTY PARAMETERS (Beta/Triangular)")
|
||
print("-" * 50)
|
||
print(params)
|
||
|
||
print(f"\n2. DETERMINISTIC BASELINE (139 years)")
|
||
print("-" * 50)
|
||
print(f" Total Energy: {det['total_energy_PJ']:.0f} PJ")
|
||
print(f" Elevator Payload: {det['elevator_payload']/1e6:.1f} M tons")
|
||
print(f" Rocket Payload: {det['rocket_payload']/1e6:.1f} M tons")
|
||
print(f" Launches: {det['successful_launches']:,}")
|
||
|
||
print(f"\n3. MONTE CARLO SIMULATION (Binomial Model)")
|
||
print("-" * 50)
|
||
print(f" Simulations: {mc['n_simulations']:,}")
|
||
print(f" Completion Prob: {mc['completion_probability']:.1%}")
|
||
print()
|
||
|
||
# 无条件能量
|
||
energy_inc = (mc['energy_mean'] / det['total_energy_PJ'] - 1) * 100
|
||
print(f" Energy Mean (all): {mc['energy_mean']:.0f} PJ (+{energy_inc:.1f}%)")
|
||
print(f" Energy P95 (all): {mc['energy_p95']:.0f} PJ")
|
||
|
||
# 条件能量(只算完成的模拟)
|
||
if not np.isnan(mc['energy_given_completed']):
|
||
cond_inc = (mc['energy_given_completed'] / det['total_energy_PJ'] - 1) * 100
|
||
print(f" Energy Mean (|completed):{mc['energy_given_completed']:.0f} PJ (+{cond_inc:.1f}%)")
|
||
print(f" Energy P95 (|completed): {mc['energy_given_completed_p95']:.0f} PJ")
|
||
|
||
print(f"\n Avg Successful Launches: {mc['avg_successful_launches']:,.0f}")
|
||
print(f" Avg Failed Launches: {mc['avg_failed_launches']:,.0f}")
|
||
print(f" Failure Energy Ratio: {WEIGHTED_FAILURE_ENERGY_RATIO:.1%} (weighted avg)")
|
||
|
||
print(f"\n4. COMPLETION TIME DISTRIBUTION (Monotonic)")
|
||
print("-" * 50)
|
||
print(f" Mean: {time_dist['time_mean']:.1f} years")
|
||
print(f" Std: {time_dist['time_std']:.1f} years")
|
||
print(f" P5/P50/P95: {time_dist['time_p5']:.1f} / {time_dist['time_p50']:.1f} / {time_dist['time_p95']:.1f}")
|
||
|
||
# 95%可靠时间
|
||
feasible = pareto_df[pareto_df['feasible']]
|
||
min_feasible = feasible['years'].min() if len(feasible) > 0 else np.nan
|
||
|
||
print(f"\n5. DECISION GUIDANCE (ANSWER TO TASK 2)")
|
||
print("-" * 50)
|
||
print(" Task 1 recommended knee point: 139 years")
|
||
print()
|
||
print(" DECISION TABLE (Risk Preference → Timeline):")
|
||
print(" ┌─────────────────┬──────────┬───────────┐")
|
||
print(" │ Risk Tolerance │ Timeline │ Energy P95│")
|
||
print(" ├─────────────────┼──────────┼───────────┤")
|
||
|
||
# 找不同可靠性水平对应的时间
|
||
for prob_threshold, label in [(0.90, "Aggressive (P≥90%)"), (0.95, "Standard (P≥95%)"), (0.99, "Conservative (P≥99%)")]:
|
||
feasible_at_prob = pareto_df[pareto_df['prob'] >= prob_threshold]
|
||
if len(feasible_at_prob) > 0:
|
||
min_time = feasible_at_prob['years'].min()
|
||
energy = feasible_at_prob[feasible_at_prob['years'] == min_time]['energy_p95_mc'].values[0]
|
||
print(f" │ {label:15} │ {min_time:>5.0f} y │ {energy:>7.0f} PJ│")
|
||
print(" └─────────────────┴──────────┴───────────┘")
|
||
|
||
if not np.isnan(risk_knee['knee_years']):
|
||
print(f"\n Knee point in feasible region (P≥95%): {risk_knee['knee_years']:.0f} years")
|
||
print(f" → This is the Time-Energy tradeoff optimum within the safe zone")
|
||
|
||
print("\n" + "=" * 80)
|
||
print("ANSWER TO 'TO WHAT EXTENT DOES YOUR SOLUTION CHANGE?'")
|
||
print("-" * 50)
|
||
print(f" 1. Energy penalty: +{energy_inc:.0f}% (unconditional mean)")
|
||
if not np.isnan(mc['energy_given_completed']):
|
||
print(f" Energy penalty: +{cond_inc:.0f}% (given completion)")
|
||
print(f" 2. Failed launches: ~{mc['avg_failed_launches']:,.0f} ({mc['avg_failed_launches']/mc['avg_successful_launches']*100:.1f}% of successful)")
|
||
print(f" 3. Recommended timeline RANGE: {min_feasible:.0f}-{risk_knee['knee_years']:.0f} years")
|
||
print(f" - {min_feasible:.0f}y: minimum for 95% reliability")
|
||
print(f" - {risk_knee['knee_years']:.0f}y: optimal time-energy tradeoff")
|
||
print(f" 4. Most sensitive factor: Elevator Availability")
|
||
print("=" * 80)
|
||
|
||
|
||
# ============== 主程序 ==============
|
||
|
||
if __name__ == "__main__":
|
||
print("=" * 80)
|
||
print("MOON COLONY LOGISTICS - UNCERTAINTY ANALYSIS (TASK 2) v2")
|
||
print("Revised: Binomial MC + Monotonic Time + Conditional Energy + Decision Table")
|
||
print("=" * 80)
|
||
|
||
params = UncertaintyParams()
|
||
target_years = 139 # 任务一膝点
|
||
det_knee = 139 # 任务一确定性膝点
|
||
N_SIM = 2000
|
||
|
||
# 1. 确定性基准
|
||
print("\n[1/7] Calculating deterministic baseline...")
|
||
det = calculate_deterministic(target_years)
|
||
|
||
# 2. 蒙特卡洛模拟(二项分布)
|
||
print("[2/7] Running Monte Carlo simulation (Binomial)...")
|
||
mc = monte_carlo_binomial(target_years, params, N_SIM)
|
||
|
||
# 3. 完成时间分布
|
||
print("[3/7] Computing completion time distribution...")
|
||
time_dist = find_completion_time_distribution(params, n_simulations=1000)
|
||
|
||
# 4. 风险约束 Pareto 分析
|
||
print("[4/7] Computing risk-constrained Pareto front...")
|
||
pareto_df = pareto_with_risk_constraint(params, 0.95, (100, 200), 25, N_SIM)
|
||
risk_knee = find_knee_point_with_risk(pareto_df)
|
||
|
||
# 5. 敏感性分析
|
||
print("[5/7] Running sensitivity analysis...")
|
||
sens_df = sensitivity_analysis(params, target_years, N_SIM)
|
||
sens_df.to_csv('/Volumes/Files/code/mm/20260130_b/p2/sensitivity_results.csv', index=False)
|
||
|
||
# 6. 情景分析
|
||
print("[6/7] Running scenario comparison...")
|
||
scenarios = scenario_comparison(target_years, N_SIM)
|
||
|
||
# 打印报告
|
||
print_report(params, det, mc, time_dist, pareto_df, det_knee, risk_knee)
|
||
|
||
# 7. 生成图表
|
||
print("\n[7/7] Generating figures...")
|
||
plot_monte_carlo_results(mc, params, det)
|
||
plot_solution_changes(pareto_df, det_knee, risk_knee)
|
||
plot_completion_time_distribution(time_dist)
|
||
plot_sensitivity_analysis(sens_df)
|
||
plot_scenario_comparison(scenarios)
|
||
|
||
# 保存 Pareto 数据
|
||
pareto_df.to_csv('/Volumes/Files/code/mm/20260130_b/p2/pareto_risk_constrained.csv', index=False)
|
||
|
||
print("\n" + "=" * 80)
|
||
print("ANALYSIS COMPLETE!")
|
||
print("=" * 80)
|