Files
traffic-safe/services/strategy.py

158 lines
7.0 KiB
Python
Raw Normal View History

from __future__ import annotations
import pandas as pd
import streamlit as st
from services.forecast import fit_and_extrapolate, arima_forecast_with_grid_search
from config.settings import MIN_PRE_DAYS, MAX_PRE_DAYS
def evaluate_strategy_effectiveness(actual_series: pd.Series,
                                    counterfactual_series: pd.Series,
                                    severity_series: pd.Series,
                                    strategy_date: pd.Timestamp,
                                    window: int = 30):
    """Score a strategy by comparing observed data with a counterfactual.

    The evaluation window is the ``window`` days starting at
    ``strategy_date`` (inclusive). The severity baseline is the ``window``
    days immediately before ``strategy_date``.

    Args:
        actual_series: Observed values, sliced by date label with ``.loc``.
        counterfactual_series: Predicted values without the strategy,
            sliced by date label with ``.loc``.
        severity_series: Severity values, sliced by date label with ``.loc``.
        strategy_date: First day of the strategy; passed through
            ``pd.to_datetime``.
        window: Length of the evaluation window in days.

    Returns:
        A tuple ``(count_effective, severity_effective, (F1, F2), state)``:

        * ``count_effective`` -- True when the actual value is below the
          counterfactual on at least half of the days in the window. Days
          with no counterfactual value never count as effective.
        * ``severity_effective`` -- True when total severity in the window
          is lower than in the preceding baseline period.
        * ``F1`` -- relative reduction of the actual total versus the
          counterfactual total, over the days where both have a value
          (0.0 when the counterfactual total is not positive).
        * ``F2`` -- relative reduction of total severity versus the
          baseline (0.0 when the baseline total is not positive).
        * ``state`` -- ``'一级'`` when both F1 and F2 exceed 0.5, ``'二级'``
          when F1 exceeds 0.3, otherwise ``'三级'``.

        When no actual observations fall inside the window the result is
        ``(False, False, (0.0, 0.0), '三级')``.
    """
    strategy_date = pd.to_datetime(strategy_date)
    window_end = strategy_date + pd.Timedelta(days=window - 1)
    baseline_start = strategy_date - pd.Timedelta(days=window)
    baseline_end = strategy_date - pd.Timedelta(days=1)

    pre_sev = severity_series.loc[baseline_start:baseline_end].sum()
    post_sev = severity_series.loc[strategy_date:window_end].sum()

    actual_post = actual_series.loc[strategy_date:window_end]
    # Align the counterfactual to the observed days; days the counterfactual
    # does not cover become NaN.
    counter_post = counterfactual_series.loc[strategy_date:window_end].reindex(actual_post.index)

    window_len = len(actual_post)
    if window_len == 0:
        return False, False, (0.0, 0.0), '三级'

    # Only days where BOTH series have a value are comparable. Summing the
    # actual series over the whole window while the counterfactual sum skips
    # its NaN days would compare totals over different day sets and bias F1.
    comparable = actual_post.notna() & counter_post.notna()
    actual_cmp = actual_post[comparable]
    counter_cmp = counter_post[comparable]

    effective_days = int((actual_cmp < counter_cmp).sum())
    # The denominator stays the full window so that days without a
    # counterfactual cannot make a strategy look effective.
    count_effective = effective_days >= (window_len / 2)
    severity_effective = bool(post_sev < pre_sev)

    cf_sum = counter_cmp.sum()
    F1 = float((cf_sum - actual_cmp.sum()) / cf_sum) if cf_sum > 0 else 0.0
    F2 = float((pre_sev - post_sev) / pre_sev) if pre_sev > 0 else 0.0

    if F1 > 0.5 and F2 > 0.5:
        safety_state = '一级'
    elif F1 > 0.3:
        safety_state = '二级'
    else:
        safety_state = '三级'
    return count_effective, severity_effective, (F1, F2), safety_state
def _build_counterfactual(acc, intervention_date, days):
    """Return ``(counterfactual, residuals)`` for one intervention date.

    Preference order: the SVR prediction, then the GLM prediction (both from
    ``fit_and_extrapolate``), then an ARIMA forecast as a last resort.
    ``counterfactual`` is None when none of them is available.
    """
    glm_pred, svr_pred, residuals = fit_and_extrapolate(acc, intervention_date, days=days)
    if svr_pred is not None:
        return svr_pred, residuals
    if glm_pred is not None:
        return glm_pred, residuals
    try:
        arima_df = arima_forecast_with_grid_search(
            acc.loc[:intervention_date],
            start_date=intervention_date + pd.Timedelta(days=1),
            horizon=days,
        )
        counter = pd.Series(arima_df['forecast'].values, index=arima_df.index, name='cf_arima')
        residuals = acc.reindex(counter.index) - counter
    except Exception:
        # Best-effort fallback: a failed ARIMA fit means "no counterfactual"
        # for this strategy, but the failure is logged instead of hidden.
        import logging
        logging.getLogger(__name__).warning(
            "ARIMA counterfactual failed for intervention date %s",
            intervention_date, exc_info=True)
        return None, residuals
    return counter, residuals


