diff options
| author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-10 22:01:45 +0200 |
|---|---|---|
| committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-06-10 22:01:45 +0200 |
| commit | 5e31a81cd2207f053b8cd8ad84ebe2a2f691b29d (patch) | |
| tree | 5d76811ab0d693c01fa472d41aa2ceaf3bd0b415 /crates/atuin-ai/src/fsm | |
| parent | chore: Remove unneeded files (diff) | |
| download | atuin-5e31a81cd2207f053b8cd8ad84ebe2a2f691b29d.zip | |
chore: Remove some unused rust code
Diffstat (limited to 'crates/atuin-ai/src/fsm')
| -rw-r--r-- | crates/atuin-ai/src/fsm/effects.rs | 99 | ||||
| -rw-r--r-- | crates/atuin-ai/src/fsm/events.rs | 140 | ||||
| -rw-r--r-- | crates/atuin-ai/src/fsm/mod.rs | 1103 | ||||
| -rw-r--r-- | crates/atuin-ai/src/fsm/tests.rs | 890 | ||||
| -rw-r--r-- | crates/atuin-ai/src/fsm/tools.rs | 178 |
5 files changed, 0 insertions, 2410 deletions
diff --git a/crates/atuin-ai/src/fsm/effects.rs b/crates/atuin-ai/src/fsm/effects.rs deleted file mode 100644 index adc9628e..00000000 --- a/crates/atuin-ai/src/fsm/effects.rs +++ /dev/null @@ -1,99 +0,0 @@ -//! Effects (outputs) from the agent FSM. -//! -//! The FSM returns these as data; the driver is responsible for executing them. - -use std::path::PathBuf; -use std::time::Duration; - -use serde_json::Value; - -use crate::permissions::rule::Rule; -use crate::permissions::writer::RuleDisposition; -use crate::tools::ClientToolCall; - -/// Where to write a permission rule. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum PermissionTarget { - /// Project-level: `<git_root_or_cwd>/.atuin/permissions.ai.toml` - Project, - /// Global: `~/.config/atuin/permissions.ai.toml` - Global, -} - -/// Side effects the driver should execute after a state transition. -#[derive(Debug, Clone)] -pub(crate) enum Effect { - // ─── Network ──────────────────────────────────────────────── - /// Start a new streaming request to the server. - StartStream { - messages: Vec<Value>, - session_id: Option<String>, - }, - /// Abort the active stream connection. - AbortStream, - - // ─── Tool orchestration ───────────────────────────────────── - /// Run the permission resolver for a tool call. - CheckPermission { - tool_id: String, - tool: ClientToolCall, - }, - /// Execute a tool (file read, edit, write, shell, history search). - ExecuteTool { - tool_id: String, - tool: ClientToolCall, - }, - /// Kill a running tool (send interrupt to shell command). - AbortTool { tool_id: String }, - /// Load a skill's content asynchronously (read + interpolate). - LoadSkill { - name: String, - arguments: Option<String>, - }, - - // ─── Persistence ──────────────────────────────────────────── - /// Persist current conversation state to disk. - Persist, - /// Write a permanent permission rule to disk. - WritePermissionRule { - target: PermissionTarget, - rule: Rule, - disposition: RuleDisposition, - }, - /// Cache a session-scoped file permission grant. - CacheSessionGrant { path: PathBuf }, - /// Archive current session and start fresh (IO only — state already updated by FSM). - ArchiveSession, - - // ─── Timers ───────────────────────────────────────────────── - /// Schedule a timer that fires an event after the given delay. - ScheduleTimeout { - timeout_id: u64, - duration: Duration, - kind: TimeoutKind, - }, - - // ─── Exit ─────────────────────────────────────────────────── - /// Exit the application with the given action. - ExitApp(ExitAction), -} - -/// What kind of timeout was scheduled. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum TimeoutKind { - /// Dangerous command confirmation dialog auto-dismiss. - Confirmation, - /// Shell tool execution timeout — abort the tool if it's still running. - ToolExecution { tool_id: String }, -} - -/// What to do when exiting the TUI. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum ExitAction { - /// Run the suggested command. - Execute(String), - /// Insert the command into the shell without running. - Insert(String), - /// Exit without action. - Cancel, -} diff --git a/crates/atuin-ai/src/fsm/events.rs b/crates/atuin-ai/src/fsm/events.rs deleted file mode 100644 index e591db41..00000000 --- a/crates/atuin-ai/src/fsm/events.rs +++ /dev/null @@ -1,140 +0,0 @@ -//! Events (inputs) to the agent FSM. - -use serde_json::Value; - -use crate::tools::ToolOutcome; - -/// Events that drive state transitions in the agent FSM. -#[derive(Debug, Clone)] -pub(crate) enum Event { - // ─── User actions ─────────────────────────────────────────── - /// User submitted a message from the input box. - UserSubmit(String), - /// User pressed Esc or equivalent cancel action. - Cancel, - /// User pressed Enter to execute the suggested command. - ExecuteCommand, - /// User pressed Tab to insert the suggested command. - InsertCommand, - /// User chose to retry after an error. - Retry, - /// User interrupted executing tools (Ctrl+C / Esc during shell execution). - InterruptTools, - - // ─── Stream lifecycle ─────────────────────────────────────── - /// Stream connection established, first frame received. - StreamStarted, - /// Received a chunk of streamed text content. - StreamChunk(String), - /// Stream delivered a client-side tool call. - StreamToolCall { - id: String, - name: String, - input: Value, - }, - /// Stream delivered a server-side tool result (executed remotely). - StreamServerToolResult { - tool_use_id: String, - content: String, - is_error: bool, - remote: bool, - content_length: Option<usize>, - }, - /// Stream status changed (e.g. "thinking", "searching"). - StreamStatusChanged(String), - /// Stream ended normally. - StreamDone { session_id: String }, - /// Stream encountered an error. - StreamError(String), - - // ─── Suggest command (terminal tool call) ─────────────────── - /// The suggest_command tool call acts as a stream terminal event. - /// This is the server signaling "turn complete, here's the command." - SuggestCommand { id: String, input: Value }, - - // ─── Tool lifecycle ───────────────────────────────────────── - /// Permission resolver completed for a tool. - PermissionResolved { - tool_id: String, - response: PermissionResponse, - }, - /// User made a permission choice via the dialog. - PermissionUserChoice { - tool_id: String, - choice: PermissionChoice, - }, - /// Tool execution completed. - ToolExecutionDone { - tool_id: String, - outcome: ToolOutcome, - /// Preview data computed by the driver (diff, content preview, final shell state). - preview: Option<super::tools::ToolPreviewData>, - }, - /// Live preview update for an executing shell command. - ToolPreviewUpdate { - tool_id: String, - lines: Vec<String>, - exit_code: Option<i32>, - }, - - // ─── Timers ───────────────────────────────────────────────── - /// Confirmation timeout expired. - ConfirmationTimeout { timeout_id: u64 }, - /// Shell tool execution timeout expired. - ToolExecutionTimeout { timeout_id: u64, tool_id: String }, - - // ─── Session management ───────────────────────────────────── - /// User ran /new to start a fresh session. - NewSession, - - // ─── Slash commands ───────────────────────────────────────── - /// User submitted a slash command (other than /new). - /// The driver resolves known commands (like /help) and passes the - /// rendered content; the FSM just pushes an OOB event. - SlashCommand { command: String, content: String }, - - // ─── Skills ──────────────────────────────────────────────── - /// User invoked a skill via /skill-name. FSM emits a LoadSkill - /// effect; the driver loads the content asynchronously and sends - /// SkillLoaded when ready. - RequestSkillLoad { - name: String, - arguments: Option<String>, - }, - /// A skill's content has been loaded and interpolated. - /// Pushes skill content as OOB context and starts a turn so the - /// LLM sees the skill and acts on it. - SkillLoaded { - name: String, - arguments: Option<String>, - content: String, - }, -} - -/// Result of the permission resolver check. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum PermissionResponse { - /// Rule allows this tool call — execute immediately. - Allowed, - /// Rule denies this tool call — reject with error. - Denied, - /// No matching rule — ask the user. - Ask, - /// Session-scoped grant exists — execute immediately (bypass resolver). - SessionGranted, -} - -/// User's choice from the permission dialog. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum PermissionChoice { - /// Allow this one time. - Allow, - /// Allow this file for the remainder of the session. - AllowForSession, - /// Always allow in this project (writes to project permissions file). - AlwaysAllowInProject, - /// Always allow globally (writes to global permissions file, scoped to file). - AlwaysAllow, - /// Deny this tool call. - Deny, -} diff --git a/crates/atuin-ai/src/fsm/mod.rs b/crates/atuin-ai/src/fsm/mod.rs deleted file mode 100644 index 3d72a3ae..00000000 --- a/crates/atuin-ai/src/fsm/mod.rs +++ /dev/null @@ -1,1103 +0,0 @@ -//! Agent conversation FSM. -//! -//! Pure state machine that returns effects as data. -//! The driver is responsible for executing effects and feeding events back. -//! -//! The FSM owns the conversation event log and tool lifecycle state. -//! It never performs IO directly. - -pub(crate) mod effects; -pub(crate) mod events; -pub(crate) mod tools; - -#[cfg(test)] -mod tests; - -use std::collections::HashMap; - -use serde_json::Value; - -use crate::context_window::ContextWindowBuilder; -use crate::tui::state::ConversationEvent; - -use effects::{Effect, ExitAction, PermissionTarget, TimeoutKind}; -use events::{Event, PermissionChoice, PermissionResponse}; -use tools::{ToolManager, ToolState}; - -// ============================================================================ -// State -// ============================================================================ - -/// The discrete states of the agent FSM. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum AgentState { - /// Waiting for user input. - Idle { - confirmation: Option<PendingConfirmation>, - }, - - /// A conversation turn is in progress. - Turn { stream: StreamPhase }, - - /// Unrecoverable error. User can retry or exit. - Error(String), -} - -/// Stream connection lifecycle within a Turn. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum StreamPhase { - /// Request sent, awaiting first stream frame. - Connecting, - /// Actively receiving streamed response. - Streaming { status: Option<StreamingStatus> }, - /// Stream connection has ended (Done received). - Done, -} - -/// Streaming status indicators from server. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum StreamingStatus { - Processing, - Searching, - Thinking, - WaitingForTools, -} - -impl StreamingStatus { - pub(crate) fn from_str(s: &str) -> Self { - match s { - "processing" => Self::Processing, - "searching" => Self::Searching, - "waiting_for_tools" => Self::WaitingForTools, - _ => Self::Thinking, - } - } -} - -/// Pending dangerous command confirmation state. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) struct PendingConfirmation { - pub command: String, - pub timeout_id: u64, -} - -// ============================================================================ -// Context -// ============================================================================ - -/// Shared context owned by the FSM. -#[derive(Debug, Clone)] -pub(crate) struct AgentContext { - /// The full conversation event log (source of truth for API + persistence). - pub events: Vec<ConversationEvent>, - /// Server-assigned session ID. - pub session_id: Option<String>, - /// Accumulated text from current stream (committed to events on tool call or stream end). - pub current_response: String, - /// Per-tool lifecycle state and cached render data. - /// Tools persist across turns for rendering history. - pub tools: ToolManager, - /// Tool IDs that belong to the current turn. Cleared on continuation start. - /// Used to determine whether a turn needs continuation (has unprocessed results). - current_turn_tool_ids: Vec<String>, - /// Maps timeout_id → tool_id for active tool execution timeouts. - /// Cleaned up when a tool completes naturally, so stale timeouts are ignored. - tool_timeout_ids: HashMap<u64, String>, - /// Counter for generating unique timeout IDs. - next_timeout_id: u64, - /// Capabilities advertised to the server. - pub capabilities: Vec<String>, - /// Unique invocation ID for this CLI invocation. - pub invocation_id: String, - - // ─── View state (owned by FSM for atomic transitions) ─────── - /// Index into events where the current TUI invocation starts. - /// Events before this are context for the API but not rendered. - pub view_start_index: usize, - /// Whether this session was resumed from a prior invocation. - pub is_resumed: bool, - /// Time of the last event from a previous invocation. - pub last_event_time: Option<chrono::DateTime<chrono::Utc>>, - /// Events from archived sessions (/new) still rendered on screen. - pub archived_events: Vec<ConversationEvent>, -} - -impl AgentContext { - fn next_timeout_id(&mut self) -> u64 { - let id = self.next_timeout_id; - self.next_timeout_id += 1; - id - } -} - -// ============================================================================ -// The Agent FSM -// ============================================================================ - -/// The agent finite state machine. -/// -/// Pure state machine — `handle()` takes an event, mutates internal state, -/// and returns effects as data for the driver to execute. -#[derive(Debug, Clone)] -pub(crate) struct AgentFsm { - pub state: AgentState, - pub ctx: AgentContext, -} - -impl AgentFsm { - /// Create a new FSM in Idle state. - pub fn new(capabilities: Vec<String>, invocation_id: String) -> Self { - Self { - state: AgentState::Idle { confirmation: None }, - ctx: AgentContext { - events: Vec::new(), - session_id: None, - current_response: String::new(), - tools: ToolManager::new(), - current_turn_tool_ids: Vec::new(), - tool_timeout_ids: HashMap::new(), - next_timeout_id: 0, - capabilities, - invocation_id, - view_start_index: 0, - is_resumed: false, - last_event_time: None, - archived_events: Vec::new(), - }, - } - } - - /// Create an FSM from saved session state (for resume). - pub fn from_session( - events: Vec<ConversationEvent>, - session_id: Option<String>, - capabilities: Vec<String>, - invocation_id: String, - view_start_index: usize, - is_resumed: bool, - last_event_time: Option<chrono::DateTime<chrono::Utc>>, - ) -> Self { - Self { - state: AgentState::Idle { confirmation: None }, - ctx: AgentContext { - events, - session_id, - current_response: String::new(), - tools: ToolManager::new(), - current_turn_tool_ids: Vec::new(), - tool_timeout_ids: HashMap::new(), - next_timeout_id: 0, - capabilities, - invocation_id, - view_start_index, - is_resumed, - last_event_time, - archived_events: Vec::new(), - }, - } - } - - /// Handle an event, returning effects to execute. - pub fn handle(&mut self, event: Event) -> Vec<Effect> { - match (&self.state, event) { - // ================================================================ - // Idle state - // ================================================================ - (AgentState::Idle { confirmation: None }, Event::UserSubmit(msg)) => { - self.start_turn(msg) - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::UserSubmit(msg), - ) => self.start_turn(msg), - - (AgentState::Idle { confirmation: None }, Event::ExecuteCommand) => { - let cmd = self.current_command(); - let Some(cmd) = cmd else { - // No command suggested — exit - return vec![Effect::ExitApp(ExitAction::Cancel)]; - }; - if self.is_current_command_dangerous() { - let timeout_id = self.ctx.next_timeout_id(); - self.state = AgentState::Idle { - confirmation: Some(PendingConfirmation { - command: cmd, - timeout_id, - }), - }; - vec![Effect::ScheduleTimeout { - timeout_id, - duration: std::time::Duration::from_secs(5), - kind: TimeoutKind::Confirmation, - }] - } else { - vec![Effect::ExitApp(ExitAction::Execute(cmd))] - } - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::ExecuteCommand, - ) => { - let confirm = self.state_confirmation().unwrap().clone(); - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::ExitApp(ExitAction::Execute(confirm.command))] - } - - (AgentState::Idle { .. }, Event::InsertCommand) => { - let cmd = self.current_command(); - match cmd { - Some(cmd) => vec![Effect::ExitApp(ExitAction::Insert(cmd))], - None => vec![], - } - } - - ( - AgentState::Idle { - confirmation: Some(_), - }, - Event::Cancel, - ) => { - self.state = AgentState::Idle { confirmation: None }; - vec![] - } - - (AgentState::Idle { confirmation: None }, Event::Cancel) => { - vec![Effect::ExitApp(ExitAction::Cancel)] - } - - (AgentState::Idle { .. }, Event::ConfirmationTimeout { timeout_id }) => { - if self - .state_confirmation() - .is_some_and(|c| c.timeout_id == timeout_id) - { - self.state = AgentState::Idle { confirmation: None }; - } - vec![] - } - - (AgentState::Idle { .. }, Event::NewSession) => { - // Archive visible events so they remain on screen but aren't - // sent to the API. Tools persist for rendering. - let visible = self.ctx.events[self.ctx.view_start_index..].to_vec(); - self.ctx.archived_events.extend(visible); - - self.ctx.events.clear(); - self.ctx.session_id = None; - self.ctx.current_turn_tool_ids.clear(); - self.ctx.view_start_index = 0; - self.ctx.is_resumed = false; - - // Add OOB indicator for the new session - self.ctx.events.push(ConversationEvent::OutOfBandOutput { - name: "System".to_string(), - command: Some("/new".to_string()), - content: "Started a new session.".to_string(), - }); - - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::ArchiveSession, Effect::Persist] - } - - (AgentState::Idle { .. }, Event::SlashCommand { command, content }) => { - self.handle_slash_command(&command, &content); - vec![] - } - - ( - AgentState::Idle { .. }, - Event::SkillLoaded { - name, - arguments, - content, - }, - ) => { - self.ctx.events.push(ConversationEvent::SkillInvocation { - name, - arguments, - content, - }); - self.ctx.current_response.clear(); - self.ctx.current_turn_tool_ids.clear(); - - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - // ================================================================ - // Turn — stream lifecycle - // ================================================================ - ( - AgentState::Turn { - stream: StreamPhase::Connecting, - }, - Event::StreamStarted, - ) => { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { status: None }, - }; - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Connecting, - }, - Event::StreamError(e), - ) => { - self.state = AgentState::Error(e); - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamChunk(text), - ) => { - self.ctx.current_response.push_str(&text); - vec![] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamStatusChanged(status), - ) => { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { - status: Some(StreamingStatus::from_str(&status)), - }, - }; - vec![] - } - - (AgentState::Turn { .. }, Event::StreamToolCall { id, name, input }) => { - self.commit_streaming_text(); - self.handle_stream_tool_call(id, name, input) - } - - (AgentState::Turn { .. }, Event::SuggestCommand { id, input }) => { - self.commit_streaming_text(); - // Push the suggest_command as a ToolCall event (protocol requirement) - self.ctx.events.push(ConversationEvent::ToolCall { - id, - name: "suggest_command".to_string(), - input, - }); - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::Persist] - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamServerToolResult { - tool_use_id, - content, - is_error, - remote, - content_length, - }, - ) => { - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id, - content, - is_error, - remote, - content_length, - }); - vec![] - } - - (AgentState::Turn { .. }, Event::StreamDone { session_id }) => { - self.commit_streaming_text(); - if !session_id.is_empty() { - self.ctx.session_id = Some(session_id); - } - self.state = AgentState::Turn { - stream: StreamPhase::Done, - }; - self.check_turn_completion() - } - - ( - AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - }, - Event::StreamError(e), - ) => { - // Abort any executing tools on stream error - let abort_effects: Vec<_> = self - .ctx - .tools - .executing_ids() - .into_iter() - .map(|tool_id| Effect::AbortTool { tool_id }) - .collect(); - self.ctx.tool_timeout_ids.clear(); - self.state = AgentState::Error(e); - abort_effects - } - - // ================================================================ - // Turn — tool lifecycle (any stream phase) - // ================================================================ - (AgentState::Turn { .. }, Event::PermissionResolved { tool_id, response }) => { - self.handle_permission_resolved(tool_id, response) - } - - (AgentState::Turn { .. }, Event::PermissionUserChoice { tool_id, choice }) => { - self.handle_permission_choice(tool_id, choice) - } - - ( - AgentState::Turn { .. }, - Event::ToolExecutionDone { - tool_id, - outcome, - preview, - }, - ) => self.handle_tool_done(tool_id, outcome, preview), - - ( - AgentState::Turn { .. }, - Event::ToolPreviewUpdate { - tool_id, - lines, - exit_code, - }, - ) => { - if let Some(tracked) = self.ctx.tools.get_mut(&tool_id) { - if tracked.is_resolved() { - // Tool already completed — a late preview update raced with - // ToolExecutionDone. Update lines (they may carry the final - // screen) but preserve the finalized exit_code/interrupted. - if let Some(tools::ToolPreviewData::Shell { - lines: existing_lines, - .. - }) = &mut tracked.preview - { - *existing_lines = lines; - } - } else { - tracked.preview = Some(tools::ToolPreviewData::Shell { - lines, - exit_code, - interrupted: None, - }); - } - } - vec![] - } - - (AgentState::Turn { .. }, Event::InterruptTools) => { - let ids = self.ctx.tools.executing_ids(); - for id in &ids { - if let Some(tracked) = self.ctx.tools.get_mut(id) { - tracked.interrupt_reason = Some(tools::InterruptReason::User); - } - // Clear any pending execution timeout for this tool - self.ctx.tool_timeout_ids.retain(|_, tid| tid != id); - } - ids.into_iter() - .map(|tool_id| Effect::AbortTool { tool_id }) - .collect() - } - - ( - AgentState::Turn { .. }, - Event::ToolExecutionTimeout { - timeout_id, - tool_id, - }, - ) => self.handle_tool_execution_timeout(timeout_id, tool_id), - - // ─── Cancel during Turn ───────────────────────────────────── - (AgentState::Turn { stream }, Event::Cancel) => { - let mut effects = Vec::new(); - - // Abort stream if still active - if !matches!(stream, StreamPhase::Done) { - effects.push(Effect::AbortStream); - } - - // Cancel all pending tools - let pending = self.ctx.tools.pending_ids(); - for id in &pending { - if let Some(tracked) = self.ctx.tools.get_mut(id) { - if tracked.state == ToolState::Executing { - effects.push(Effect::AbortTool { - tool_id: id.clone(), - }); - } - tracked.state = ToolState::Completed; - } - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: id.clone(), - content: "Error: user cancelled this operation".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - } - - // Commit any partial streaming text - self.commit_streaming_text_as_cancelled(); - - // Add context so the LLM knows what happened - if !pending.is_empty() { - self.ctx.events.push(ConversationEvent::SystemContext { - content: "The user cancelled the previous generation. Tool calls that were in progress have been aborted.".to_string(), - }); - } - - // Clear timeout mappings — stale timeouts will be ignored by the guard - self.ctx.tool_timeout_ids.clear(); - - self.state = AgentState::Idle { confirmation: None }; - effects.push(Effect::Persist); - effects - } - - // ================================================================ - // Error state - // ================================================================ - (AgentState::Error(_), Event::Retry) => { - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - (AgentState::Error(_), Event::Cancel) => { - vec![Effect::ExitApp(ExitAction::Cancel)] - } - - // ================================================================ - // Fallthrough — ignore events with no valid transition - // ================================================================ - - // StreamDone can arrive after SuggestCommand (which already moved to Idle). - // We still need to capture the session_id from it. - (_, Event::StreamDone { session_id }) => { - if !session_id.is_empty() { - self.ctx.session_id = Some(session_id); - } - vec![Effect::Persist] - } - - (_, Event::SlashCommand { command, content }) => { - self.handle_slash_command(&command, &content); - vec![] - } - - // RequestSkillLoad during non-idle: still emit the effect - (_, Event::RequestSkillLoad { name, arguments }) => { - vec![Effect::LoadSkill { name, arguments }] - } - - // SkillLoaded during non-idle: queue so it's visible - // in context for the next turn. - ( - _, - Event::SkillLoaded { - name, - arguments, - content, - }, - ) => { - self.ctx.events.push(ConversationEvent::SkillInvocation { - name, - arguments, - content, - }); - vec![] - } - - _ => vec![], - } - } - - // ──────────────────────────────────────────────────────────────────── - // Private helpers - // ──────────────────────────────────────────────────────────────────── - - /// Start a new turn: push user message, build messages, emit StartStream. - fn start_turn(&mut self, msg: String) -> Vec<Effect> { - self.ctx - .events - .push(ConversationEvent::UserMessage { content: msg }); - // Don't clear tools — completed tools persist for rendering history. - // Tools are only cleared on /new (session reset). - self.ctx.current_response.clear(); - self.ctx.current_turn_tool_ids.clear(); - - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } - - /// Build API messages from the conversation event log. - fn build_messages(&self) -> Vec<Value> { - ContextWindowBuilder::with_default_budget().build(&self.ctx.events) - } - - /// Commit accumulated streaming text to the event log. - fn commit_streaming_text(&mut self) { - let text = std::mem::take(&mut self.ctx.current_response); - let trimmed = text.trim_start().to_string(); - if !trimmed.is_empty() { - self.ctx - .events - .push(ConversationEvent::Text { content: trimmed }); - } - } - - /// Commit streaming text with a cancellation suffix. - fn commit_streaming_text_as_cancelled(&mut self) { - let text = std::mem::take(&mut self.ctx.current_response); - let trimmed = text.trim_start().to_string(); - if !trimmed.is_empty() { - self.ctx.events.push(ConversationEvent::Text { - content: format!("{trimmed}\n\n[User cancelled this generation]"), - }); - } - } - - /// Handle a client-side tool call from the stream. - fn handle_stream_tool_call(&mut self, id: String, name: String, input: Value) -> Vec<Effect> { - // Parse the tool call - let tool = match crate::tools::ClientToolCall::try_from((name.as_str(), &input)) { - Ok(tool) => tool, - Err(_) => { - // Unknown tool — push as event but don't track - self.ctx - .events - .push(ConversationEvent::ToolCall { id, name, input }); - return vec![]; - } - }; - - // Capability gating - if let Some(required_cap) = tool.descriptor().capability - && !self.ctx.capabilities.iter().any(|c| c == required_cap) - { - self.ctx.events.push(ConversationEvent::ToolCall { - id: id.clone(), - name, - input, - }); - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: id, - content: format!( - "Tool not enabled: capability '{required_cap}' was not advertised by this client" - ), - is_error: true, - remote: false, - content_length: None, - }); - return vec![]; - } - - // Track the tool and push ToolCall event - let tool_for_effect = tool.clone(); - self.ctx.tools.insert(id.clone(), tool); - self.ctx.current_turn_tool_ids.push(id.clone()); - self.ctx.events.push(ConversationEvent::ToolCall { - id: id.clone(), - name, - input, - }); - - // Transition to Turn if we were Streaming - if let AgentState::Turn { - stream: StreamPhase::Streaming { .. }, - } = &self.state - { - self.state = AgentState::Turn { - stream: StreamPhase::Streaming { status: None }, - }; - } - - vec![Effect::CheckPermission { - tool_id: id, - tool: tool_for_effect, - }] - } - - /// Handle permission resolver result. - fn handle_permission_resolved( - &mut self, - tool_id: String, - response: PermissionResponse, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - // If already resolved (e.g. cancelled while permission check was in flight), - // ignore the stale result to avoid re-executing a cancelled tool. - if tracked.is_resolved() { - return vec![]; - } - - match response { - PermissionResponse::Allowed | PermissionResponse::SessionGranted => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - self.emit_execute_tool(tool_id, tool) - } - PermissionResponse::Ask => { - tracked.state = ToolState::AwaitingPermission; - vec![] - } - PermissionResponse::Denied => { - tracked.state = ToolState::Denied; - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content: "Permission denied on the user's system".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - self.check_turn_completion() - } - } - } - - /// Handle user's permission choice from the dialog. - fn handle_permission_choice( - &mut self, - tool_id: String, - choice: PermissionChoice, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - if tracked.is_resolved() { - return vec![]; - } - - match choice { - PermissionChoice::Allow => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - self.emit_execute_tool(tool_id, tool) - } - PermissionChoice::AllowForSession => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let mut effects = self.emit_execute_tool(tool_id, tool.clone()); - if let Some(path) = tool.resolved_file_path() { - effects.push(Effect::CacheSessionGrant { path }); - } - effects - } - PermissionChoice::AlwaysAllowInProject => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let rule = crate::permissions::rule::Rule { - tool: tool.rule_name().to_string(), - scope: None, // project file provides the scoping - }; - let mut effects = self.emit_execute_tool(tool_id, tool); - effects.push(Effect::WritePermissionRule { - target: PermissionTarget::Project, - rule, - disposition: crate::permissions::writer::RuleDisposition::Allow, - }); - effects - } - PermissionChoice::AlwaysAllow => { - tracked.state = ToolState::Executing; - let tool = tracked.tool.clone(); - let scope = tool - .resolved_file_path() - .map(|p| p.to_string_lossy().to_string()); - let rule = crate::permissions::rule::Rule { - tool: tool.rule_name().to_string(), - scope, - }; - let mut effects = self.emit_execute_tool(tool_id, tool); - effects.push(Effect::WritePermissionRule { - target: PermissionTarget::Global, - rule, - disposition: crate::permissions::writer::RuleDisposition::Allow, - }); - effects - } - PermissionChoice::Deny => { - tracked.state = ToolState::Denied; - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content: "Permission denied by the user".to_string(), - is_error: true, - remote: false, - content_length: None, - }); - self.check_turn_completion() - } - } - } - - /// Handle tool execution completion. - fn handle_tool_done( - &mut self, - tool_id: String, - outcome: crate::tools::ToolOutcome, - preview: Option<tools::ToolPreviewData>, - ) -> Vec<Effect> { - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - // If already completed (e.g. cancelled), ignore stale result - if tracked.is_resolved() { - return vec![]; - } - - tracked.state = ToolState::Completed; - - // If the FSM tagged this tool with an interrupt reason (user or timeout), - // use it; otherwise derive from the outcome's interrupted flag. - let reason = tracked.interrupt_reason.take().or({ - if let crate::tools::ToolOutcome::Structured { - interrupted: true, .. - } = &outcome - { - Some(tools::InterruptReason::User) - } else { - None - } - }); - - // Merge shell preview: the final ToolExecutionDone carries exit_code/interrupted - // but has empty lines (the live lines were accumulated via ToolPreviewUpdate). - // Preserve the accumulated lines and fold in the terminal metadata. - match (&mut tracked.preview, preview) { - ( - Some(tools::ToolPreviewData::Shell { - exit_code, - interrupted, - .. - }), - Some(tools::ToolPreviewData::Shell { - exit_code: final_exit, - .. - }), - ) => { - *exit_code = final_exit; - *interrupted = reason.clone(); - } - (_, Some(mut p)) => { - if let tools::ToolPreviewData::Shell { - ref mut interrupted, - .. - } = p - { - *interrupted = reason.clone(); - } - tracked.preview = Some(p); - } - _ => {} - } - - // Clean up any pending execution timeout for this tool - self.ctx.tool_timeout_ids.retain(|_, tid| tid != &tool_id); - - let content = outcome.format_for_llm(reason.as_ref()); - let is_error = outcome.is_error(); - self.ctx.events.push(ConversationEvent::ToolResult { - tool_use_id: tool_id, - content, - is_error, - remote: false, - content_length: None, - }); - - self.check_turn_completion() - } - - /// Handle a tool execution timeout. Aborts the tool if it's still running. - fn handle_tool_execution_timeout(&mut self, timeout_id: u64, tool_id: String) -> Vec<Effect> { - // Guard: only act if this timeout is still registered (not cleaned up by natural completion) - if self.ctx.tool_timeout_ids.remove(&timeout_id).is_none() { - return vec![]; - } - - let Some(tracked) = self.ctx.tools.get_mut(&tool_id) else { - return vec![]; - }; - - if tracked.is_resolved() { - return vec![]; - } - - // Tag the tool so handle_tool_done can distinguish timeout from user interrupt. - // Only shell tools have entries in tool_timeout_ids, so this is always Shell. - let timeout_secs = match &tracked.tool { - crate::tools::ClientToolCall::Shell(s) => s.timeout_secs, - _ => unreachable!("only shell tools have execution timeouts"), - }; - tracked.interrupt_reason = Some(tools::InterruptReason::Timeout(timeout_secs)); - - // Abort the tool — the driver sends the interrupt signal via oneshot, - // and execute_shell_command_streaming returns a Structured outcome with - // interrupted: true and partial stdout/stderr. This flows through the - // normal ToolExecutionDone path. - vec![Effect::AbortTool { tool_id }] - } - - /// Emit effects to begin executing a tool. For shell commands, also schedules - /// an execution timeout based on the LLM-specified timeout_secs. - fn emit_execute_tool( - &mut self, - tool_id: String, - tool: crate::tools::ClientToolCall, - ) -> Vec<Effect> { - let mut effects = vec![Effect::ExecuteTool { - tool_id: tool_id.clone(), - tool: tool.clone(), - }]; - - if let crate::tools::ClientToolCall::Shell(ref shell) = tool { - let timeout_id = self.ctx.next_timeout_id(); - self.ctx - .tool_timeout_ids - .insert(timeout_id, tool_id.clone()); - effects.push(Effect::ScheduleTimeout { - timeout_id, - duration: std::time::Duration::from_secs(shell.timeout_secs), - kind: TimeoutKind::ToolExecution { tool_id }, - }); - } - - effects - } - - /// Check if the turn is complete (stream done + all tools resolved). - /// If so, either continue the conversation or go Idle. - fn check_turn_completion(&mut self) -> Vec<Effect> { - // Stream must be done - if !matches!( - self.state, - AgentState::Turn { - stream: StreamPhase::Done - } - ) { - return vec![]; - } - - // All current-turn tools must be resolved before the turn can complete - if !self.ctx.tools.all_resolved(&self.ctx.current_turn_tool_ids) { - return vec![]; - } - - // Turn is complete. Check if we need to continue (tool results to send back). - // We continue if this turn had any client tool calls (the LLM needs to see - // the results and respond). - if !self.ctx.current_turn_tool_ids.is_empty() { - // Continue conversation with tool results. - // Don't clear tools — they persist for rendering history. - // Clear turn IDs so the continuation turn doesn't loop. - self.ctx.current_turn_tool_ids.clear(); - let messages = self.build_messages(); - let session_id = self.ctx.session_id.clone(); - self.ctx.current_response.clear(); - self.state = AgentState::Turn { - stream: StreamPhase::Connecting, - }; - vec![Effect::StartStream { - messages, - session_id, - }] - } else { - // No tools — turn is done, go idle - self.state = AgentState::Idle { confirmation: None }; - vec![Effect::Persist] - } - } - - /// Extract the current confirmation state (if any). - fn state_confirmation(&self) -> Option<&PendingConfirmation> { - if let AgentState::Idle { - confirmation: Some(ref c), - } = self.state - { - Some(c) - } else { - None - } - } - - /// Get the most recent suggested command from the conversation. - /// Get the most recent command from the current invocation only. - fn current_command(&self) -> Option<String> { - self.current_invocation_events() - .rev() - .find_map(|e| e.as_command()) - .map(|s| s.to_string()) - } - - /// Check if the most recent command is dangerous. - fn is_current_command_dangerous(&self) -> bool { - self.current_invocation_events() - .rev() - .find_map(|e| { - if let ConversationEvent::ToolCall { name, input, .. } = e - && name == "suggest_command" - { - let danger = input - .get("danger") - .and_then(|v| v.as_str()) - .unwrap_or("low"); - Some(danger == "high" || danger == "medium" || danger == "med") - } else { - None - } - }) - .unwrap_or(false) - } - - /// Events from the current invocation only (from view_start_index onward). - fn current_invocation_events(&self) -> impl DoubleEndedIterator<Item = &ConversationEvent> { - let start = self.ctx.view_start_index.min(self.ctx.events.len()); - self.ctx.events[start..].iter() - } - - /// Handle a slash command by pushing an OOB event. - fn handle_slash_command(&mut self, command: &str, content: &str) { - self.ctx.events.push(ConversationEvent::OutOfBandOutput { - name: "System".to_string(), - command: Some(command.to_string()), - content: content.to_string(), - }); - } -} diff --git a/crates/atuin-ai/src/fsm/tests.rs b/crates/atuin-ai/src/fsm/tests.rs deleted file mode 100644 index 51c23915..00000000 --- a/crates/atuin-ai/src/fsm/tests.rs +++ /dev/null @@ -1,890 +0,0 @@ -//! Pure FSM transition tests. No IO, no async. - -use serde_json::json; - -use super::*; -use effects::{Effect, ExitAction}; -use events::{Event, PermissionChoice, PermissionResponse}; - -fn new_fsm() -> AgentFsm { - AgentFsm::new( - vec!["client_v1_read_file".to_string()], - "test-inv".to_string(), - ) -} - -// ============================================================================ -// Idle → Turn -// ============================================================================ - -#[test] -fn user_submit_starts_turn() { - let mut fsm = new_fsm(); - - let effects = fsm.handle(Event::UserSubmit("hello".into())); - - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Connecting - } - )); - assert_eq!(effects.len(), 1); - assert!(matches!(effects[0], Effect::StartStream { .. })); - // User message was pushed to events - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::UserMessage { content } if content == "hello" - ))); -} - -#[test] -fn stream_started_transitions_to_streaming() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - - let effects = fsm.handle(Event::StreamStarted); - - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Streaming { status: None } - } - )); - assert!(effects.is_empty()); -} - -#[test] -fn stream_chunk_accumulates_text() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - - fsm.handle(Event::StreamChunk("Hello ".into())); - fsm.handle(Event::StreamChunk("world!".into())); - - assert_eq!(fsm.ctx.current_response, "Hello world!"); -} - -#[test] -fn stream_done_without_tools_goes_idle() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamChunk("Hi there!".into())); - - let effects = fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - assert_eq!(fsm.ctx.session_id, Some("s1".to_string())); - assert!(effects.iter().any(|e| matches!(e, Effect::Persist))); - // Text was committed to events - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::Text { content } if content == "Hi there!" - ))); -} - -// ============================================================================ -// Tool lifecycle -// ============================================================================ - -#[test] -fn stream_tool_call_tracks_tool_and_emits_check_permission() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read a file".into())); - fsm.handle(Event::StreamStarted); - - let effects = fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - - assert!(fsm.ctx.tools.get("t1").is_some()); - assert_eq!(effects.len(), 1); - assert!(matches!(effects[0], Effect::CheckPermission { .. })); -} - -#[test] -fn permission_allowed_transitions_to_executing() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - assert_eq!(fsm.ctx.tools.get("t1").unwrap().state, ToolState::Executing); - assert!(matches!(effects[0], Effect::ExecuteTool { .. })); -} - -#[test] -fn permission_ask_transitions_to_awaiting() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Ask, - }); - - assert_eq!( - fsm.ctx.tools.get("t1").unwrap().state, - ToolState::AwaitingPermission - ); - assert!(effects.is_empty()); -} - -#[test] -fn tool_done_after_stream_done_continues_conversation() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - // Now in Turn { Done } with one tool Executing - let effects = fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("file contents".into()), - preview: None, - }); - - // Turn complete → continuation - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Connecting - } - )); - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::StartStream { .. })) - ); -} - -#[test] -fn continuation_turn_without_new_tools_goes_idle() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - // Tool completes → continuation starts - fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("contents".into()), - preview: None, - }); - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Connecting - } - )); - - // Continuation stream: text only, no new tools - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamChunk("Here's the file.".into())); - let effects = fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - - // Should go Idle, NOT start another continuation - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - assert!(effects.iter().any(|e| matches!(e, Effect::Persist))); - assert!( - !effects - .iter() - .any(|e| matches!(e, Effect::StartStream { .. })) - ); -} - -#[test] -fn tool_done_before_stream_done_stays_in_turn() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - // Tool completes but stream hasn't sent Done yet - let effects = fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("contents".into()), - preview: None, - }); - - // Still in Turn — stream phase is Streaming, not Done - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Streaming { .. } - } - )); - assert!(effects.is_empty()); -} - -// ============================================================================ -// Cancel -// ============================================================================ - -#[test] -fn cancel_during_streaming_goes_idle() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamChunk("partial text".into())); - - let effects = fsm.handle(Event::Cancel); - - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - assert!(effects.iter().any(|e| matches!(e, Effect::AbortStream))); - assert!(effects.iter().any(|e| matches!(e, Effect::Persist))); - // Partial text committed with cancel suffix - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::Text { content } if content.contains("[User cancelled") - ))); -} - -#[test] -fn stale_permission_resolved_after_cancel_is_ignored() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - // Tool is in CheckingPermission, cancel happens before permission resolves - fsm.handle(Event::Cancel); - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - - // Stale permission result arrives — tool is already Completed (cancelled) - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - // Should NOT emit ExecuteTool — the tool was cancelled - assert!(effects.is_empty()); -} - -#[test] -fn cancel_during_turn_with_pending_tools() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - // Tool is Executing, stream is Done - - let effects = fsm.handle(Event::Cancel); - - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::AbortTool { .. })) - ); - // Error ToolResult injected - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::ToolResult { tool_use_id, is_error: true, .. } if tool_use_id == "t1" - ))); - // SystemContext about cancellation - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::SystemContext { content } if content.contains("cancelled") - ))); -} - -#[test] -fn stale_tool_result_after_cancel_is_ignored() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::Cancel); - - // Stale event arrives - let effects = fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("contents".into()), - preview: None, - }); - - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); - assert!(effects.is_empty()); -} - -// ============================================================================ -// Confirmation -// ============================================================================ - -#[test] -fn dangerous_command_enters_confirmation() { - let mut fsm = new_fsm(); - // Simulate a dangerous command in history - fsm.ctx.events.push(ConversationEvent::ToolCall { - id: "sc1".into(), - name: "suggest_command".into(), - input: json!({"command": "rm -rf /", "description": "bad", "confidence": "high", "danger": "high"}), - }); - - let effects = fsm.handle(Event::ExecuteCommand); - - assert!(matches!( - fsm.state, - AgentState::Idle { - confirmation: Some(_) - } - )); - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::ScheduleTimeout { .. })) - ); -} - -#[test] -fn second_execute_confirms_and_exits() { - let mut fsm = new_fsm(); - fsm.ctx.events.push(ConversationEvent::ToolCall { - id: "sc1".into(), - name: "suggest_command".into(), - input: json!({"command": "rm -rf /", "description": "bad", "confidence": "high", "danger": "high"}), - }); - fsm.handle(Event::ExecuteCommand); - - let effects = fsm.handle(Event::ExecuteCommand); - - assert!(effects.iter().any(|e| matches!( - e, - Effect::ExitApp(ExitAction::Execute(cmd)) if cmd == "rm -rf /" - ))); -} - -#[test] -fn confirmation_timeout_clears_confirmation() { - let mut fsm = new_fsm(); - fsm.ctx.events.push(ConversationEvent::ToolCall { - id: "sc1".into(), - name: "suggest_command".into(), - input: json!({"command": "rm -rf /", "description": "bad", "confidence": "high", "danger": "high"}), - }); - fsm.handle(Event::ExecuteCommand); - let timeout_id = match &fsm.state { - AgentState::Idle { - confirmation: Some(c), - } => c.timeout_id, - _ => panic!("expected confirmation"), - }; - - fsm.handle(Event::ConfirmationTimeout { timeout_id }); - - assert_eq!(fsm.state, AgentState::Idle { confirmation: None }); -} - -// ============================================================================ -// Error / Retry -// ============================================================================ - -#[test] -fn stream_error_goes_to_error_state() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - - fsm.handle(Event::StreamError("network error".into())); - - assert_eq!(fsm.state, AgentState::Error("network error".to_string())); -} - -#[test] -fn retry_from_error_starts_new_stream() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("hello".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamError("fail".into())); - - let effects = fsm.handle(Event::Retry); - - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Connecting - } - )); - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::StartStream { .. })) - ); -} - -// ============================================================================ -// Permission choices -// ============================================================================ - -#[test] -fn permission_deny_completes_turn_and_continues() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - fsm.handle(Event::StreamDone { - session_id: "".into(), - }); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Ask, - }); - - let effects = fsm.handle(Event::PermissionUserChoice { - tool_id: "t1".into(), - choice: PermissionChoice::Deny, - }); - - // Turn should complete since all tools resolved and stream is done - // → continuation needed (there was a tool result to send back) - assert!(matches!( - fsm.state, - AgentState::Turn { - stream: StreamPhase::Connecting - } - )); - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::StartStream { .. })) - ); - // Error result was injected - assert!(fsm.ctx.events.iter().any(|e| matches!( - e, - ConversationEvent::ToolResult { tool_use_id, is_error: true, .. } if tool_use_id == "t1" - ))); -} - -// ============================================================================ -// Shell execution timeouts -// ============================================================================ - -fn fsm_with_shell() -> AgentFsm { - AgentFsm::new( - vec![ - "client_v1_read_file".to_string(), - "client_v1_execute_shell_command".to_string(), - ], - "test-inv".to_string(), - ) -} - -fn shell_tool_call_event(id: &str) -> Event { - Event::StreamToolCall { - id: id.into(), - name: "execute_shell_command".into(), - input: json!({ - "command": "sleep 999", - "shell": "bash", - "timeout": 60, - "description": "test" - }), - } -} - -#[test] -fn shell_tool_schedules_execution_timeout() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run something".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - // Should have ExecuteTool + ScheduleTimeout - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::ExecuteTool { .. })) - ); - assert!(effects.iter().any(|e| matches!( - e, - Effect::ScheduleTimeout { kind: effects::TimeoutKind::ToolExecution { tool_id }, .. } - if tool_id == "t1" - ))); - assert!(!fsm.ctx.tool_timeout_ids.is_empty()); -} - -#[test] -fn read_tool_does_not_schedule_timeout() { - let mut fsm = new_fsm(); - fsm.handle(Event::UserSubmit("read".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "read_file".into(), - input: json!({"file_path": "/tmp/test.txt"}), - }); - - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - assert!( - effects - .iter() - .any(|e| matches!(e, Effect::ExecuteTool { .. })) - ); - assert!( - !effects - .iter() - .any(|e| matches!(e, Effect::ScheduleTimeout { .. })) - ); - assert!(fsm.ctx.tool_timeout_ids.is_empty()); -} - -#[test] -fn tool_completion_clears_timeout_mapping() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - assert!(!fsm.ctx.tool_timeout_ids.is_empty()); - - // Tool completes naturally - fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("done".into()), - preview: None, - }); - - assert!(fsm.ctx.tool_timeout_ids.is_empty()); -} - -#[test] -fn stale_timeout_after_natural_completion_is_ignored() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - // Tool completes naturally - fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Success("done".into()), - preview: None, - }); - - // Stale timeout fires — should be no-op - let effects = fsm.handle(Event::ToolExecutionTimeout { - timeout_id: 0, - tool_id: "t1".into(), - }); - - assert!(effects.is_empty()); -} - -#[test] -fn timeout_fires_before_completion_emits_abort() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - // Timeout fires while tool is still executing - let effects = fsm.handle(Event::ToolExecutionTimeout { - timeout_id: 0, - tool_id: "t1".into(), - }); - - assert_eq!(effects.len(), 1); - assert!(matches!( - effects[0], - Effect::AbortTool { ref tool_id } if tool_id == "t1" - )); - // Timeout mapping cleaned up - assert!(fsm.ctx.tool_timeout_ids.is_empty()); -} - -#[test] -fn timeout_respects_llm_specified_duration() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - - // Tool call with timeout: 120 - fsm.handle(Event::StreamToolCall { - id: "t1".into(), - name: "execute_shell_command".into(), - input: json!({ - "command": "cargo build", - "shell": "bash", - "timeout": 120, - "description": "build" - }), - }); - - let effects = fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - let timeout_effect = effects - .iter() - .find(|e| matches!(e, Effect::ScheduleTimeout { .. })); - assert!(matches!( - timeout_effect, - Some(Effect::ScheduleTimeout { duration, .. }) if *duration == std::time::Duration::from_secs(120) - )); -} - -#[test] -fn cancel_clears_timeout_mappings() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - assert!(!fsm.ctx.tool_timeout_ids.is_empty()); - - fsm.handle(Event::Cancel); - - assert!(fsm.ctx.tool_timeout_ids.is_empty()); -} - -#[test] -fn timeout_abort_propagates_timeout_reason_to_preview_and_llm() { - use super::tools::InterruptReason; - - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - // Timeout fires - fsm.handle(Event::ToolExecutionTimeout { - timeout_id: 0, - tool_id: "t1".into(), - }); - - // Tool completes after abort (interrupted: true from execute_shell_command_streaming) - fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Structured { - stdout: "partial output".into(), - stderr: String::new(), - exit_code: None, - duration_ms: 60000, - interrupted: true, - }, - preview: Some(super::tools::ToolPreviewData::Shell { - lines: vec!["partial output".into()], - exit_code: None, - interrupted: None, // FSM overrides this with the reason - }), - }); - - // Preview should carry Timeout reason - let tracked = fsm.ctx.tools.get("t1").unwrap(); - let preview = tracked.shell_preview().unwrap(); - assert_eq!(preview.interrupted, Some(InterruptReason::Timeout(60))); - - // LLM content should say "Timed out" not "Interrupted by user" - let tool_result = fsm.ctx.events.iter().find( - |e| matches!(e, ConversationEvent::ToolResult { tool_use_id, .. } if tool_use_id == "t1"), - ); - if let Some(ConversationEvent::ToolResult { content, .. }) = tool_result { - assert!( - content.contains("[Timed out after 60s]"), - "Expected timeout message, got: {content}" - ); - assert!(!content.contains("[Interrupted by user]")); - } else { - panic!("No ToolResult found for t1"); - } -} - -#[test] -fn user_interrupt_propagates_user_reason_to_preview_and_llm() { - use super::tools::InterruptReason; - - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - fsm.handle(Event::StreamDone { - session_id: "s1".into(), - }); - - // User interrupts - fsm.handle(Event::InterruptTools); - - // Tool completes after abort - fsm.handle(Event::ToolExecutionDone { - tool_id: "t1".into(), - outcome: crate::tools::ToolOutcome::Structured { - stdout: "partial".into(), - stderr: String::new(), - exit_code: None, - duration_ms: 5000, - interrupted: true, - }, - preview: Some(super::tools::ToolPreviewData::Shell { - lines: vec!["partial".into()], - exit_code: None, - interrupted: None, // FSM overrides this with the reason - }), - }); - - // Preview should carry User reason - let tracked = fsm.ctx.tools.get("t1").unwrap(); - let preview = tracked.shell_preview().unwrap(); - assert_eq!(preview.interrupted, Some(InterruptReason::User)); - - // LLM content should say "Interrupted by user" - let tool_result = fsm.ctx.events.iter().find( - |e| matches!(e, ConversationEvent::ToolResult { tool_use_id, .. } if tool_use_id == "t1"), - ); - if let Some(ConversationEvent::ToolResult { content, .. }) = tool_result { - assert!( - content.contains("[Interrupted by user]"), - "Expected user interrupt message, got: {content}" - ); - } else { - panic!("No ToolResult found for t1"); - } -} - -#[test] -fn user_interrupt_clears_timeout_mappings_for_aborted_tools() { - let mut fsm = fsm_with_shell(); - fsm.handle(Event::UserSubmit("run".into())); - fsm.handle(Event::StreamStarted); - fsm.handle(shell_tool_call_event("t1")); - fsm.handle(Event::PermissionResolved { - tool_id: "t1".into(), - response: PermissionResponse::Allowed, - }); - - assert!(!fsm.ctx.tool_timeout_ids.is_empty()); - - fsm.handle(Event::InterruptTools); - - assert!(fsm.ctx.tool_timeout_ids.is_empty()); -} diff --git a/crates/atuin-ai/src/fsm/tools.rs b/crates/atuin-ai/src/fsm/tools.rs deleted file mode 100644 index 96348672..00000000 --- a/crates/atuin-ai/src/fsm/tools.rs +++ /dev/null @@ -1,178 +0,0 @@ -//! Tool lifecycle management within the FSM. -//! -//! Each tool call goes through an independent lifecycle. The ToolManager -//! tracks all tools in the current turn and provides the "all resolved" -//! check that gates turn completion. - -use crate::diff::{EditPreview, WritePreview}; -use crate::tools::ClientToolCall; - -/// Why a tool execution was interrupted. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum InterruptReason { - /// User pressed Ctrl+C or Esc during execution. - User, - /// The LLM-specified execution timeout expired. - Timeout(u64), -} - -/// Per-tool lifecycle state. -#[derive(Debug, Clone, PartialEq, Eq)] -pub(crate) enum ToolState { - /// Permission resolver is running asynchronously. - CheckingPermission, - /// Waiting for user to grant/deny via the permission dialog. - AwaitingPermission, - /// Actively executing. - Executing, - /// Execution completed (result injected into conversation). - Completed, - /// User denied permission (error result injected into conversation). - Denied, -} - -/// Cached preview data for rendering tool output. -#[derive(Debug, Clone)] -pub(crate) enum ToolPreviewData { - /// Shell command VT100 output lines. - Shell { - lines: Vec<String>, - exit_code: Option<i32>, - interrupted: Option<InterruptReason>, - }, - /// File edit diff preview. - Edit(EditPreview), - /// File write content preview. - Write(WritePreview), -} - -/// A tracked tool call with its current lifecycle state. -#[derive(Debug, Clone)] -pub(crate) struct TrackedTool { - pub id: String, - pub tool: ClientToolCall, - pub state: ToolState, - /// Cached preview data for rendering (populated during/after execution). - pub preview: Option<ToolPreviewData>, - /// Set by the FSM when it emits AbortTool, so that ToolExecutionDone - /// can distinguish user interrupts from timeouts. - pub interrupt_reason: Option<InterruptReason>, -} - -impl TrackedTool { - /// Whether this tool has reached a terminal state. - pub fn is_resolved(&self) -> bool { - matches!(self.state, ToolState::Completed | ToolState::Denied) - } - - /// Extract shell preview data (for TurnBuilder compatibility). - pub fn shell_preview(&self) -> Option<crate::tools::ToolPreview> { - match &self.preview { - Some(ToolPreviewData::Shell { - lines, - exit_code, - interrupted, - }) => Some(crate::tools::ToolPreview { - lines: lines.clone(), - exit_code: *exit_code, - interrupted: interrupted.clone(), - }), - _ => None, - } - } - - /// Extract edit diff preview (for TurnBuilder compatibility). - pub fn edit_preview(&self) -> Option<&EditPreview> { - match &self.preview { - Some(ToolPreviewData::Edit(p)) => Some(p), - _ => None, - } - } - - /// Extract write content preview (for TurnBuilder compatibility). - pub fn write_preview(&self) -> Option<&WritePreview> { - match &self.preview { - Some(ToolPreviewData::Write(p)) => Some(p), - _ => None, - } - } -} - -/// Manages tool call lifecycles for a single turn. -/// -/// Tools are inserted when received from the stream and progress through -/// their lifecycle independently. The manager provides aggregate queries -/// (all resolved, any awaiting permission, etc.) that the FSM uses for -/// state transitions. -#[derive(Debug, Clone, Default)] -pub(crate) struct ToolManager { - tools: Vec<TrackedTool>, -} - -impl ToolManager { - pub fn new() -> Self { - Self { tools: Vec::new() } - } - - /// Insert a new tool in CheckingPermission state. - pub fn insert(&mut self, id: String, tool: ClientToolCall) { - self.tools.push(TrackedTool { - id, - tool, - state: ToolState::CheckingPermission, - preview: None, - interrupt_reason: None, - }); - } - - /// Look up a tool by ID. - pub fn get(&self, id: &str) -> Option<&TrackedTool> { - self.tools.iter().find(|t| t.id == id) - } - - /// Look up a tool mutably by ID. - pub fn get_mut(&mut self, id: &str) -> Option<&mut TrackedTool> { - self.tools.iter_mut().find(|t| t.id == id) - } - - /// True if all tools from the given set of IDs have reached a terminal state. - /// Returns true for an empty set (vacuously — no tools to wait for). - pub fn all_resolved(&self, tool_ids: &[String]) -> bool { - tool_ids - .iter() - .all(|id| self.get(id).is_some_and(|t| t.is_resolved())) - } - - /// Find the first tool awaiting user permission. - pub fn awaiting_permission(&self) -> Option<&TrackedTool> { - self.tools - .iter() - .find(|t| t.state == ToolState::AwaitingPermission) - } - - /// Get IDs of all non-resolved tools (for cancel). - pub fn pending_ids(&self) -> Vec<String> { - self.tools - .iter() - .filter(|t| !t.is_resolved()) - .map(|t| t.id.clone()) - .collect() - } - - /// Get IDs of all currently executing tools (for interrupt/abort). - pub fn executing_ids(&self) -> Vec<String> { - self.tools - .iter() - .filter(|t| t.state == ToolState::Executing) - .map(|t| t.id.clone()) - .collect() - } - - /// True if any tool has a shell preview with live output. - pub fn has_executing_preview(&self) -> bool { - self.tools.iter().any(|t| { - t.state == ToolState::Executing - && matches!(t.preview, Some(ToolPreviewData::Shell { .. })) - }) - } -} |
