1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
|
use eyre::Result;
use crate::atuin_daemon::components::history::HistoryGrpcService;
use crate::atuin_daemon::components::search::SearchGrpcService;
use crate::atuin_daemon::components::semantic::SemanticGrpcService;
use crate::atuin_daemon::control::{ControlService, control_server::ControlServer};
use crate::atuin_daemon::daemon::DaemonHandle;
use crate::atuin_daemon::history::history_server::HistoryServer;
use crate::atuin_daemon::search::search_server::SearchServer;
use crate::atuin_daemon::semantic::semantic_server::SemanticServer;
use crate::atuin_client::settings::Settings;
/// Run the gRPC server with the given services.
///
/// Obtains a Unix socket listener, spawns the tonic server onto the Tokio
/// runtime, and returns immediately; the server keeps running in that
/// background task.
///
/// # Socket selection
///
/// * When compiled for Linux and `settings.daemon.systemd_socket` is set, the
///   listener is taken from the environment through `listenfd` (index 0). A
///   warning is logged if its path differs from
///   `settings.daemon.socket_path` or cannot be determined. The socket file
///   is left in place on shutdown.
/// * Otherwise a new socket is bound at `settings.daemon.socket_path`, and
///   that file is removed again on shutdown.
///
/// # Shutdown
///
/// The server stops once a `DaemonEvent::ShutdownRequested` is received from
/// `handle`'s subscription, or once receiving from it returns an error.
///
/// # Errors
///
/// Returns an error if the systemd socket is missing or cannot be adopted, or
/// if binding the configured socket path fails. Errors raised by the server
/// after it has been spawned are only logged, not returned.
#[cfg(unix)]
pub(crate) async fn run_grpc_server(
    settings: Settings,
    history_service: HistoryServer<HistoryGrpcService>,
    search_service: SearchServer<SearchGrpcService>,
    semantic_service: SemanticServer<SemanticGrpcService>,
    control_service: ControlServer<ControlService>,
    handle: DaemonHandle,
) -> Result<()> {
    use tokio::net::UnixListener;
    use tokio_stream::wrappers::UnixListenerStream;

    let socket_path = settings.daemon.socket_path.clone();

    // `cleanup` records whether this function created the socket file and is
    // therefore responsible for removing it on shutdown.
    let (uds, cleanup) = if cfg!(target_os = "linux") && settings.daemon.systemd_socket {
        #[cfg(target_os = "linux")]
        {
            use eyre::{OptionExt, WrapErr};
            use std::os::unix::net::SocketAddr;
            use std::path::PathBuf;

            tracing::info!("getting systemd socket");
            let listener = listenfd::ListenFd::from_env()
                .take_unix_listener(0)?
                .ok_or_eyre("missing systemd socket")?;
            // Tokio requires the std listener to be non-blocking before adoption.
            listener.set_nonblocking(true)?;

            // Best-effort sanity check: failing to resolve the path only
            // produces a warning, it does not abort startup.
            let actual_path: Result<PathBuf, eyre::Report> = listener
                .local_addr()
                .context("getting systemd socket's path")
                .and_then(|addr: SocketAddr| {
                    addr.as_pathname()
                        .ok_or_eyre("systemd socket missing path")
                        .map(|path: &std::path::Path| path.to_owned())
                });
            match actual_path {
                Ok(actual_path) => {
                    tracing::info!("listening on systemd socket: {actual_path:?}");
                    if actual_path != std::path::Path::new(&socket_path) {
                        tracing::warn!(
                            "systemd socket is not at configured client path: {socket_path:?}"
                        );
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        "could not detect systemd socket path, ensure that it's at the configured path: {socket_path:?}, error: {err:?}"
                    );
                }
            }
            (UnixListener::from_std(listener)?, false)
        }

        // On non-Linux Unix targets the block above is compiled out, which
        // would leave this arm with type `()` and fail to type-check against
        // the `else` arm. `cfg!(target_os = "linux")` is `false` there, so
        // this is never executed; it only gives the arm a diverging type.
        #[cfg(not(target_os = "linux"))]
        unreachable!("systemd socket activation is only available on Linux")
    } else {
        tracing::info!("listening on unix socket {socket_path:?}");
        (UnixListener::bind(socket_path.clone())?, true)
    };

    let uds_stream = UnixListenerStream::new(uds);

    // Create shutdown signal from daemon handle.
    //
    // NOTE(review): the subscription is created when this future is first
    // polled by the spawned server task, not when `run_grpc_server` returns.
    // Whether an event emitted in between can be missed depends on
    // `DaemonHandle::subscribe` — confirm.
    let shutdown_signal = async move {
        let mut rx = handle.subscribe();
        loop {
            use crate::atuin_daemon::DaemonEvent;
            match rx.recv().await {
                Ok(DaemonEvent::ShutdownRequested) => break,
                Ok(_) => continue,
                // Any receive error (e.g. the channel being closed) also ends
                // the wait and triggers shutdown.
                Err(_) => break,
            }
        }

        if cleanup {
            eprintln!("Removing socket...");
            // A socket file that is already gone is not worth reporting.
            if let Err(e) = std::fs::remove_file(&socket_path)
                && e.kind() != std::io::ErrorKind::NotFound
            {
                eprintln!("failed to remove socket: {e}");
            }
        }
        eprintln!("Shutting down gRPC server...");
    };

    // Spawn the server in the background; its outcome is only logged because
    // the caller has already been handed `Ok(())` by then.
    tokio::spawn(async move {
        use tonic::transport::Server;

        if let Err(e) = Server::builder()
            .add_service(history_service)
            .add_service(search_service)
            .add_service(semantic_service)
            .add_service(control_service)
            .serve_with_incoming_shutdown(uds_stream, shutdown_signal)
            .await
        {
            tracing::error!("gRPC server error: {e}");
        }
    });

    Ok(())
}
|