def _mean_residual(residuals):
    """Return the mean of the non-NaN residuals, or 0.0 when there are none."""
    if residuals is None:
        return 0.0
    valid = residuals.dropna()
    # An empty residual series would otherwise yield NaN.
    return float(valid.mean()) if len(valid) else 0.0


def _strategy_result(acc, sev, counter, residuals, intervention_date, window):
    """Evaluate one strategy and pack the metrics into a result dict."""
    count_eff, sev_eff, (F1, F2), state = evaluate_strategy_effectiveness(
        actual_series=acc,
        counterfactual_series=counter,
        severity_series=sev,
        strategy_date=intervention_date,
        window=window
    )
    return {
        'effect_strength': _mean_residual(residuals),
        'adaptability': float(F1 + F2),
        'count_effective': bool(count_eff),
        'severity_effective': bool(sev_eff),
        'safety_state': state,
        'F1': float(F1),
        'F2': float(F2),
        'intervention_date': str(intervention_date.date())
    }


@st.cache_data(show_spinner=False)
def generate_output_and_recommendations(combined_data: pd.DataFrame,
                                        strategy_types: list,
                                        region: str = '全市',
                                        horizon: int = 30):
    """Evaluate each strategy type and recommend the best-scoring one.

    Args:
        combined_data: Frame indexed by date (it is resampled with
            ``asfreq('D')``) with columns ``accident_count``, ``severity``
            and ``strategy_type``. ``strategy_type`` cells are expected to
            be lists; any non-list cell is treated as "no strategy".
        strategy_types: Strategy identifiers to evaluate.
        region: Region name inserted into the recommendation text.
        horizon: Requested evaluation window in days.

    Returns:
        ``(results, recommendation)``. ``results`` maps each evaluated
        strategy to a dict with keys ``effect_strength``, ``adaptability``,
        ``count_effective``, ``severity_effective``, ``safety_state``,
        ``F1``, ``F2`` and ``intervention_date``. ``recommendation`` names
        the strategy with the highest ``adaptability``, or says there is
        not enough data when ``results`` is empty.
    """
    results = {}
    combined_data = combined_data.copy().asfreq('D')
    combined_data[['accident_count','severity']] = combined_data[['accident_count','severity']].fillna(0)
    # Days added by asfreq (and any malformed cells) carry no strategy.
    combined_data['strategy_type'] = combined_data['strategy_type'].apply(lambda x: x if isinstance(x, list) else [])
    acc_full = combined_data['accident_count']
    sev_full = combined_data['severity']
    max_fit_days = max(horizon + 60, MAX_PRE_DAYS)
    series_start = acc_full.index.min()

    for strategy in strategy_types:
        has_strategy = combined_data['strategy_type'].apply(lambda x: strategy in x)
        if not has_strategy.any():
            continue
        candidate_dates = has_strategy[has_strategy].index

        # Pick the earliest strategy day that has at least MIN_PRE_DAYS of
        # history inside the fitting window before it.
        intervention_date = None
        fit_start_dt = None
        for dt in candidate_dates:
            fit_start_dt = max(series_start, dt - pd.Timedelta(days=max_fit_days))
            pre_hist = acc_full.loc[fit_start_dt:dt - pd.Timedelta(days=1)]
            if len(pre_hist) >= MIN_PRE_DAYS:
                intervention_date = dt
                break
        if intervention_date is None:
            # No day has enough history: fall back to the first strategy day.
            intervention_date = candidate_dates[0]
            fit_start_dt = max(series_start, intervention_date - pd.Timedelta(days=max_fit_days))

        acc = acc_full.loc[fit_start_dt:]
        sev = sev_full.loc[fit_start_dt:]
        # Cap the window at the days available from the intervention onward,
        # but never go below 7 days.
        horizon_eff = max(7, min(horizon, len(acc.loc[intervention_date:])))

        counter, residuals = _build_counterfactual(acc, intervention_date, horizon_eff)
        if counter is None:
            continue
        results[strategy] = _strategy_result(
            acc, sev, counter, residuals, intervention_date, horizon_eff)

    # Secondary attempt with 14-day window if no results
    if not results:
        fallback_window = 14
        for strategy in strategy_types:
            has_strategy = combined_data['strategy_type'].apply(lambda x: strategy in x)
            if not has_strategy.any():
                continue
            intervention_date = has_strategy[has_strategy].index[0]
            counter, residuals = _build_counterfactual(acc_full, intervention_date, fallback_window)
            if counter is None:
                continue
            results[strategy] = _strategy_result(
                acc_full, sev_full, counter, residuals, intervention_date, fallback_window)

    best_strategy = max(results, key=lambda x: results[x]['adaptability']) if results else None
    recommendation = f"建议在{region}区域长期实施策略类型 {best_strategy}" if best_strategy else "无足够数据推荐策略"
    return results, recommendation