aboutsummaryrefslogtreecommitdiffstats
path: root/crates/turtle/src/atuin_daemon/server.rs
blob: 36954cca1c4dabdf6bfa330d5fd05c29aff0143f (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use eyre::Result;

use crate::{
    atuin_client::settings::Settings,
    atuin_daemon::{
        components::{
            history::HistoryGrpcService, search::SearchGrpcService, semantic::SemanticGrpcService,
        },
        daemon::DaemonHandle,
        generated::{
            control::{ControlService, control_server::ControlServer},
            history::history_server::HistoryServer,
            search::search_server::SearchServer,
            semantic::semantic_server::SemanticServer,
        },
    },
};

/// Run the gRPC server with the given services.
///
/// This starts the gRPC server in the background and returns immediately.
/// The server will shut down when a [`ShutdownRequested`] event is received.
#[cfg(unix)]
pub(crate) fn run_grpc_server(
    settings: &Settings,
    history_service: HistoryServer<HistoryGrpcService>,
    search_service: SearchServer<SearchGrpcService>,
    semantic_service: SemanticServer<SemanticGrpcService>,
    control_service: ControlServer<ControlService>,
    handle: DaemonHandle,
) -> Result<()> {
    use tokio::net::UnixListener;
    use tokio_stream::wrappers::UnixListenerStream;

    let socket_path = settings.daemon.socket_path.clone();

    let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
        #[cfg(target_os = "linux")]
        {
            use eyre::{OptionExt, WrapErr};
            use std::os::unix::net::SocketAddr;
            use std::path::PathBuf;
            tracing::info!("getting systemd socket");
            let listener = listenfd::ListenFd::from_env()
                .take_unix_listener(0)?
                .ok_or_eyre("missing systemd socket")?;
            listener.set_nonblocking(true)?;
            let actual_path: Result<PathBuf, eyre::Report> = listener
                .local_addr()
                .context("getting systemd socket's path")
                .and_then(|addr: SocketAddr| {
                    addr.as_pathname()
                        .ok_or_eyre("systemd socket missing path")
                        .map(|path: &std::path::Path| path.to_owned())
                });
            match actual_path {
                Ok(actual_path) => {
                    tracing::info!("listening on systemd socket: {actual_path:?}");
                    if actual_path != std::path::Path::new(&socket_path) {
                        tracing::warn!(
                            "systemd socket is not at configured client path: {socket_path:?}"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}"
                    );
                }
            }
            (UnixListener::from_std(listener)?, false)
        }
    } else {
        tracing::info!("listening on unix socket {socket_path:?}");
        (UnixListener::bind(socket_path.clone())?, true)
    };

    let uds_stream = UnixListenerStream::new(uds);

    // Create shutdown signal from daemon handle
    let shutdown_signal = async move {
        let mut rx = handle.subscribe();

        loop {
            use crate::atuin_daemon::DaemonEvent;

            match rx.recv().await {
                Err(_) | Ok(DaemonEvent::ShutdownRequested) => break,
                Ok(_) => (),
            }
        }

        if cleanup {
            eprintln!("Removing socket...");
            if let Err(e) = std::fs::remove_file(&socket_path)
                && e.kind() != std::io::ErrorKind::NotFound
            {
                eprintln!("failed to remove socket: {e}");
            }
        }
        eprintln!("Shutting down gRPC server...");
    };

    // Spawn the server in the background
    tokio::spawn(async move {
        use tonic::transport::Server;

        if let Err(e) = Server::builder()
            .add_service(history_service)
            .add_service(search_service)
            .add_service(semantic_service)
            .add_service(control_service)
            .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
            .await
        {
            tracing::error!("gRPC server error: {e}");
        }
    });

    Ok(())
}