//! Driver loop for the agent FSM.
//!
//! Receives events from the channel, calls `fsm.handle()`, syncs ViewState
//! to the Handle, and executes effects (spawning async tasks for IO).
//!
//! The driver runs on a blocking thread (`spawn_blocking`) so it can call
//! `blocking_recv()` on the Handle and `block_on()` for async persistence.
//!
//! NOTE(review): in this copy of the file several generic arguments look
//! truncated (for example `Option,`, `Vec,`, `mpsc::Sender,`). The code
//! tokens are left exactly as found; confirm the intended type arguments
//! against the canonical source before building.

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;

use eye_declare::Handle;

use crate::context::{AppContext, ClientContext};
use crate::edit_permissions::EditPermissionCache;
use crate::file_tracker::FileReadTracker;
use crate::fsm::effects::{Effect, ExitAction, PermissionTarget};
use crate::fsm::events::{Event, PermissionChoice, PermissionResponse};
use crate::fsm::tools::ToolPreviewData;
use crate::fsm::{AgentFsm, AgentState};
use crate::permissions::resolver::PermissionResolver;
use crate::permissions::writer;
use crate::session::SessionManager;
use crate::stream::ChatRequest;
use crate::tools::ClientToolCall;
use crate::tui::events::{AiTuiEvent, PermissionResult};
use crate::tui::state::ConversationEvent;
use crate::tui::view::turn;

// ============================================================================
// Driver event — the unified channel type
// ============================================================================

/// Events processed by the driver loop.
///
/// Components emit `Tui` variants via the channel. Spawned async tasks
/// (stream, tool execution) emit `Fsm` variants directly.
#[derive(Debug)]
pub(crate) enum DriverEvent {
    /// Event from a TUI component (key press, input change, etc.)
    Tui(AiTuiEvent),
    /// Internal FSM event (from spawned stream/tool tasks)
    Fsm(Event),
}

// ============================================================================
// IO context (driver-owned, not visible to FSM)
// ============================================================================

/// Resources the driver uses while executing effects and persisting state.
///
/// Passed to `execute_effect` and `persist`; the FSM itself only receives
/// events and never sees this struct.
pub(crate) struct IoContext {
    /// Cloned into the stream bridge; also supplies `history_db` and `git_root`.
    pub app_ctx: AppContext,
    /// Cloned into the stream bridge; its `shell` is used when loading skills.
    pub client_ctx: ClientContext,
    /// Target of every persistence and archive call.
    pub session_mgr: SessionManager,
    /// Updated after Read/Edit/Write tools run; serialized by `persist`.
    pub file_tracker: FileReadTracker,
    /// Session-scoped edit grants, consulted before the permission resolver.
    pub edit_permissions: EditPermissionCache,
    /// When present, receives a snapshot of a file before Edit/Write touches it.
    // NOTE(review): the type argument of this `Option` is missing here.
    pub snapshot_store: Option,
    /// Source of skill summaries for the stream and of skill bodies on load.
    pub skill_registry: crate::skills::SkillRegistry,
}

// ============================================================================
// ViewState (Handle payload for the render thread)
// ============================================================================

/// State pushed to the Handle for the view/render thread.
/// Synced from the FSM after each transition.
// NOTE(review): the `Vec`/`Option`/`HashSet` fields below have lost their type
// arguments in this copy — restore them from the canonical source.
#[derive(Debug)]
pub(crate) struct ViewState {
    // ─── From FSM ───────────────────────────────────────────────
    pub agent_state: AgentState,
    /// Events from the FSM's `view_start_index` onward, plus a synthetic text
    /// event for the in-progress response while streaming.
    pub visible_events: Vec,
    pub all_events: Vec,
    pub session_id: Option,
    pub tools: crate::fsm::tools::ToolManager,
    pub current_response: String,

    // ─── Session metadata (set once) ────────────────────────────
    pub is_resumed: bool,
    pub last_event_time: Option>,
    pub in_git_project: bool,

    // ─── View-only ──────────────────────────────────────────────
    pub archived_events: Vec,

