How to Design a Streaming Decision Agent with Partial Reasoning, Online Replanning, and Reactive Mid-Execution Adaptation in Dynamic Environments

from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple, Generator
from collections import defaultdict
import time

@dataclass
class AgentConfig:
    """Tunable knobs for the streaming decision agent."""

    horizon: int = 6                        # plan steps surfaced per plan update
    replan_on_target_move: bool = True      # replan when the target relocates
    replan_on_obstacle_change: bool = True  # replan when obstacles appear/clear
    max_steps: int = 120                    # hard cap on simulation steps
    think_latency: float = 0.02             # simulated planning delay, seconds
    act_latency: float = 0.01               # presumably the per-action delay — usage not visible in this chunk
    risk_gate: float = 0.85                 # risk above this triggers an action override
    alt_search_depth: int = 2               # NOTE(review): not referenced in the visible code

@dataclass
class StreamingDecisionAgent:
    """Streaming agent that interleaves observation, (re)planning and action.

    All ``init=False`` fields are internal run state, not constructor
    arguments. (Bugfix: the extracted source had lost all indentation,
    which is a SyntaxError in Python; restored conventional formatting.)
    """

    cfg: AgentConfig
    world: DynamicGridWorld
    # Wall-clock reference point used by _now() for stream timestamps.
    start_time: float = field(init=False, default_factory=time.time)
    step_id: int = field(init=False, default=0)
    # Current planned path (cells) and the matching per-hop action strings.
    current_plan: List[Coord] = field(init=False, default_factory=list)
    current_actions: List[str] = field(init=False, default_factory=list)
    # Most recent observation; consulted by _need_replan each step.
    last_snapshot: Dict[str, Any] = field(init=False, default_factory=dict)
    # Counters such as "replans" and "overrides"; defaultdict(int) so
    # increments need no prior initialisation.
    stats: Dict[str, Any] = field(init=False, default_factory=lambda: defaultdict(int))
def _now(self) -> float:
return time.time() – self.start_time

def _emit(self, kind: str, msg: str, data: Optional[Dict[str, Any]] = None) -> StreamEvent:
return StreamEvent(t=self._now(), kind=kind, step=self.step_id, msg=msg, data=data or {})


def _need_replan(self, obs: Dict[str, Any]) -> bool:
ch = obs[“changes”]
if obs[“done”]:
return False
if not self.current_plan or len(self.current_plan) <= 1:
return True
if self.cfg.replan_on_target_move and ch.get(“target_moved”):
return True
if self.cfg.replan_on_obstacle_change and (ch.get(“obstacles_added”) or ch.get(“obstacles_cleared”)):
return True
if len(self.current_plan) > 1 and self.current_plan[1] in self.world.obstacles:
return True
return False

def _plan(self) -> PlanResult:
time.sleep(self.cfg.think_latency)
self.stats[“replans”] += 1
return astar(self.world, self.world.agent, self.world.target)

def _choose_action(self, planned_action: str) -> Tuple[str, str]:
ax, ay = self.world.agent
action_to_delta = {“R”: (1, 0), “L”: (-1, 0), “D”: (0, 1), “U”: (0, -1), “S”: (0, 0)}
dx, dy = action_to_delta[planned_action]
nxt = (ax+dx, ay+dy)
if not self.world.in_bounds(nxt) or not self.world.passable(nxt):
self.stats[“overrides”] += 1
return “S”, “planned_move_invalid -> wait.”
r = action_risk(self.world, nxt)
if r > self.cfg.risk_gate:
candidates = [“U”, “D”, “L”, “R”, “S”]
best = (planned_action, float(“inf”), “keep_plan”)
for a in candidates:
dx, dy = action_to_delta[a]
p = (ax+dx, ay+dy)
if not self.world.in_bounds(p) or not self.world.passable(p):
continue
score = action_risk(self.world, p) + 0.05 * self.world.manhattan(p, self.world.target)
if score < best[1]:
best = (a, score, “risk_avoidance_override”)
if best[0] != planned_action:
self.stats[“overrides”] += 1
return best[0], best[2]
return planned_action, “follow_plan”

def run(self) -> Generator[StreamEvent, None, None]:
yield self._emit(“observe”, “Initialize: reading initial state.”, {“agent”: self.world.agent, “target”: self.world.target})
yield self._emit(“world”, “Initial world snapshot.”, {“grid”: self.world.render()})
for self.step_id in range(1, self.cfg.max_steps + 1):
if self.step_id == 1 or self._need_replan(self.last_snapshot):
pr = self._plan()
self.current_plan = pr.path
self.current_actions = path_to_actions(pr.path)
if pr.reason != “found_path”:
yield self._emit(“plan”, “Planner could not find a path within budget; switching to reactive exploration.”, {“reason”: pr.reason, “expanded”: pr.expanded})
self.current_actions = []
else:
horizon_path = pr.path[: max(2, min(len(pr.path), self.cfg.horizon + 1))]
yield self._emit(“plan”, f”Plan updated (online A*).”)
