diff options
| author | Michelle Tilley <michelle@michelletilley.net> | 2026-04-21 13:07:27 -0700 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-04-21 13:07:27 -0700 |
| commit | 2f702ad446fcd6a261a3bea0ab2807d70eca43e2 (patch) | |
| tree | 4cfa6276257cefbe73f7fa46a74026170aaf8435 /crates/atuin-ai/src/stream.rs | |
| parent | docs: document show_numeric_shortcuts (#3433) (diff) | |
| download | atuin-2f702ad446fcd6a261a3bea0ab2807d70eca43e2.zip | |
refactor: Replace ad-hoc dispatch with FSM + driver architecture (#3434)
Replaces the tangled dispatch handler system (`tui/dispatch.rs`,
`tui/state.rs`) with a pure finite state machine + driver architecture.
The FSM handles all state transitions as explicit `(State, Event) →
(NewState, Effects)` mappings. The driver executes IO effects and
bridges the TUI to the FSM.
Diffstat (limited to 'crates/atuin-ai/src/stream.rs')
| -rw-r--r-- | crates/atuin-ai/src/stream.rs | 154 |
1 files changed, 2 insertions, 152 deletions
diff --git a/crates/atuin-ai/src/stream.rs b/crates/atuin-ai/src/stream.rs index 24770abe..19d287e7 100644 --- a/crates/atuin-ai/src/stream.rs +++ b/crates/atuin-ai/src/stream.rs @@ -2,23 +2,16 @@ // SSE streaming // ─────────────────────────────────────────────────────────────────── -use std::sync::mpsc; - use atuin_client::settings::AiCapabilities; use atuin_common::tls::ensure_crypto_provider; use eventsource_stream::Eventsource; -use eye_declare::Handle; use eyre::{Context, Result}; use futures::StreamExt; use reqwest::Url; use reqwest::header::USER_AGENT; -use crate::{ - context::{AppContext, ClientContext}, - tools::ClientToolCall, - tui::{Session, events::AiTuiEvent}, -}; +use crate::context::ClientContext; static APP_USER_AGENT: &str = concat!("atuin/", env!("CARGO_PKG_VERSION")); @@ -100,7 +93,7 @@ impl ChatRequest { } } -fn create_chat_stream( +pub(crate) fn create_chat_stream( hub_address: String, token: String, request: ChatRequest, @@ -244,149 +237,6 @@ fn create_chat_stream( }) } -// ─────────────────────────────────────────────────────────────────── -// Async streaming task — pushes updates to app state via Handle -// ─────────────────────────────────────────────────────────────────── - -pub(crate) async fn run_chat_stream( - handle: Handle<Session>, - tx: mpsc::Sender<AiTuiEvent>, - app_ctx: AppContext, - client_ctx: ClientContext, - request: ChatRequest, -) { - let capabilities = request.capabilities.clone(); - let stream = create_chat_stream( - app_ctx.endpoint.clone(), - app_ctx.token.clone(), - request, - client_ctx, - app_ctx.send_cwd, - app_ctx.last_command.clone(), - ); - futures::pin_mut!(stream); - - while let Some(event) = stream.next().await { - match event { - Ok(StreamFrame::Content(content)) => { - apply_content_frame(&handle, &tx, &capabilities, content); - } - Ok(StreamFrame::Control(control)) => { - let terminal = apply_control_frame(&handle, control); - if terminal { - break; - } - } - Err(e) => { - let msg = e.to_string(); - handle.update(move |state| { - state.streaming_error(msg); - }); - break; - } - } - } -} - -/// Apply a content frame to session state. -/// Control flow: always continues the stream. -fn apply_content_frame( - handle: &Handle<Session>, - tx: &mpsc::Sender<AiTuiEvent>, - capabilities: &[String], - content: StreamContent, -) { - match content { - StreamContent::TextChunk(text) => { - handle.update(move |state| { - state.conversation.append_streaming_text(&text); - }); - } - StreamContent::ToolCall { id, name, input } => { - if let Ok(tool) = ClientToolCall::try_from((name.as_str(), &input)) { - // Enforce capability gating: reject tool calls the client didn't advertise. - if let Some(required_cap) = tool.descriptor().capability - && !capabilities.iter().any(|c| c == required_cap) - { - tracing::warn!( - tool = name, - capability = required_cap, - "Rejecting tool call: capability not advertised" - ); - handle.update(move |state| { - state.add_tool_call(id.clone(), name, input.clone()); - state.conversation.add_tool_result( - id, - format!("Tool not enabled: capability '{required_cap}' was not advertised by this client"), - true, - false, - None, - ); - }); - return; - } - - // Client-side tool — add to tracker and conversation, queue permission check - let id_for_event = id.clone(); - handle.update(move |state| { - state.handle_client_tool_call(id_for_event, tool, input); - }); - let _ = tx.send(AiTuiEvent::CheckToolCallPermission(id)); - } else { - // Server-side tool — just add to conversation events - handle.update(move |state| { - state.add_tool_call(id, name, input); - }); - } - } - StreamContent::ToolResult { - tool_use_id, - content, - is_error, - remote, - content_length, - } => { - handle.update(move |state| { - state.conversation.add_tool_result( - tool_use_id, - content, - is_error, - remote, - content_length, - ); - }); - } - } -} - -/// Apply a control frame to session state. -/// Returns true if the stream should terminate. -fn apply_control_frame(handle: &Handle<Session>, control: StreamControl) -> bool { - match control { - StreamControl::StatusChanged(status) => { - handle.update(move |state| { - state.update_streaming_status(&status); - }); - false - } - StreamControl::Done { session_id } => { - handle.update(move |state| { - if !session_id.is_empty() { - state.conversation.store_session_id(session_id); - } - state.finalize_streaming(); - }); - true - } - StreamControl::Error(msg) => { - handle.update(move |state| { - state.streaming_error(msg); - }); - true - } - } -} - fn hub_url(base: &str, path: &str) -> Result<Url> { let base_with_slash = if base.ends_with('/') { base.to_string() |