    // ─── Pre-computed for rendering ────────────────────────────
    /// Archived turns followed by visible turns.
    pub turns: Vec,
    /// True when a visible `suggest_command` tool call carries a string `command`.
    pub has_command: bool,
    // NOTE(review): not assigned by `sync_view_state` in this file; presumably
    // maintained elsewhere — confirm.
    pub committed_turn_count: usize,
    pub archived_turn_count: usize,

    // ─── Ephemeral interaction state ────────────────────────────
    pub is_input_blank: bool,
    pub slash_command_input: Option,
    pub slash_command_search_results: Vec,
    pub exit_action: Option,
    pub slash_registry: crate::tui::slash::SlashCommandRegistry,
    pub skill_names: std::collections::HashSet,
}

impl ViewState {
    /// True once an exit action has been recorded.
    pub fn is_exiting(&self) -> bool {
        self.exit_action.is_some()
    }

    /// True while the agent is in a `Turn` state.
    pub fn is_busy(&self) -> bool {
        matches!(self.agent_state, AgentState::Turn { .. })
    }

    /// True when the agent is idle with a pending confirmation.
    pub fn has_confirmation(&self) -> bool {
        matches!(
            self.agent_state,
            AgentState::Idle { confirmation: Some(_) }
        )
    }

    /// True when the agent is idle and no confirmation is pending.
    pub fn is_input_active(&self) -> bool {
        matches!(self.agent_state, AgentState::Idle { .. }) && !self.has_confirmation()
    }

    /// Key-hint text for the footer, chosen from the current agent state.
    pub fn footer_text(&self) -> &'static str {
        match &self.agent_state {
            AgentState::Idle { confirmation: None } => {
                // A suggested command can only be run/inserted from an empty input.
                if self.has_command && self.is_input_blank {
                    "[Enter] Execute suggested command [Tab] Insert Command"
                } else {
                    "[Enter] Send [Shift+Enter] New line [Esc] Exit"
                }
            }
            AgentState::Idle {
                confirmation: Some(_),
            } => "[Enter] Confirm dangerous command [Esc] Cancel",
            AgentState::Turn { .. } => "[Esc] Cancel",
            AgentState::Error(_) => "[Enter]/[r] Retry [Esc] Exit",
        }
    }
}

// ============================================================================
// Main driver loop
// ============================================================================

/// Borrowed bundle of everything `execute_effect` needs for a single effect.
/// Rebuilt for each effect inside the driver loop.
struct DriverContext<'a> {
    fsm: &'a mut AgentFsm,
    io: &'a mut IoContext,
    handle: &'a Handle,
    tx: &'a mpsc::Sender,
    exiting: &'a Arc,
    /// Cancel sender of the active stream, if any; dropping it stops the bridge.
    stream_cancel_tx: &'a mut Option>,
    /// Interrupt senders for running shell tools, keyed by tool id.
    tool_abort_txs: &'a mut std::collections::HashMap>,
}

/// Main driver loop. Processes events, transitions FSM, syncs view, executes effects.
///
/// Runs on a blocking thread. Returns when the event channel closes or exit is requested.
/// The Handle already contains the initial ViewState (set by Application::builder).
pub(crate) fn run_driver(
    mut fsm: AgentFsm,
    mut io: IoContext,
    handle: Handle,
    rx: mpsc::Receiver,
    tx: mpsc::Sender,
    exiting: Arc,
    in_git_project: bool,
) {
    // Dropping the sender cancels the stream (receiver sees Err on changed()).
    let mut stream_cancel_tx: Option> = None;
    // Per-tool interrupt senders for shell commands.
    let mut tool_abort_txs: std::collections::HashMap> = std::collections::HashMap::new();

    while let Ok(driver_event) = rx.recv() {
        // Log and translate DriverEvent to FSM Event (or handle directly)
        let fsm_event = match driver_event {
            DriverEvent::Fsm(event) => {
                tracing::trace!(?event, state = ?fsm.state, "FSM event");
                Some(event)
            }
            DriverEvent::Tui(tui_event) => {
                tracing::trace!(?tui_event, state = ?fsm.state, "TUI event");
                translate_tui_event(tui_event, &handle)
            }
        };

        if let Some(event) = fsm_event {
            // Feed event to FSM
            let effects = fsm.handle(event);
            tracing::trace!(?effects, state = ?fsm.state, "FSM transition");

            // Sync ViewState to Handle (FSM owns all state now)
            sync_view_state(&handle, &fsm, in_git_project);

            // Execute effects (only persist when FSM says to)
            for effect in &effects {
                // Persist runs here rather than in execute_effect; the matching
                // arm there is a no-op.
                if matches!(effect, Effect::Persist) {
                    persist(&fsm, &mut io);
                }
                let ctx = DriverContext {
                    fsm: &mut fsm,
                    io: &mut io,
                    handle: &handle,
                    tx: &tx,
                    exiting: &exiting,
                    stream_cancel_tx: &mut stream_cancel_tx,
                    tool_abort_txs: &mut tool_abort_txs,
                };
                execute_effect(effect, ctx);
            }

            // Final sync after effects — ensures the render thread sees
            // the absolute final state even if effects modified anything.
            if !effects.is_empty() {
                sync_view_state(&handle, &fsm, in_git_project);
            }
        }
        // Events that translate to None need no sync: InputUpdated already
        // pushed its view-only changes through the handle, and a
        // SelectPermission with no tool awaiting permission changes nothing —
        // so skip the expensive sync_view_state that clones all events.

        // Set by the ExitApp effect.
        if exiting.load(Ordering::Acquire) {
            break;
        }

        tracing::trace!(state = ?fsm.state, "driver loop iteration complete, waiting for next event");
    }
}

