Cyber-Physical Grounding¶
director_ai.core.cyber_physical screens proposed physical actions
(end-effector targets, actuator commands) against geometric, kinematic
and actuator limits before the action executes. Dependency-free by
default; lazy-loaded adapters for ROS 2 / MuJoCo / CARLA when those
stacks are present.
The base install does not install simulator SDKs. Use
pip install director-ai[physical] for the pinned MuJoCo Python
wheel. ROS 2 (rclpy) and CARLA are installed through their vendor
or distribution channels, then run behind the same isolated physical
runtime boundary.
Dependency boundary:
- Install
[physical]withuv sync --locked --extra physicalwhen working from the repository. - Keep ROS 2 and CARLA in vendor-managed images or venvs; do not add them to the base package.
- Pin simulator assets, robot description files, and driver images alongside the adapter version.
- Run adapters behind a local action gateway with CPU, memory, and request limits.
- Keep
physical_action_mode="warn"until the deployment has an external stop path and calibrated tenant budgets.
Quick start¶
from director_ai.core.cyber_physical import (
AABB, GroundingHook, JointChain, PhysicalAction,
SimpleKinematicModel, Vec3, WorkspaceConstraint,
)
chain = JointChain(base=Vec3(0, 0, 0), link_lengths=(1.0, 1.0))
model = SimpleKinematicModel(chain=chain)
workspace = AABB(min_corner=Vec3(-5, -5, -5), max_corner=Vec3(5, 5, 5))
hook = GroundingHook(
model=model,
constraints=(WorkspaceConstraint(name="cell", envelope=workspace),),
)
action = PhysicalAction(
actuator_id="arm",
target_position=Vec3(1.0, 0.0, 0.0),
velocity_magnitude=0.1,
torque_magnitude=0.5,
)
verdict = hook.evaluate(action)
if verdict.allowed:
robot.execute(action)
else:
for v in verdict.violations:
log.warning("%s rejected: %s", v.constraint, v.reason)
Primitives¶
Vec3¶
Immutable 3-D point / vector; component-wise + / - / scalar *,
dot, norm, distance, clamp. Rejects non-finite components on
construction.
AABB¶
Axis-aligned bounding box with contains(point), intersects(other),
expand(margin) and a centre property.
Sphere¶
Solid sphere with contains(point), intersects(other),
intersects_aabb(box). Uses the closest-point-on-box test; a negative
radius is rejected.
Kinematics¶
JointChain¶
Planar revolute chain — base position and a tuple of link lengths.
reach returns sum(link_lengths).
SimpleKinematicModel¶
Dependency-free KinematicModel implementation:
- Forward kinematics for arbitrary planar chains via cumulative angle walk.
- Analytical two-link IK from Spong's Robot Modeling and Control
(Ch. 5), both
elbow_upandelbow_downbranches. Longer chains raiseUnsupportedKinematicsError— the operator plugs in a numerical solver or capability-specificKinematicModeladapter for those. - Collision against
AABBandSphereobstacles, honouring the model'scollision_marginandend_effector_radius.
Actions & constraints¶
PhysicalAction¶
Frozen dataclass with actuator_id, target_position,
velocity_magnitude, torque_magnitude and optional joint_angles.
Construction rejects empty actuator ids and negative velocities /
torques.
PhysicalConstraint Protocol¶
Read-only name property + evaluate(action, model) -> str | None
returning either None (pass) or a short reason string (fail).
Four concrete implementations ship:
| Class | Checks |
|---|---|
SpatialConstraint |
End-effector must stay outside a given set of AABB / sphere obstacles. |
WorkspaceConstraint |
End-effector must stay inside an AABB envelope. |
VelocityConstraint |
Action velocity ≤ max_velocity. |
TorqueConstraint |
Action torque ≤ max_torque. |
GroundingHook¶
Orchestrator. evaluate(action) returns a GroundingVerdict listing
every failing constraint (no short-circuit) plus optional
reachability rejection if the kinematic model's inverse() returns
None for the target. Actions carrying an explicit joint_angles
vector skip the reachability check, since the joint vector already
fixes the posture.
Each verdict carries safety_event, a tenant-safe SafetyEvent with
the cyber_physical.grounding hook id, policy decision, constraint
references, and a bounded operator explanation.
Heavy-dependency adapters¶
All three are constructed via a from_* classmethod that imports the
heavy dependency lazily, so the subpackage imports cleanly on
deployments that do not install them.
Ros2Adapter.from_ros2(node)— rclpy node-driven, collision from caller-supplied obstacles. ROS 2 stays outside PyPI and must be installed through the target distribution in the isolated physical runtime. Forward / inverse kinematics are deployment-specific and raiseUnsupportedKinematicsErroruntil configured with FK / IK callables backed by robot-state services such as MoveIt FK / IK.MuJoCoAdapter.from_mjcf(path)— mjcf model loading,mj_forward+site_xposfor forward kinematics, livedata.nconcontact count for collisions. Inverse kinematics raisesUnsupportedKinematicsErrorunless an operator-supplied numerical solver is configured. Install viadirector-ai[physical].CarlaAdapter.from_carla(port=2000)— vehicle-class scenarios with spawn-filtered collision. Install CARLA from the simulator vendor into the isolated physical runtime. Joint-space FK / IK are intentionally unsupported; use CARLA vehicle controls or route planning for motion.
CoherenceAgent wiring¶
Pass a configured hook to the agent:
from director_ai.core.agent import CoherenceAgent
agent = CoherenceAgent(grounding_hook=hook)
verdict = agent.verify_physical_action(action)
Calling verify_physical_action with no hook configured raises
RuntimeError so callers cannot silently skip the check.
CoherenceAgent treats physical failures as warnings by default.
Blocking requires both physical_action_mode="block" and
allow_physical_action_blocking=True.
Tenant budgets¶
Use TenantPhysicalBudget on GroundingHook to cap expensive physical
checks per tenant:
from director_ai.core.cyber_physical import (
PhysicalBudgetLimits,
TenantPhysicalBudget,
)
budget = TenantPhysicalBudget(
PhysicalBudgetLimits(
window_seconds=60.0,
max_action_validations=120,
max_inverse_kinematics=30,
max_simulation_checks=60,
max_sensor_fusion=60,
)
)
hook = GroundingHook(model=model, constraints=constraints, budget=budget)
The hook consumes one action-validation unit for every proposed action,
one inverse-kinematics unit before calling model.inverse, and one
simulation-check unit before spatial constraints call model.collides_with.
PhysicalGroundingEvaluator additionally consumes one sensor-fusion unit
before comparing perception and simulator snapshots.
Budget exhaustion blocks before the expensive operation is called. This
budget block is not converted into warn-only mode, so it still prevents
tenant payloads from exhausting physical runtimes when
CoherenceAgent uses the default advisory physical policy.
Closed-loop evaluator¶
PhysicalGroundingEvaluator compares pre-action perception state,
pre-action simulator state, the planned PhysicalAction, and optional
post-action perception/simulator snapshots. It is a policy-control layer
over GroundingHook, not a replacement for kinematic checks.
from director_ai.core.cyber_physical import (
PhysicalGroundingEvaluator,
SensorStateSnapshot,
)
from director_ai.core.guard_control import RiskEnvelope
evaluator = PhysicalGroundingEvaluator(
grounding_hook=hook,
high_risk_physical_deployment=False,
state_tolerance_m=0.05,
budget=budget,
)
pre_perception = SensorStateSnapshot(
snapshot_ref="sensor://pre-1",
sensor_id="camera-1",
adapter_id="vision.cell",
timestamp=1.0,
end_effector_position=Vec3(0.0, 0.0, 0.0),
)
pre_simulation = SensorStateSnapshot(
snapshot_ref="sim://pre-1",
sensor_id="simulator-1",
adapter_id="sim.cell",
timestamp=1.0,
end_effector_position=Vec3(0.01, 0.0, 0.0),
)
result = evaluator.evaluate(
action=action,
risk_envelope=RiskEnvelope(
action_category="physical",
reversibility="reversible",
domain="physical",
calibrated_threshold=0.6,
no_go_threshold=0.8,
),
pre_perception=pre_perception,
pre_simulation=pre_simulation,
tenant_id="tenant-a",
)
Production semantics:
- unsupported sensors/adapters produce explicit
unsupportedviolations rather than silent passes - perception/simulator mismatch produces
warnby default - mismatch becomes
blockonly whenhigh_risk_physical_deployment=True - irreversible actions go through no-go policy and require human review
- post-action checks compare both perception and simulation with the planned target and with each other
SensorStateSnapshotserialises references and coordinates only; it does not serialise raw frames, point clouds, prompts, credentials, or driver payloads
Live physical grounding loop¶
PhysicalGroundingLoop wraps the evaluator for live deployments. It first runs
a pre-action sensor-fusion check, executes the supplied action callback only
when the pre-check allows, then re-runs the evaluator with post-action
perception and simulation snapshots.
High-risk physical deployments require both post-action suppliers. Missing
post-action sensor fusion returns a blocking evaluation with reason
post_action_sensor_fusion_required; the action callback is not invoked.
from director_ai.core.cyber_physical import PhysicalGroundingLoop
loop = PhysicalGroundingLoop(
evaluator=evaluator,
execute_action=lambda action: robot_driver.move_to(action.target_position),
)
result = loop.run(
action=action,
risk_envelope=risk_envelope,
pre_perception=pre_perception,
pre_simulation=pre_simulation,
post_perception=lambda: camera_adapter.snapshot(),
post_simulation=lambda: simulator_adapter.snapshot(),
tenant_id="tenant-a",
)
if result.final_evaluation.decision.decision == "block":
raise RuntimeError(result.final_evaluation.reason)
The loop deliberately stores only SensorStateSnapshot references and numeric
coordinates. Raw camera frames, simulator dumps, driver payloads, credentials,
and private sensor packets remain outside Director telemetry.
director_ai.core.cyber_physical.closed_loop.SensorStateSnapshot
dataclass
¶
director_ai.core.cyber_physical.closed_loop.PhysicalGroundingEvaluator
¶
PhysicalGroundingEvaluator(*, grounding_hook: GroundingHook, high_risk_physical_deployment: bool = False, state_tolerance_m: float = 0.05, budget: TenantPhysicalBudget | None = None, no_go_policy: NoGoPolicy | None = None)
Compare perception, simulation, action, and post-action state.
high_risk_physical_deployment
property
¶
Report whether this evaluator runs in high-risk deployment mode.
evaluate
¶
evaluate(*, action: PhysicalAction, risk_envelope: RiskEnvelope, pre_perception: SensorStateSnapshot, pre_simulation: SensorStateSnapshot, post_perception: SensorStateSnapshot | None = None, post_simulation: SensorStateSnapshot | None = None, tenant_id: str = '') -> PhysicalGroundingEvaluation
Run pre-action grounding, snapshot consistency, and post-action checks.
blocking_evaluation
¶
blocking_evaluation(*, action: PhysicalAction, risk_envelope: RiskEnvelope, pre_action: GroundingVerdict, reason: str, violations: tuple[PhysicalGroundingViolation, ...], post_action_verified: bool) -> PhysicalGroundingEvaluation
Build a maximal-risk evaluation that blocks the action.
Used when a pre-action check already failed; pins the risk score to 1.0 and carries the violations forward into the verdict.
director_ai.core.cyber_physical.closed_loop.PhysicalGroundingLoop
¶
PhysicalGroundingLoop(*, evaluator: PhysicalGroundingEvaluator, execute_action: Callable[[PhysicalAction], None] | None = None)
Live pre-action / actuation / post-action grounding orchestrator.
run
¶
run(*, action: PhysicalAction, risk_envelope: RiskEnvelope, pre_perception: SensorStateSnapshot, pre_simulation: SensorStateSnapshot, post_perception: Callable[[], SensorStateSnapshot] | None = None, post_simulation: Callable[[], SensorStateSnapshot] | None = None, tenant_id: str = '') -> PhysicalGroundingLoopResult
Gate a live action on pre-state and mandatory post-state checks.
director_ai.core.cyber_physical.closed_loop.PhysicalGroundingLoopResult
dataclass
¶
PhysicalGroundingLoopResult(pre_evaluation: PhysicalGroundingEvaluation, final_evaluation: PhysicalGroundingEvaluation, action_executed: bool)
Live physical grounding loop result.
director_ai.core.cyber_physical.closed_loop.PhysicalGroundingEvaluation
dataclass
¶
PhysicalGroundingEvaluation(action: PhysicalAction, decision: GuardDecision, reason: str, pre_action: GroundingVerdict, post_action_verified: bool, requires_human_review: bool = False, violations: tuple[PhysicalGroundingViolation, ...] = tuple())
Closed-loop evaluation result.
director_ai.core.cyber_physical.closed_loop.PhysicalGroundingViolation
dataclass
¶
PhysicalGroundingViolation(stage: str, status: PhysicalGroundingStatus, constraint: str, reason: str, evidence_refs: tuple[str, ...])
One closed-loop physical grounding violation.
Rust acceleration¶
When backfire_kernel is installed, AABB.contains, Sphere.contains
/ intersects / intersects_aabb and the two-link IK dispatch to
the rust_* FFI functions. Bit-exact on geometry, < 1e-12 ULP on
the trigonometric IK path. No change on the caller side.