Source code for scpn_fusion.control.safe_rl_controller

# SPDX-License-Identifier: AGPL-3.0-or-later | Commercial license available
# © Concepts 1996–2026 Miroslav Šotek. All rights reserved.
# © Code 2020–2026 Miroslav Šotek. All rights reserved.
# ORCID: 0009-0009-3560-0851
# Contact: www.anulum.li | protoscience@anulum.li
# SCPN Fusion Core — Constrained Safe Reinforcement Learning
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable

import numpy as np


[docs] @dataclass class SafetyConstraint: """Named constraint: cost_fn(obs, act, next_obs) must stay below limit.""" name: str cost_fn: Callable[[np.ndarray, np.ndarray, np.ndarray], float] limit: float
[docs] class ConstrainedGymTokamakEnv: """Wrapper to compute and report constraint costs.""" def __init__(self, base_env: Any, constraints: list[SafetyConstraint]): self.base_env = base_env self.constraints = constraints self.n_constraints = len(constraints) self.action_space = base_env.action_space self.observation_space = base_env.observation_space
[docs] def reset(self, **kwargs: Any) -> tuple[np.ndarray, dict[str, Any]]: """Reset base environment and cache initial observation.""" obs, info = self.base_env.reset(**kwargs) self._last_obs = obs return obs, info
[docs] def step(self, action: np.ndarray) -> tuple[np.ndarray, float, bool, bool, dict[str, Any]]: """Step base env and append constraint costs to info dict.""" obs, reward, terminated, truncated, info = self.base_env.step(action) costs = [] for c in self.constraints: c_val = c.cost_fn(self._last_obs, action, obs) costs.append(c_val) info["constraint_costs"] = costs self._last_obs = obs return obs, reward, terminated, truncated, info
[docs] class LagrangianPPO: """PPO augmented with Lagrangian multipliers for constrained RL.""" def __init__(self, env: ConstrainedGymTokamakEnv, lambda_lr: float = 0.01, gamma: float = 0.99): self.env = env self.n_constraints = env.n_constraints self.lambdas = np.zeros(self.n_constraints) self.lambda_lr = lambda_lr self.gamma = gamma self.trained = False def _augmented_reward(self, reward: float, costs: list[float]) -> float: """r_aug = r - sum(lambda_i * c_i).""" penalty = sum(lam * c for lam, c in zip(self.lambdas, costs)) return float(reward - penalty)
[docs] def update_lambdas(self, episode_costs: list[float]) -> None: """Dual gradient ascent: lambda_i <- max(0, lambda_i + lr*(C_i - d_i)).""" for i, c in enumerate(episode_costs): limit = self.env.constraints[i].limit grad = c - limit self.lambdas[i] = max(0.0, self.lambdas[i] + self.lambda_lr * grad)
[docs] def train(self, total_timesteps: int) -> None: """Mock training loop — collects rollouts and updates lambdas.""" current_step = 0 while current_step < total_timesteps: obs, info = self.env.reset() done = False ep_costs = [0.0] * self.n_constraints steps = 0 while not done and steps < 100: action = self.env.action_space.sample() obs, reward, term, trunc, info = self.env.step(action) done = term or trunc costs = info.get("constraint_costs", [0.0] * self.n_constraints) for i in range(self.n_constraints): ep_costs[i] += costs[i] current_step += 1 steps += 1 self.update_lambdas(ep_costs) self.trained = True
[docs] def predict(self, obs: np.ndarray) -> np.ndarray: """Return action for given observation. Currently samples randomly.""" return np.asarray(self.env.action_space.sample())
[docs] def q95_cost_fn(obs: np.ndarray, act: np.ndarray, next_obs: np.ndarray) -> float: q95 = next_obs[2] return float(max(0.0, 2.0 - q95))
[docs] def beta_n_cost_fn(obs: np.ndarray, act: np.ndarray, next_obs: np.ndarray) -> float: beta_N = next_obs[1] return float(max(0.0, beta_N - 3.5))
[docs] def ip_cost_fn(obs: np.ndarray, act: np.ndarray, next_obs: np.ndarray) -> float: Ip = next_obs[0] return float(max(0.0, -Ip))
[docs] def default_safety_constraints() -> list[SafetyConstraint]: return [ SafetyConstraint("q95_lower_bound", q95_cost_fn, limit=0.0), SafetyConstraint("beta_n_upper_bound", beta_n_cost_fn, limit=0.0), SafetyConstraint("ip_positive", ip_cost_fn, limit=0.0), ]