diff options
Diffstat (limited to 'crates/atuin-ai/src/fsm/mod.rs')
| -rw-r--r-- | crates/atuin-ai/src/fsm/mod.rs | 1103 |
1 files changed, 0 insertions, 1103 deletions
diff --git a/crates/atuin-ai/src/fsm/mod.rs b/crates/atuin-ai/src/fsm/mod.rs deleted file mode 100644 index 3d72a3ae..00000000 --- a/crates/atuin-ai/src/fsm/mod.rs +++ /dev/null @@ -1,1103 +0,0 @@ -//! Agent conversation FSM. -//! -//! Pure state machine that returns effects as data. -//! The driver is responsible for executing effects and feeding events back. -//! -//! The FSM owns the conversation event log and tool lifecycle state. -//! It never performs IO directly. - -pub(crate) mod effects; -pub(crate) mod events; -pub(crate) mod tools; - -#[cfg(test)] -mod tests; - -use std::collections::HashMap; - -use serde_json::Value; - -use crate::context_window::ContextWindowBuilder; -use crate::tui::state::ConversationEvent; - -use effects::{Effect, ExitAction, PermissionTarget, TimeoutKind}; -use events::{Event, PermissionChoice, PermissionResponse}; -use tools::{ToolManager, ToolState}; - -// ============================================================================ -// State -// ============================================================================ - -/// The discrete states of the agent FSM. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum AgentState { - /// Waiting for user input. - Idle { - confirmation: Option<PendingConfirmation>, - }, - - /// A conversation turn is in progress. - Turn { stream: StreamPhase }, - - /// Unrecoverable error. User can retry or exit. - Error(String), -} - -/// Stream connection lifecycle within a Turn. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum StreamPhase { - /// Request sent, awaiting first stream frame. - Connecting, - /// Actively receiving streamed response. - Streaming { status: Option<StreamingStatus> }, - /// Stream connection has ended (Done received). - Done, -} - -/// Streaming status indicators from server. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum StreamingStatus { - Processing, - Searching, - Thinking, - WaitingForTools, -} - -impl StreamingStatus { - pub(crate) fn from_str(s: &str) -> Self { - match s { - "processing" => Self::Processing, - "searching" => Self::Searching, - "waiting_for_tools" => Self::WaitingForTools, - _ => Self::Thinking, - } - } -} - -/// Pending dangerous command confirmation state. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct PendingConfirmation { - pub command: String, - pub timeout_id: u64, -} - -// ============================================================================ -// Context -// ============================================================================ - -/// Shared context owned by the FSM. -#[derive(Debug, Clone)] -pub(crate) struct AgentContext { - /// The full conversation event log (source of truth for API + persistence). - pub events: Vec<ConversationEvent>, - /// Server-assigned session ID. - pub session_id: Option<String>, - /// Accumulated text from current stream (committed to events on tool call or stream end). - pub current_response: String, - /// Per-tool lifecycle state and cached render data. - /// Tools persist across turns for rendering history. - pub tools: ToolManager, - /// Tool IDs that belong to the current turn. Cleared on continuation start. - /// Used to determine whether a turn needs continuation (has unprocessed results). - current_turn_tool_ids: Vec<String>, - /// Maps timeout_id → tool_id for active tool execution timeouts. - /// Cleaned up when a tool completes naturally, so stale timeouts are ignored. - tool_timeout_ids: HashMap<u64, String>, - /// Counter for generating unique timeout IDs. - next_timeout_id: u64, - /// Capabilities advertised to the server. - pub capabilities: Vec<String>, - /// Unique invocation ID for this CLI invocation. - pub invocation_id: String, - - // ─── View state (owned by FSM for atomic transitions) ─────── - /// Index into events where the current TUI invocation starts. - /// Events before this are context for the API but not rendered. - pub view_start_index: usize, - /// Whether this session was resumed from a prior invocation. - pub is_resumed: bool, - /// Time of the last event from a previous invocation. - pub last_event_time: Option<chrono::DateTime<chrono::Utc>>, - /// Events from archived sessions (/new) still rendered on screen. - pub archived_events: Vec<ConversationEvent>, -} - -impl AgentContext { - fn next_timeout_id(&mut self) -> u64 { - let id = self.next_timeout_id; - self.next_timeout_id += 1; - id - } -} - -// ============================================================================ -// The Agent FSM -// ============================================================================ - -/// The agent finite state machine. -/// -/// Pure state machine — `handle()` takes an event, mutates internal state, -/// and returns effects as data for the driver to execute. -#[derive(Debug, Clone)] -pub(crate) struct AgentFsm { - pub state: AgentState, - pub ctx: AgentContext, -} - -impl AgentFsm { - /// Create a new FSM in Idle state. - pub fn new(capabilities: Vec<String>, invocation_id: String) -> Self { - Self { - state: AgentState::Idle { confirmation: None }, - ctx: AgentContext { - events: Vec::new(), - session_id: None, - current_response: String::new(), - tools: ToolManager::new(), - current_turn_tool_ids: Vec::new(), - tool_timeout_ids: HashMap::new(), - next_timeout_id: 0, - capabilities, - invocation_id, - view_start_index: 0, - is_resumed: false, - last_event_time: None, - archived_events: Vec::new(), - }, - } - } - - /// Create an FSM from saved session state (for resume). - pub fn from_session( - events: Vec<ConversationEvent>, - session_id: Option<String>, - capabilities: Vec<String>, - invocation_id: String, - view_start_index: usize, - is_resumed: bool, - last_event_time: Option<chrono::DateTime<chrono::Utc>>, - ) -> Self { - Self { - state: AgentState::Idle { confirmation: None }, - ctx: AgentContext { - events, - session_id, - current_response: String::new(), - tools: ToolManager::new(), - current_turn_tool_ids: Vec::new(), - tool_timeout_ids: HashMap::new(), - next_timeout_id: 0, - capabilities, - invocation_id, - view_start_index, - is_resumed, - last_event_time, - archived_events: Vec::new(), - }, - } - } - - /// Handle an event, returning effects to execute. - pub fn handle(&mut self, event: Event) -> Vec<Effect> { - match (&self.state, event) { - // ================================================================ - // Idle state - // ================================================================ - (AgentState::Idle { confirmation: None }, Event::UserSubmit(msg)) => { - self.start_turn(msg) - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::UserSubmit(msg), - ) => self.start_turn(msg), - - (AgentState::Idle { confirmation: None }, Event::ExecuteCommand) => { - let cmd = self.current_command(); - let Some(cmd) = cmd else { - // No command suggested — exit - return vec![Effect::ExitApp(ExitAction::Cancel)]; - }; - if self.is_current_command_dangerous() { - let timeout_id = self.ctx.next_timeout_id(); - self.state = AgentState::Idle { - confirmation: Some(PendingConfirmation { - command: cmd, - timeout_id, - }), - }; - vec![Effect::ScheduleTimeout { - timeout_id, - duration: std::time::Duration::from_secs(5), - kind: TimeoutKind::Confirmation, - }] - } else { - vec![Effect::ExitApp(ExitAction::Execute(cmd))] - } - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::ExecuteCommand, - ) => { - let confirm = self.state_confirmation().unwrap().clone(); - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::ExitApp(ExitAction::Execute(confirm.command))] - } - - (AgentState::Idle { .. }, Event::InsertCommand) => { - let cmd = self.current_command(); - match cmd { - Some(cmd) => vec![Effect::ExitApp(ExitAction::Insert(cmd))], - None => vec![], - } - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::Cancel, - ) => { - self.state = AgentState::Idle { confirmation: None }; - vec![] - } - - (AgentState::Idle { confirmation: None }, Event::Cancel) => { - vec![Effect::ExitApp(ExitAction::Cancel)] - } - - (AgentState::Idle { .. }, Event::ConfirmationTimeout { timeout_id }) => { - if self - .state_confirmation() - .is_some_and(|c| c.timeout_id == timeout_id) - { - self.state = AgentState::Idle { confirmation: None }; - } - vec![] - } - - (AgentState::Idle { .. }, Event::NewSession) => { - // Archive visible events so they remain on screen but aren't - // sent to the API. Tools persist for rendering. - let visible = self.ctx.events[self.ctx.view_start_index..].to_vec(); - self.ctx.archived_events.extend(visible); - - self.ctx.events.clear(); - self.ctx.session_id = None; - self.ctx.current_turn_tool_ids.clear(); - self.ctx.view_start_index = 0; - self.ctx.is_resumed = false; - - // Add OOB indicator for the new session - self.ctx.events.push(ConversationEvent::OutOfBandOutput { - name: "System".to_string(), - command: Some("/new".to_string()), - content: "Started a new session.".to_string(), - }); - - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::ArchiveSession, Effect::Persist] - } - - (AgentState::Idle { .. }, Event::SlashCommand { command, content }) => { - self.handle_slash_command(&command, &content); - vec![] - } - - ( - AgentState::Idle { .. }, - Event::SkillLoaded { - name, - arguments, - content, - }, - ) => { - self.ctx.events.push(ConversationEvent::SkillInvocation { - name, - arguments, - content, - }); - self.ctx.current_response.clear(); - self.ctx.current_turn_tool_ids.clear(); - - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - // ================================================================ - // Turn — stream lifecycle - // ================================================================ - ( - AgentState::Turn { - stream: StreamPhase::Connecting, - }, - Event::StreamStarted, - ) => { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { status: None }, - }; - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Connecting, - }, - Event::StreamError(e), - ) => { - self.state = AgentState::Error(e); - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamChunk(text), - ) => { - self.ctx.current_response.push_str(&text); - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamStatusChanged(status), - ) => { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { - status: Some(StreamingStatus::from_str(&status)), - }, - }; - vec![] - } - - (AgentState::Turn { .. }, Event::StreamToolCall { id, name, input }) => { - self.commit_streaming_text(); - self.handle_stream_tool_call(id, name, input) - } - - (AgentState::Turn { .. }, Event::SuggestCommand { id, input }) => { - self.commit_streaming_text(); - // Push the suggest_command as a ToolCall event (protocol requirement) - self.ctx.events.push(ConversationEvent::ToolCall { - id, - name: "suggest_command".to_string(), - input, - }); - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::Persist] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamServerToolResult { - tool_use_id, - content, - is_error, - remote, - content_length, - }, - ) => { - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id, - content, - is_error, - remote, - content_length, - }); - vec![] - } - - (AgentState::Turn { .. }, Event::StreamDone { session_id }) => { - self.commit_streaming_text(); - if !session_id.is_empty() { - self.ctx.session_id = Some(session_id); - } - self.state = AgentState::Turn { - stream: StreamPhase::Done, - }; - self.check_turn_completion() - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamError(e), - ) => { - // Abort any executing tools on stream error - let abort_effects: Vec<_> = self - .ctx - .tools - .executing_ids() - .into_iter() - .map(|tool_id| Effect::AbortTool { tool_id }) - .collect(); - self.ctx.tool_timeout_ids.clear(); - self.state = AgentState::Error(e); - abort_effects - } - - // ================================================================ - // Turn — tool lifecycle (any stream phase) - // ================================================================ - (AgentState::Turn { .. }, Event::PermissionResolved { tool_id, response }) => { - self.handle_permission_resolved(tool_id, response) - } - - (AgentState::Turn { .. }, Event::PermissionUserChoice { tool_id, choice }) => { - self.handle_permission_choice(tool_id, choice) - } - - ( - AgentState::Turn { .. }, - Event::ToolExecutionDone { - tool_id, - outcome, - preview, - }, - ) => self.handle_tool_done(tool_id, outcome, preview), - - ( - AgentState::Turn { .. }, - Event::ToolPreviewUpdate { - tool_id, - lines, - exit_code, - }, - ) => { - if let Some(tracked) = self.ctx.tools.get_mut(&tool_id) { - if tracked.is_resolved() { - // Tool already completed — a late preview update raced with - // ToolExecutionDone. Update lines (they may carry the final - // screen) but preserve the finalized exit_code/interrupted. - if let Some(tools::ToolPreviewData::Shell { - lines: existing_lines, - .. - }) = &mut tracked.preview - { - *existing_lines = lines; - } - } else { - tracked.preview = Some(tools::ToolPreviewData::Shell { - lines, - exit_code, - interrupted: None, - }); - } - } - vec![] - } - - (AgentState::Turn { .. }, Event::InterruptTools) => { - let ids = self.ctx.tools.executing_ids(); - for id in &ids { - if let Some(tracked) = self.ctx.tools.get_mut(id) { - tracked.interrupt_reason = Some(tools::InterruptReason::User); - } - // Clear any pending execution timeout for this tool - self.ctx.tool_timeout_ids.retain(|_, tid| tid != id); - } - ids.into_iter() - .map(|tool_id| Effect::AbortTool { tool_id }) - .collect() - } - - ( - AgentState::Turn { .. }, - Event::ToolExecutionTimeout { - timeout_id, - tool_id, - }, - ) => self.handle_tool_execution_timeout(timeout_id, tool_id), - - // ─── Cancel during Turn ───────────────────────────────────── - (AgentState::Turn { stream }, Event::Cancel) => { - let mut effects = Vec::new(); - - // Abort stream if still active - if !matches!(stream, StreamPhase::Done) { - effects.push(Effect::AbortStream); - } - - // Cancel all pending tools - let pending = self.ctx.tools.pending_ids(); - for id in &pending { - if let Some(tracked) = self.ctx.tools.get_mut(id) { - if tracked.state == ToolState::Executing { - effects.push(Effect::AbortTool { - tool_id: id.clone(), - }); - } - tracked.state = ToolState::Completed; - } - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: id.clone(), - content: "Error: user cancelled this operation".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - } - - // Commit any partial streaming text - self.commit_streaming_text_as_cancelled(); - - // Add context so the LLM knows what happened - if !pending.is_empty() { - self.ctx.events.push(ConversationEvent::SystemContext { - content: "The user cancelled the previous generation. Tool calls that were in progress have been aborted.".to_string(), - }); - } - - // Clear timeout mappings — stale timeouts will be ignored by the guard - self.ctx.tool_timeout_ids.clear(); - - self.state = AgentState::Idle { confirmation: None }; - effects.push(Effect::Persist); - effects - } - - // ================================================================ - // Error state - // ================================================================ - (AgentState::Error(_), Event::Retry) => { - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - (AgentState::Error(_), Event::Cancel) => { - vec![Effect::ExitApp(ExitAction::Cancel)] - } - - // ================================================================ - // Fallthrough — ignore events with no valid transition - // ================================================================ - - // StreamDone can arrive after SuggestCommand (which already moved to Idle). - // We still need to capture the session_id from it. - (_, Event::StreamDone { session_id }) => { - if !session_id.is_empty() { - self.ctx.session_id = Some(session_id); - } - vec![Effect::Persist] - } - - (_, Event::SlashCommand { command, content }) => { - self.handle_slash_command(&command, &content); - vec![] - } - - // RequestSkillLoad during non-idle: still emit the effect - (_, Event::RequestSkillLoad { name, arguments }) => { - vec![Effect::LoadSkill { name, arguments }] - } - - // SkillLoaded during non-idle: queue so it's visible - // in context for the next turn. - ( - _, - Event::SkillLoaded { - name, - arguments, - content, - }, - ) => { - self.ctx.events.push(ConversationEvent::SkillInvocation { - name, - arguments, - content, - }); - vec![] - } - - _ => vec![], - } - } - - // ──────────────────────────────────────────────────────────────────── - // Private helpers - // ──────────────────────────────────────────────────────────────────── - - /// Start a new turn: push user message, build messages, emit StartStream. - fn start_turn(&mut self, msg: String) -> Vec<Effect> { - self.ctx - .events - .push(ConversationEvent::UserMessage { content: msg }); - // Don't clear tools — completed tools persist for rendering history. - // Tools are only cleared on /new (session reset). - self.ctx.current_response.clear(); - self.ctx.current_turn_tool_ids.clear(); - - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - /// Build API messages from the conversation event log. - fn build_messages(&self) -> Vec<Value> { - ContextWindowBuilder::with_default_budget().build(&self.ctx.events) - } - - /// Commit accumulated streaming text to the event log. - fn commit_streaming_text(&mut self) { - let text = std::mem::take(&mut self.ctx.current_response); - let trimmed = text.trim_start().to_string(); - if !trimmed.is_empty() { - self.ctx - .events - .push(ConversationEvent::Text { content: trimmed }); - } - } - - /// Commit streaming text with a cancellation suffix. - fn commit_streaming_text_as_cancelled(&mut self) { - let text = std::mem::take(&mut self.ctx.current_response); - let trimmed = text.trim_start().to_string(); - if !trimmed.is_empty() { - self.ctx.events.push(ConversationEvent::Text { - content: format!("{trimmed}\n\n[User cancelled this generation]"), - }); - } - } - - /// Handle a client-side tool call from the stream. - fn handle_stream_tool_call(&mut self, id: String, name: String, input: Value) -> Vec<Effect> { - // Parse the tool call - let tool = match crate::tools::ClientToolCall::try_from((name.as_str(), &input)) { - Ok(tool) => tool, - Err(_) => { - // Unknown tool — push as event but don't track - self.ctx - .events - .push(ConversationEvent::ToolCall { id, name, input }); - return vec![]; - } - }; - - // Capability gating - if let Some(required_cap) = tool.descriptor().capability - && !self.ctx.capabilities.iter().any(|c| c == required_cap) - { - self.ctx.events.push(ConversationEvent::ToolCall { - id: id.clone(), - name, - input, - }); - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: id, - content: format!( - "Tool not enabled: capability '{required_cap}' was not advertised by this client" - ), - is_error: true, - remote: false, - content_length: None, - }); - return vec![]; - } - - // Track the tool and push ToolCall event - let tool_for_effect = tool.clone(); - self.ctx.tools.insert(id.clone(), tool); - self.ctx.current_turn_tool_ids.push(id.clone()); - self.ctx.events.push(ConversationEvent::ToolCall { - id: id.clone(), - name, - input, - }); - - // Transition to Turn if we were Streaming - if let AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - } = &self.state - { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { status: None }, - }; - } - - vec![Effect::CheckPermission { - tool_id: id, - tool: tool_for_effect, - }] - } - - /// Handle permission resolver result. - fn handle_permission_resolved( - &mut self, - tool_id: String, - response: PermissionResponse, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - // If already resolved (e.g. cancelled while permission check was in flight), - // ignore the stale result to avoid re-executing a cancelled tool. - if tracked.is_resolved() { - return vec![]; - } - - match response { - PermissionResponse::Allowed | PermissionResponse::SessionGranted => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - self.emit_execute_tool(tool_id, tool) - } - PermissionResponse::Ask => { - tracked.state = ToolState::AwaitingPermission; - vec![] - } - PermissionResponse::Denied => { - tracked.state = ToolState::Denied; - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content: "Permission denied on the user's system".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - self.check_turn_completion() - } - } - } - - /// Handle user's permission choice from the dialog. - fn handle_permission_choice( - &mut self, - tool_id: String, - choice: PermissionChoice, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - if tracked.is_resolved() { - return vec![]; - } - - match choice { - PermissionChoice::Allow => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - self.emit_execute_tool(tool_id, tool) - } - PermissionChoice::AllowForSession => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let mut effects = self.emit_execute_tool(tool_id, tool.clone()); - if let Some(path) = tool.resolved_file_path() { - effects.push(Effect::CacheSessionGrant { path }); - } - effects - } - PermissionChoice::AlwaysAllowInProject => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let rule = crate::permissions::rule::Rule { - tool: tool.rule_name().to_string(), - scope: None, // project file provides the scoping - }; - let mut effects = self.emit_execute_tool(tool_id, tool); - effects.push(Effect::WritePermissionRule { - target: PermissionTarget::Project, - rule, - disposition: crate::permissions::writer::RuleDisposition::Allow, - }); - effects - } - PermissionChoice::AlwaysAllow => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let scope = tool - .resolved_file_path() - .map(|p| p.to_string_lossy().to_string()); - let rule = crate::permissions::rule::Rule { - tool: tool.rule_name().to_string(), - scope, - }; - let mut effects = self.emit_execute_tool(tool_id, tool); - effects.push(Effect::WritePermissionRule { - target: PermissionTarget::Global, - rule, - disposition: crate::permissions::writer::RuleDisposition::Allow, - }); - effects - } - PermissionChoice::Deny => { - tracked.state = ToolState::Denied; - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content: "Permission denied by the user".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - self.check_turn_completion() - } - } - } - - /// Handle tool execution completion. - fn handle_tool_done( - &mut self, - tool_id: String, - outcome: crate::tools::ToolOutcome, - preview: Option<tools::ToolPreviewData>, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - // If already completed (e.g. cancelled), ignore stale result - if tracked.is_resolved() { - return vec![]; - } - - tracked.state = ToolState::Completed; - - // If the FSM tagged this tool with an interrupt reason (user or timeout), - // use it; otherwise derive from the outcome's interrupted flag. - let reason = tracked.interrupt_reason.take().or({ - if let crate::tools::ToolOutcome::Structured { - interrupted: true, .. - } = &outcome - { - Some(tools::InterruptReason::User) - } else { - None - } - }); - - // Merge shell preview: the final ToolExecutionDone carries exit_code/interrupted - // but has empty lines (the live lines were accumulated via ToolPreviewUpdate). - // Preserve the accumulated lines and fold in the terminal metadata. - match (&mut tracked.preview, preview) { - ( - Some(tools::ToolPreviewData::Shell { - exit_code, - interrupted, - .. - }), - Some(tools::ToolPreviewData::Shell { - exit_code: final_exit, - .. - }), - ) => { - *exit_code = final_exit; - *interrupted = reason.clone(); - } - (_, Some(mut p)) => { - if let tools::ToolPreviewData::Shell { - ref mut interrupted, - .. - } = p - { - *interrupted = reason.clone(); - } - tracked.preview = Some(p); - } - _ => {} - } - - // Clean up any pending execution timeout for this tool - self.ctx.tool_timeout_ids.retain(|_, tid| tid != &tool_id); - - let content = outcome.format_for_llm(reason.as_ref()); - let is_error = outcome.is_error(); - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content, - is_error, - remote: false, - content_length: None, - }); - - self.check_turn_completion() - } - - /// Handle a tool execution timeout. Aborts the tool if it's still running. - fn handle_tool_execution_timeout(&mut self, timeout_id: u64, tool_id: String) -> Vec<Effect> { - // Guard: only act if this timeout is still registered (not cleaned up by natural completion) - if self.ctx.tool_timeout_ids.remove(&timeout_id).is_none() { - return vec![]; - } - - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - if tracked.is_resolved() { - return vec![]; - } - - // Tag the tool so handle_tool_done can distinguish timeout from user interrupt. - // Only shell tools have entries in tool_timeout_ids, so this is always Shell. - let timeout_secs = match &tracked.tool { - crate::tools::ClientToolCall::Shell(s) => s.timeout_secs, - _ => unreachable!("only shell tools have execution timeouts"), - }; - tracked.interrupt_reason = Some(tools::InterruptReason::Timeout(timeout_secs)); - - // Abort the tool — the driver sends the interrupt signal via oneshot, - // and execute_shell_command_streaming returns a Structured outcome with - // interrupted: true and partial stdout/stderr. This flows through the - // normal ToolExecutionDone path. - vec![Effect::AbortTool { tool_id }] - } - - /// Emit effects to begin executing a tool. For shell commands, also schedules - /// an execution timeout based on the LLM-specified timeout_secs. - fn emit_execute_tool( - &mut self, - tool_id: String, - tool: crate::tools::ClientToolCall, - ) -> Vec<Effect> { - let mut effects = vec![Effect::ExecuteTool { - tool_id: tool_id.clone(), - tool: tool.clone(), - }]; - - if let crate::tools::ClientToolCall::Shell(ref shell) = tool { - let timeout_id = self.ctx.next_timeout_id(); - self.ctx - .tool_timeout_ids - .insert(timeout_id, tool_id.clone()); - effects.push(Effect::ScheduleTimeout { - timeout_id, - duration: std::time::Duration::from_secs(shell.timeout_secs), - kind: TimeoutKind::ToolExecution { tool_id }, - }); - } - - effects - } - - /// Check if the turn is complete (stream done + all tools resolved). - /// If so, either continue the conversation or go Idle. - fn check_turn_completion(&mut self) -> Vec<Effect> { - // Stream must be done - if !matches!( - self.state, - AgentState::Turn { - stream: StreamPhase::Done - } - ) { - return vec![]; - } - - // All current-turn tools must be resolved before the turn can complete - if !self.ctx.tools.all_resolved(&self.ctx.current_turn_tool_ids) { - return vec![]; - } - - // Turn is complete. Check if we need to continue (tool results to send back). - // We continue if this turn had any client tool calls (the LLM needs to see - // the results and respond). - if !self.ctx.current_turn_tool_ids.is_empty() { - // Continue conversation with tool results. - // Don't clear tools — they persist for rendering history. - // Clear turn IDs so the continuation turn doesn't loop. - self.ctx.current_turn_tool_ids.clear(); - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.ctx.current_response.clear(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } else { - // No tools — turn is done, go idle - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::Persist] - } - } - - /// Extract the current confirmation state (if any). - fn state_confirmation(&self) -> Option<&PendingConfirmation> { - if let AgentState::Idle { - confirmation: Some(ref c), - } = self.state - { - Some(c) - } else { - None - } - } - - /// Get the most recent suggested command from the conversation. - /// Get the most recent command from the current invocation only. - fn current_command(&self) -> Option<String> { - self.current_invocation_events() - .rev() - .find_map(|e| e.as_command()) - .map(|s| s.to_string()) - } - - /// Check if the most recent command is dangerous. - fn is_current_command_dangerous(&self) -> bool { - self.current_invocation_events() - .rev() - .find_map(|e| { - if let ConversationEvent::ToolCall { name, input, .. } = e - && name == "suggest_command" - { - let danger = input - .get("danger") - .and_then(|v| v.as_str()) - .unwrap_or("low"); - Some(danger == "high" || danger == "medium" || danger == "med") - } else { - None - } - }) - .unwrap_or(false) - } - - /// Events from the current invocation only (from view_start_index onward). - fn current_invocation_events(&self) -> impl DoubleEndedIterator<Item = &ConversationEvent> { - let start = self.ctx.view_start_index.min(self.ctx.events.len()); - self.ctx.events[start..].iter() - } - - /// Handle a slash command by pushing an OOB event. - fn handle_slash_command(&mut self, command: &str, content: &str) { - self.ctx.events.push(ConversationEvent::OutOfBandOutput { - name: "System".to_string(), - command: Some(command.to_string()), - content: content.to_string(), - }); - } -} |