// ============================================================================
// TUI event translation
// ============================================================================

/// Translate a TUI event into an FSM event.
/// Returns None for events handled directly (e.g. InputUpdated).
///
/// `SubmitInput` and `InputUpdated` also write view-only fields (slash-command
/// state and `is_input_blank`) straight to the handle. `SelectPermission`
/// yields `None` when no tool is currently awaiting permission.
fn translate_tui_event(event: AiTuiEvent, handle: &Handle<ViewState>) -> Option<Event> {
    match event {
        AiTuiEvent::SubmitInput(input) => {
            // Clear slash state and reset is_input_blank (the InputBox clears
            // its text on submit but doesn't fire InputUpdated for the clear).
            handle.update(|vs| {
                vs.slash_command_input = None;
                vs.slash_command_search_results.clear();
                vs.is_input_blank = true;
            });

            let input = input.trim().to_string();
            if input.is_empty() {
                // Submitting an empty box runs the suggested command.
                Some(Event::ExecuteCommand)
            } else if input == "/new" {
                Some(Event::NewSession)
            } else if input.starts_with('/') {
                Some(slash_event(input, handle))
            } else {
                Some(Event::UserSubmit(input))
            }
        }
        AiTuiEvent::InputUpdated(text) => {
            let is_blank = text.is_empty();
            // Hot path (every keystroke); uses handle.update_tracked
            // to allow read()ing the state without marking it dirty.
            handle.update_tracked(move |vs| {
                if vs.read().is_input_blank != is_blank {
                    vs.is_input_blank = is_blank;
                }

                if text.starts_with('/') {
                    let query = text.trim_start_matches('/').to_string();
                    let mut results = vs.slash_registry.search_fuzzy(&query);
                    // Most relevant first; incomparable scores keep their order.
                    results.sort_by(|a, b| {
                        b.relevance
                            .partial_cmp(&a.relevance)
                            .unwrap_or(std::cmp::Ordering::Equal)
                    });
                    vs.slash_command_input = Some(query);
                    vs.slash_command_search_results = results;
                } else {
                    // Only write when there is something to clear, so plain
                    // typing does not dirty the state.
                    if vs.read().slash_command_input.is_some() {
                        vs.slash_command_input = None;
                    }
                    if !vs.read().slash_command_search_results.is_empty() {
                        vs.slash_command_search_results.clear();
                    }
                }
            });
            None
        }
        AiTuiEvent::CancelGeneration => Some(Event::Cancel),
        AiTuiEvent::ExecuteCommand => Some(Event::ExecuteCommand),
        AiTuiEvent::InsertCommand => Some(Event::InsertCommand),
        AiTuiEvent::CancelConfirmation => Some(Event::Cancel),
        AiTuiEvent::InterruptToolExecution => Some(Event::InterruptTools),
        AiTuiEvent::Retry => Some(Event::Retry),
        AiTuiEvent::Exit => Some(Event::Cancel),
        AiTuiEvent::SelectPermission(result) => {
            // The choice applies to whichever tool the view state reports as
            // awaiting permission; with none (or a failed fetch) it is dropped.
            let tool_id = handle
                .fetch(|vs| vs.tools.awaiting_permission().map(|t| t.id.clone()))
                .blocking_recv()
                .ok()
                .flatten();
            let tool_id = tool_id?;
            let choice = match result {
                PermissionResult::Allow => PermissionChoice::Allow,
                PermissionResult::AllowFileForSession => PermissionChoice::AllowForSession,
                PermissionResult::AlwaysAllowInDir => PermissionChoice::AlwaysAllowInProject,
                PermissionResult::AlwaysAllow => PermissionChoice::AlwaysAllow,
                PermissionResult::Deny => PermissionChoice::Deny,
            };
            Some(Event::PermissionUserChoice { tool_id, choice })
        }
        AiTuiEvent::SlashCommand(cmd) => Some(slash_event(cmd, handle)),
    }
}

