aboutsummaryrefslogtreecommitdiffstats
path: root/crates/atuin-ai/src/stream.rs
diff options
context:
space:
mode:
authorMichelle Tilley <michelle@michelletilley.net>2026-04-21 13:07:27 -0700
committerGitHub <noreply@github.com>2026-04-21 13:07:27 -0700
commit2f702ad446fcd6a261a3bea0ab2807d70eca43e2 (patch)
tree4cfa6276257cefbe73f7fa46a74026170aaf8435 /crates/atuin-ai/src/stream.rs
parentdocs: document show_numeric_shortcuts (#3433) (diff)
downloadatuin-2f702ad446fcd6a261a3bea0ab2807d70eca43e2.zip
refactor: Replace ad-hoc dispatch with FSM + driver architecture (#3434)
Replaces the tangled dispatch handler system (`tui/dispatch.rs`, `tui/state.rs`) with a pure finite state machine + driver architecture. The FSM handles all state transitions as explicit `(State, Event) → (NewState, Effects)` mappings. The driver executes IO effects and bridges the TUI to the FSM.
Diffstat (limited to 'crates/atuin-ai/src/stream.rs')
-rw-r--r--crates/atuin-ai/src/stream.rs154
1 files changed, 2 insertions, 152 deletions
diff --git a/crates/atuin-ai/src/stream.rs b/crates/atuin-ai/src/stream.rs
index 24770abe..19d287e7 100644
--- a/crates/atuin-ai/src/stream.rs
+++ b/crates/atuin-ai/src/stream.rs
@@ -2,23 +2,16 @@
// SSE streaming
// ───────────────────────────────────────────────────────────────────
-use std::sync::mpsc;
-
use atuin_client::settings::AiCapabilities;
use atuin_common::tls::ensure_crypto_provider;
use eventsource_stream::Eventsource;
-use eye_declare::Handle;
use eyre::{Context, Result};
use futures::StreamExt;
use reqwest::Url;
use reqwest::header::USER_AGENT;
-use crate::{
- context::{AppContext, ClientContext},
- tools::ClientToolCall,
- tui::{Session, events::AiTuiEvent},
-};
+use crate::context::ClientContext;
static APP_USER_AGENT: &str = concat!("atuin/", env!("CARGO_PKG_VERSION"));
@@ -100,7 +93,7 @@ impl ChatRequest {
}
}
-fn create_chat_stream(
+pub(crate) fn create_chat_stream(
hub_address: String,
token: String,
request: ChatRequest,
@@ -244,149 +237,6 @@ fn create_chat_stream(
})
}
-// ───────────────────────────────────────────────────────────────────
-// Async streaming task — pushes updates to app state via Handle
-// ───────────────────────────────────────────────────────────────────
-
-pub(crate) async fn run_chat_stream(
- handle: Handle<Session>,
- tx: mpsc::Sender<AiTuiEvent>,
- app_ctx: AppContext,
- client_ctx: ClientContext,
- request: ChatRequest,
-) {
- let capabilities = request.capabilities.clone();
- let stream = create_chat_stream(
- app_ctx.endpoint.clone(),
- app_ctx.token.clone(),
- request,
- client_ctx,
- app_ctx.send_cwd,
- app_ctx.last_command.clone(),
- );
- futures::pin_mut!(stream);
-
- while let Some(event) = stream.next().await {
- match event {
- Ok(StreamFrame::Content(content)) => {
- apply_content_frame(&handle, &tx, &capabilities, content);
- }
- Ok(StreamFrame::Control(control)) => {
- let terminal = apply_control_frame(&handle, control);
- if terminal {
- break;
- }
- }
- Err(e) => {
- let msg = e.to_string();
- handle.update(move |state| {
- state.streaming_error(msg);
- });
- break;
- }
- }
- }
-}
-
-/// Apply a content frame to session state.
-/// Control flow: always continues the stream.
-fn apply_content_frame(
- handle: &Handle<Session>,
- tx: &mpsc::Sender<AiTuiEvent>,
- capabilities: &[String],
- content: StreamContent,
-) {
- match content {
- StreamContent::TextChunk(text) => {
- handle.update(move |state| {
- state.conversation.append_streaming_text(&text);
- });
- }
- StreamContent::ToolCall { id, name, input } => {
- if let Ok(tool) = ClientToolCall::try_from((name.as_str(), &input)) {
- // Enforce capability gating: reject tool calls the client didn't advertise.
- if let Some(required_cap) = tool.descriptor().capability
- && !capabilities.iter().any(|c| c == required_cap)
- {
- tracing::warn!(
- tool = name,
- capability = required_cap,
- "Rejecting tool call: capability not advertised"
- );
- handle.update(move |state| {
- state.add_tool_call(id.clone(), name, input.clone());
- state.conversation.add_tool_result(
- id,
- format!("Tool not enabled: capability '{required_cap}' was not advertised by this client"),
- true,
- false,
- None,
- );
- });
- return;
- }
-
- // Client-side tool — add to tracker and conversation, queue permission check
- let id_for_event = id.clone();
- handle.update(move |state| {
- state.handle_client_tool_call(id_for_event, tool, input);
- });
- let _ = tx.send(AiTuiEvent::CheckToolCallPermission(id));
- } else {
- // Server-side tool — just add to conversation events
- handle.update(move |state| {
- state.add_tool_call(id, name, input);
- });
- }
- }
- StreamContent::ToolResult {
- tool_use_id,
- content,
- is_error,
- remote,
- content_length,
- } => {
- handle.update(move |state| {
- state.conversation.add_tool_result(
- tool_use_id,
- content,
- is_error,
- remote,
- content_length,
- );
- });
- }
- }
-}
-
-/// Apply a control frame to session state.
-/// Returns true if the stream should terminate.
-fn apply_control_frame(handle: &Handle<Session>, control: StreamControl) -> bool {
- match control {
- StreamControl::StatusChanged(status) => {
- handle.update(move |state| {
- state.update_streaming_status(&status);
- });
- false
- }
- StreamControl::Done { session_id } => {
- handle.update(move |state| {
- if !session_id.is_empty() {
- state.conversation.store_session_id(session_id);
- }
- state.finalize_streaming();
- });
- true
- }
- StreamControl::Error(msg) => {
- handle.update(move |state| {
- state.streaming_error(msg);
- });
- true
- }
- }
-}
-
fn hub_url(base: &str, path: &str) -> Result<Url> {
let base_with_slash = if base.ends_with('/') {
base.to_string()