/// Build the FSM event for slash-prefixed input: a skill-load request when the
/// command names a registered skill, otherwise a `SlashCommand` event carrying
/// the resolved output content.
fn slash_event(command: String, handle: &Handle<ViewState>) -> Event {
    if let Some((name, arguments)) = resolve_skill_name(&command, handle) {
        Event::RequestSkillLoad { name, arguments }
    } else {
        let content = resolve_slash_command(&command, handle);
        Event::SlashCommand { command, content }
    }
}

/// Resolve a slash command to its output content.
/// If the input starts with `/`, check whether the command name matches a /// registered skill. Returns `Some((skill_name, arguments))` if it does. fn resolve_skill_name(input: &str, handle: &Handle) -> Option<(String, Option)> { let after_slash = input.trim_start_matches('/'); let cmd_name = after_slash.split_whitespace().next()?.to_string(); let is_skill = handle .fetch({ let cmd_name = cmd_name.clone(); move |vs| vs.skill_names.contains(&cmd_name) }) .blocking_recv() .unwrap_or(false); if !is_skill { return None; } let args = after_slash .strip_prefix(&cmd_name) .map(|s| s.trim()) .filter(|s| !s.is_empty()) .map(|s| s.to_string()); Some((cmd_name, args)) } fn resolve_slash_command(command: &str, handle: &Handle) -> String { match command.trim() { "/help" => { let commands = handle .fetch(|vs| { vs.slash_registry .get_commands() .iter() .map(|cmd| format!("- `/{}` — {}", cmd.name, cmd.description)) .collect::>() .join("\n") }) .blocking_recv() .unwrap_or_default(); include_str!("tui/content/help.md").replace("{commands}", &commands) } _ => format!("Unknown command: {command}"), } } // ============================================================================ // ViewState sync // ============================================================================ fn sync_view_state(handle: &Handle, fsm: &AgentFsm, in_git_project: bool) { let state = fsm.state.clone(); let safe_start = fsm.ctx.view_start_index.min(fsm.ctx.events.len()); let mut visible_events = fsm.ctx.events[safe_start..].to_vec(); let all_events = fsm.ctx.events.clone(); let tools = fsm.ctx.tools.clone(); let current_response = fsm.ctx.current_response.clone(); let session_id = fsm.ctx.session_id.clone(); let is_resumed = fsm.ctx.is_resumed; let last_event_time = fsm.ctx.last_event_time; let archived_events = fsm.ctx.archived_events.clone(); // Inject streaming text as a synthetic event for live rendering. // The FSM commits text to events on stream end; this makes it visible during streaming. 
let trimmed = current_response.trim_start(); if !trimmed.is_empty() { visible_events.push(ConversationEvent::Text { content: trimmed.to_string(), }); } // Pre-compute turns and has_command on the driver thread so the // render-thread view function doesn't redo O(n) work every frame. let mut archived_builder = turn::TurnBuilder::new(&tools); for event in &archived_events { archived_builder.add_event(event); } let archived_turns = archived_builder.build(); let archived_turn_count = archived_turns.len(); let mut visible_builder = turn::TurnBuilder::new_starting_at(&tools, archived_turn_count); for event in &visible_events { visible_builder.add_event(event); } let visible_turns = visible_builder.build(); let mut turns = archived_turns; turns.extend(visible_turns); let has_command = visible_events.iter().any(|e| { if let ConversationEvent::ToolCall { name, input, .. } = e { name == "suggest_command" && input.get("command").and_then(|v| v.as_str()).is_some() } else { false } }); tracing::trace!(?state, "sync_view_state pushing to handle"); handle.update(move |vs| { vs.agent_state = state; vs.visible_events = visible_events; vs.all_events = all_events; vs.tools = tools; vs.current_response = current_response; vs.session_id = session_id; vs.is_resumed = is_resumed; vs.last_event_time = last_event_time; vs.in_git_project = in_git_project; vs.archived_events = archived_events; vs.turns = turns; vs.has_command = has_command; vs.archived_turn_count = archived_turn_count; }); } // ============================================================================ // Effect execution // ============================================================================ fn execute_effect(effect: &Effect, ctx: DriverContext) { let DriverContext { fsm, io, handle, tx, exiting, stream_cancel_tx, tool_abort_txs, } = ctx; match effect { Effect::StartStream { messages, session_id, } => { // Cancel any existing stream before starting a new one stream_cancel_tx.take(); let (cancel_tx, cancel_rx) = 
tokio::sync::watch::channel(()); *stream_cancel_tx = Some(cancel_tx); let tx = tx.clone(); let app = io.app_ctx.clone(); let cc = io.client_ctx.clone(); let (skill_summaries, skill_overflow) = io.skill_registry.server_skills(); let request = ChatRequest::new( messages.clone(), session_id.clone(), &app.capabilities, fsm.ctx.invocation_id.clone(), ); tokio::spawn(async move { run_stream_bridge( request, app, cc, tx, cancel_rx, skill_summaries, skill_overflow, ) .await; }); } Effect::AbortStream => { // Drop the sender — the bridge's cancel_rx.changed() will error, // breaking the stream loop and dropping the HTTP connection. stream_cancel_tx.take(); } Effect::CheckPermission { tool_id, tool } => { let tool_id = tool_id.clone(); let tool = tool.clone(); let tx = tx.clone(); // Auto-approved tools (e.g. load_skill) bypass permission checks entirely if tool.is_auto_approved() { let _ = tx.send(DriverEvent::Fsm(Event::PermissionResolved { tool_id, response: PermissionResponse::Allowed, })); return; } let working_dir = tool .target_dir() .map(|p| p.to_path_buf()) .or_else(|| std::env::current_dir().ok()) .unwrap_or_else(|| PathBuf::from(".")); // Check session grants first (synchronous) if let Some(resolved) = tool.resolved_file_path() && io.edit_permissions.has_valid_grant(&resolved) { let _ = tx.send(DriverEvent::Fsm(Event::PermissionResolved { tool_id, response: PermissionResponse::SessionGranted, })); return; } tokio::spawn(async move { let response = match PermissionResolver::new(working_dir).await { Ok(resolver) => match resolver.check(&tool).await { Ok(crate::permissions::check::PermissionResponse::Allowed) => { PermissionResponse::Allowed } Ok(crate::permissions::check::PermissionResponse::Denied) => { PermissionResponse::Denied } Ok(crate::permissions::check::PermissionResponse::Ask) => { PermissionResponse::Ask } Err(_) => PermissionResponse::Ask, }, Err(_) => PermissionResponse::Ask, }; let _ = tx.send(DriverEvent::Fsm(Event::PermissionResolved { tool_id, 
response, })); }); } Effect::ExecuteTool { tool_id, tool } => { let tool_id = tool_id.clone(); let tool = tool.clone(); let tx = tx.clone(); let db = io.app_ctx.history_db.clone(); match &tool { ClientToolCall::Shell(shell_call) => { let shell_call = shell_call.clone(); let tx_preview = tx.clone(); let tool_id_for_preview = tool_id.clone(); // Create interrupt channel and store the sender for AbortTool let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel(); tool_abort_txs.insert(tool_id.clone(), interrupt_tx); tokio::spawn(async move { let (output_tx, mut output_rx) = tokio::sync::mpsc::channel::>(16); let preview_id = tool_id_for_preview; let tx_fwd = tx_preview; tokio::spawn(async move { while let Some(lines) = output_rx.recv().await { let _ = tx_fwd.send(DriverEvent::Fsm(Event::ToolPreviewUpdate { tool_id: preview_id.clone(), lines, exit_code: None, })); } }); let outcome = crate::tools::execute_shell_command_streaming( &shell_call, output_tx, interrupt_rx, ) .await; let preview = if let crate::tools::ToolOutcome::Structured { exit_code, .. } = &outcome { Some(ToolPreviewData::Shell { lines: vec![], exit_code: *exit_code, // Reason is set by the FSM in handle_tool_done // based on whether it was a user interrupt or timeout. 
interrupted: None, }) } else { None }; let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview, })); }); } ClientToolCall::Edit(edit_call) => { let resolved = edit_call.resolved_path(); // Capture old content for snapshot + diff preview let old_content = std::fs::read(&resolved).ok(); if let Some(ref content) = old_content && let Some(ref mut store) = io.snapshot_store && let Err(e) = store.ensure_snapshot(&resolved, content) { tracing::warn!("Failed to snapshot before edit: {e}"); } // Edit is fast (file read + string replace + write) — run inline let (outcome, new_content) = edit_call.execute(&resolved, &io.file_tracker); // Update file tracker with new content if let Some(new_bytes) = &new_content && let Ok(mtime) = std::fs::metadata(&resolved).and_then(|m| m.modified()) { io.file_tracker .update_after_edit(&resolved, new_bytes, mtime); } // Compute diff preview let preview = match (&old_content, &new_content) { (Some(old_bytes), Some(new_bytes)) => { let old_str = String::from_utf8_lossy(old_bytes); let new_str = String::from_utf8_lossy(new_bytes); let diff = crate::diff::EditPreview::compute(&old_str, &new_str); if diff.hunks.is_empty() { None } else { Some(ToolPreviewData::Edit(diff)) } } _ => None, }; let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview, })); } ClientToolCall::Write(write_call) => { let resolved = write_call.resolved_path(); // Snapshot existing file before overwriting if let Ok(content) = std::fs::read(&resolved) && let Some(ref mut store) = io.snapshot_store && let Err(e) = store.ensure_snapshot(&resolved, &content) { tracing::warn!("Failed to snapshot before write: {e}"); } // Write is fast (atomic file write) — run inline let (outcome, written_bytes) = write_call.execute(&resolved); // Update file tracker with new content if let Some(new_bytes) = &written_bytes && let Ok(mtime) = std::fs::metadata(&resolved).and_then(|m| m.modified()) { io.file_tracker 
.update_after_edit(&resolved, new_bytes, mtime); } let preview = if !outcome.is_error() { Some(ToolPreviewData::Write( crate::diff::WritePreview::from_content(&write_call.content), )) } else { None }; let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview, })); } ClientToolCall::Read(read_call) => { // Read is fast (file read) — run inline so we can update file_tracker let outcome = read_call.execute(); // Track the read for freshness checking on subsequent edits if !outcome.is_error() { let resolved = read_call.resolved_path(); if resolved.is_file() && let Ok(content) = std::fs::read(&resolved) && let Ok(mtime) = std::fs::metadata(&resolved).and_then(|m| m.modified()) { io.file_tracker.record_read(resolved, &content, mtime); } } let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview: None, })); } ClientToolCall::AtuinHistory(_) => { // History search needs async DB access tokio::spawn(async move { let outcome = tool.execute(&db).await; let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview: None, })); }); } ClientToolCall::LoadSkill(skill_call) => { let skill_name = skill_call.name.clone(); let registry = io.skill_registry.clone(); let shell = io .client_ctx .shell .clone() .unwrap_or_else(|| "sh".to_string()); tokio::spawn(async move { let content = load_skill_content(®istry, &skill_name, &shell, None).await; let outcome = crate::tools::ToolOutcome::Success(content); let _ = tx.send(DriverEvent::Fsm(Event::ToolExecutionDone { tool_id, outcome, preview: None, })); }); } } } Effect::LoadSkill { name, arguments } => { let name = name.clone(); let arguments = arguments.clone(); let registry = io.skill_registry.clone(); let shell = io .client_ctx .shell .clone() .unwrap_or_else(|| "sh".to_string()); let tx = tx.clone(); tokio::spawn(async move { let content = load_skill_content(®istry, &name, &shell, arguments.as_deref()).await; let _ = 
tx.send(DriverEvent::Fsm(Event::SkillLoaded { name, arguments, content, })); }); } Effect::AbortTool { tool_id } => { if let Some(abort_tx) = tool_abort_txs.remove(tool_id) { let _ = abort_tx.send(()); } } Effect::Persist => { // Handled inline in the driver loop (before this function is called). } Effect::WritePermissionRule { target, rule, disposition, } => { let file_path = match target { PermissionTarget::Project => { let project_root = io .app_ctx .git_root .clone() .or_else(|| std::env::current_dir().ok()) .unwrap_or_else(|| PathBuf::from(".")); writer::project_permissions_path(&project_root) } PermissionTarget::Global => writer::global_permissions_path(), }; let rule = rule.clone(); let disposition = disposition.clone(); tokio::spawn(async move { if let Err(e) = writer::write_rule(&file_path, &rule, disposition).await { tracing::error!("Failed to write permission rule: {e}"); } }); } Effect::CacheSessionGrant { path } => { io.edit_permissions.grant(path.clone()); } Effect::ArchiveSession => { let rt = tokio::runtime::Handle::current(); if let Err(e) = rt.block_on(io.session_mgr.archive_and_reset()) { tracing::warn!("Failed to archive session: {e}"); } } Effect::ScheduleTimeout { timeout_id, duration, kind, } => { let timeout_id = *timeout_id; let duration = *duration; let kind = kind.clone(); let tx = tx.clone(); tokio::spawn(async move { tokio::time::sleep(duration).await; use crate::fsm::effects::TimeoutKind; let event = match kind { TimeoutKind::Confirmation => Event::ConfirmationTimeout { timeout_id }, TimeoutKind::ToolExecution { tool_id } => Event::ToolExecutionTimeout { timeout_id, tool_id, }, }; let _ = tx.send(DriverEvent::Fsm(event)); }); } Effect::ExitApp(action) => { let action = action.clone(); handle.update(move |vs| { vs.exit_action = Some(action); }); exiting.store(true, Ordering::Release); let h2 = handle.clone(); h2.exit(); } } } // ============================================================================ // Persistence // 
// ============================================================================

/// Write the session's events, server session id, file-tracker state and edit
/// permissions through the session manager.
///
/// Each step blocks the calling thread on the current tokio runtime. Failures
/// are logged as warnings and do not stop the remaining steps.
fn persist(fsm: &AgentFsm, io: &mut IoContext) {
    let start = std::time::Instant::now();
    let rt = tokio::runtime::Handle::current();
    if let Err(e) = rt.block_on(io.session_mgr.persist_events(&fsm.ctx.events)) {
        tracing::warn!("Failed to persist session events: {e}");
    }
    // Only written once the FSM context holds a session id.
    if let Some(ref sid) = fsm.ctx.session_id
        && let Err(e) = rt.block_on(io.session_mgr.persist_server_session_id(sid))
    {
        tracing::warn!("Failed to persist server session ID: {e}");
    }
    // A serialization failure skips the write silently; only the write
    // itself is logged on error.
    if let Ok(json) = io.file_tracker.to_json()
        && let Err(e) = rt.block_on(
            io.session_mgr
                .set_metadata(crate::file_tracker::METADATA_KEY, &json),
        )
    {
        tracing::warn!("Failed to persist file tracker: {e}");
    }
    if let Ok(json) = io.edit_permissions.to_json()
        && let Err(e) = rt.block_on(
            io.session_mgr
                .set_metadata(crate::edit_permissions::METADATA_KEY, &json),
        )
    {
        tracing::warn!("Failed to persist edit permissions: {e}");
    }
    tracing::trace!(elapsed_ms = start.elapsed().as_millis(), "persist complete");
}

// ============================================================================
// Skill loading
// ============================================================================

/// Load a skill body from the registry.
///
/// Never fails: a load error is returned as a human-readable
/// "Failed to load skill" message in place of the body.
async fn load_skill_content(
    registry: &crate::skills::SkillRegistry,
    name: &str,
    shell: &str,
    arguments: Option<&str>,
) -> String {
    match registry.load(name, shell, arguments).await {
        Ok(body) => body,
        Err(e) => format!("Failed to load skill '{name}': {e}"),
    }
}

// ============================================================================
// Stream bridge
// ============================================================================

/// Run one chat stream and forward its frames to the driver as FSM events.
///
/// Sends `StreamStarted` first, then one event per frame. Stops when the
/// stream ends, when `cancel_rx` fires or its sender is dropped, when the
/// driver channel is closed, or after a terminal event (`StreamDone` /
/// `StreamError`) has been sent.
// NOTE(review): `tx`, `skill_summaries` and `skill_overflow` have lost their
// type arguments in this copy — restore them from the canonical source.
async fn run_stream_bridge(
    request: ChatRequest,
    app_ctx: AppContext,
    client_ctx: ClientContext,
    tx: mpsc::Sender,
    mut cancel_rx: tokio::sync::watch::Receiver<()>,
    skill_summaries: Vec,
    skill_overflow: Option,
) {
    use crate::stream::{StreamContent, StreamControl, StreamFrame, create_chat_stream};
    use futures::StreamExt;

    // Gather user context files (TERMINAL.md) and interpolate commands.
    let shell = client_ctx.shell.as_deref().unwrap_or("sh");
    // Falls back to an empty path when the cwd cannot be read.
    let start_dir = std::env::current_dir().unwrap_or_default();
    let global_ctx_path = crate::user_context::global_context_path();
    let user_contexts =
        crate::user_context::gather(&start_dir, Some(&global_ctx_path), shell).await;

    let stream = create_chat_stream(
        app_ctx.endpoint.clone(),
        app_ctx.token.clone(),
        request,
        client_ctx,
        app_ctx.send_cwd,
        app_ctx.last_command.clone(),
        user_contexts,
        skill_summaries,
        skill_overflow,
    );
    futures::pin_mut!(stream);

    let _ = tx.send(DriverEvent::Fsm(Event::StreamStarted));

    loop {
        // Select between the next stream frame and cancellation.
        // When the driver drops the cancel sender, changed() returns Err
        // and we break — dropping the HTTP stream and cancelling the request.
        let frame = tokio::select! {
            // `biased` polls the branches in order, so cancellation is
            // checked before the next frame.
            biased;
            _ = cancel_rx.changed() => break,
            frame = stream.next() => match frame {
                Some(frame) => frame,
                None => break,
            },
        };

        let event = match frame {
            Ok(StreamFrame::Content(content)) => match content {
                StreamContent::TextChunk(text) => Some(Event::StreamChunk(text)),
                StreamContent::ToolCall { id, name, input } => {
                    // `suggest_command` gets its own event; every other tool
                    // call goes through the generic path.
                    if name == "suggest_command" {
                        Some(Event::SuggestCommand { id, input })
                    } else {
                        Some(Event::StreamToolCall { id, name, input })
                    }
                }
                StreamContent::ToolResult {
                    tool_use_id,
                    content,
                    is_error,
                    remote,
                    content_length,
                } => Some(Event::StreamServerToolResult {
                    tool_use_id,
                    content,
                    is_error,
                    remote,
                    content_length,
                }),
            },
            Ok(StreamFrame::Control(control)) => match control {
                StreamControl::StatusChanged(status) => Some(Event::StreamStatusChanged(status)),
                StreamControl::Done { session_id } => Some(Event::StreamDone { session_id }),
                StreamControl::Error(msg) => Some(Event::StreamError(msg)),
            },
            // Transport-level failures surface as a stream error too.
            Err(e) => Some(Event::StreamError(e.to_string())),
        };

        if let Some(event) = event {
            // StreamDone and StreamError are terminal — the server won't send more.
            // SuggestCommand is NOT terminal: the server sends StreamDone after it
            // with the session_id we need to capture.
            let is_terminal = matches!(event, Event::StreamDone { .. } | Event::StreamError(_));
            // A failed send means the driver is gone; stop streaming.
            if tx.send(DriverEvent::Fsm(event)).is_err() {
                break;
            }
            if is_terminal {
                break;
            }
        }
    }
}