diff options
| author | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-01-24 23:51:04 +0100 |
|---|---|---|
| committer | Benedikt Peetz <benedikt.peetz@b-peetz.de> | 2026-01-24 23:51:19 +0100 |
| commit | 38c6f95c94830ed8eb6c6678e303480257cf3cf5 (patch) | |
| tree | e5bd55a6306c5f787ff49d528810c62b5c5dbb32 /pkgs/by-name/mp/mpdpopm/src/clients.rs | |
| parent | module/mpd: Set-up a sticker file (diff) | |
| download | nixos-config-38c6f95c94830ed8eb6c6678e303480257cf3cf5.zip | |
pkgs/mpdpopm: Init
This is based on https://github.com/sp1ff/mpdpopm at commit 178df8ad3a5c39281cfd8b3cec05394f4c9256fd.
Diffstat (limited to 'pkgs/by-name/mp/mpdpopm/src/clients.rs')
| -rw-r--r-- | pkgs/by-name/mp/mpdpopm/src/clients.rs | 1417 |
1 files changed, 1417 insertions, 0 deletions
diff --git a/pkgs/by-name/mp/mpdpopm/src/clients.rs b/pkgs/by-name/mp/mpdpopm/src/clients.rs new file mode 100644 index 00000000..587063b2 --- /dev/null +++ b/pkgs/by-name/mp/mpdpopm/src/clients.rs @@ -0,0 +1,1417 @@ +// Copyright (C) 2020-2025 Michael herstine <sp1ff@pobox.com> +// +// This file is part of mpdpopm. +// +// mpdpopm is free software: you can redistribute it and/or modify it under the terms of the GNU +// General Public License as published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// mpdpopm is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even +// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +// Public License for more details. +// +// You should have received a copy of the GNU General Public License along with mpdpopm. If not, +// see <http://www.gnu.org/licenses/>. + +//! mpd clients and associated utilities. +//! +//! # Introduction +//! +//! This module contains basic types implementing various MPD client operations (cf. the [mpd +//! protocol](http://www.musicpd.org/doc/protocol/)). Since issuing the "idle" command will tie up +//! the connection, MPD clients often use multiple connections to the server (one to listen for +//! updates, one or more on which to issue commands). This modules provides two different client +//! types: [Client] for general-purpose use and [IdleClient] for long-lived connections listening +//! for server notifiations. +//! +//! Note that there *is* another idiom (used in [libmpdel](https://github.com/mpdel/libmpdel), +//! e.g.): open a single connection & issue an "idle" command. When you want to issue a command, +//! send a "noidle", then the command, then "idle" again. This isn't a race condition, as the +//! server will buffer any changes that took place when you were not idle & send them when you +//! re-issue the "idle" command. This crate however takes the approach of two channels (like +//! [mpdfav](https://github.com/vincent-petithory/mpdfav)). + +use async_trait::async_trait; +use regex::Regex; +use snafu::{Backtrace, IntoError, OptionExt, ResultExt, prelude::*}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use tokio::net::{TcpStream, ToSocketAddrs, UnixStream}; +use tracing::{debug, info}; + +use lazy_static::lazy_static; + +use std::{ + collections::HashMap, + convert::TryFrom, + fmt, + marker::{Send, Unpin}, + path::{Path, PathBuf}, + str::FromStr, +}; + +// The Protocol error, below, gets used a *lot*; anywhere we receive a message from the MPD server +// that "should" never happen. To help give a bit of context beyond a stack trace, I use this +// enumeration of "operations" +/// Enumerated list of MPD operations; used in Error::Protocol to distinguish which operation it was +/// that elicited the protocol error. +#[derive(Debug)] +#[non_exhaustive] +pub enum Operation { + Connect, + Status, + GetSticker, + SetSticker, + SendToPlaylist, + SendMessage, + Update, + GetStoredPlaylists, + RspToUris, + GetStickers, + GetAllSongs, + Add, + Idle, + GetMessages, +} + +impl std::fmt::Display for Operation { + #[allow(unreachable_patterns)] // the _ arm is *currently* unreachable + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Operation::Connect => write!(f, "Connect"), + Operation::Status => write!(f, "Status"), + Operation::GetSticker => write!(f, "GetSticker"), + Operation::SetSticker => write!(f, "SetSticker"), + Operation::SendToPlaylist => write!(f, "SendToPlaylist"), + Operation::SendMessage => write!(f, "SendMessage"), + Operation::Update => write!(f, "Update"), + Operation::GetStoredPlaylists => write!(f, "GetStoredPlaylists"), + Operation::RspToUris => write!(f, "RspToUris"), + Operation::GetStickers => write!(f, "GetStickers"), + Operation::GetAllSongs => write!(f, "GetAllSongs"), + Operation::Add => write!(f, "Add"), + Operation::Idle => write!(f, "Idle"), + Operation::GetMessages => write!(f, "GetMessages"), + _ => write!(f, "Unknown client operation"), + } + } +} + +/// An MPD client error +#[derive(Debug, Snafu)] +#[non_exhaustive] +pub enum Error { + #[snafu(display("Protocol error ({}): {}", op, msg))] + Protocol { + op: Operation, + msg: String, + backtrace: Backtrace, + }, + #[snafu(display("Protocol errror ({}): {}", op, source))] + ProtocolConv { + op: Operation, + source: Box<dyn std::error::Error>, + backtrace: Backtrace, + }, + #[snafu(display("I/O error: {}", source))] + Io { + source: std::io::Error, + backtrace: Backtrace, + }, + #[snafu(display("Encoding error: {}", source))] + Encoding { + source: std::string::FromUtf8Error, + backtrace: Backtrace, + }, + #[snafu(display("While converting sticker ``{}'': {}", sticker, source))] + StickerConversion { + sticker: String, + source: Box<dyn std::error::Error>, + backtrace: Backtrace, + }, + #[snafu(display("``{}'' is not a recognized Idle subsystem", text))] + IdleSubSystem { text: String, backtrace: Backtrace }, +} + +pub type Result<T> = std::result::Result<T, Error>; + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// A description of the current track, suitable for our purposes (as in, it only tracks the +/// attributes needed for this module's functionality). +#[derive(Clone, Debug)] +pub struct CurrentSong { + /// Identifier, unique within the play queue, identifying this particular track; if the same + /// file is listed twice in the `mpd' play queue each instance will get a distinct songid + pub songid: u64, + + /// Path, relative to `mpd' music directory root of this track + pub file: std::path::PathBuf, + + /// Elapsed time, in seconds, in this track + pub elapsed: f64, + + /// Total track duration, in seconds + pub duration: f64, +} + +impl CurrentSong { + fn new(songid: u64, file: std::path::PathBuf, elapsed: f64, duration: f64) -> CurrentSong { + CurrentSong { + songid, + file, + elapsed, + duration, + } + } + /// Compute the ratio of the track that has elapsed, expressed as a floating point between 0 & 1 + pub fn played_pct(&self) -> f64 { + self.elapsed / self.duration + } +} + +/// The MPD player itself can be in one of three states: playing, paused or stopped. In the first +/// two there is a "current" song. +#[derive(Clone, Debug)] +pub enum PlayerStatus { + Play(CurrentSong), + Pause(CurrentSong), + Stopped, +} + +impl PlayerStatus { + pub fn current_song(&self) -> Option<&CurrentSong> { + match self { + PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => Some(curr), + PlayerStatus::Stopped => None, + } + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Connection // +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// A trait representing a simple, textual request/response protocol like that +/// [employed](https://www.musicpd.org/doc/html/protocol.html) by [MPD](https://www.musicpd.org/): +/// the caller sends a textual command & the server responds with a (perhaps multi-line) textual +/// response. +/// +/// This trait also enables unit testing client implementations. Note that it is async-- cf. +/// [async_trait](https://docs.rs/async-trait/latest/async_trait/). +#[async_trait] +pub trait RequestResponse { + async fn req(&mut self, msg: &str) -> Result<String>; + /// The hint is used to size the buffer prior to reading the response + async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String>; +} + +#[cfg(test)] +pub mod test_mock { + use super::*; + + /// Mock is an implementation of [`RequestRespone`] that checks expected requests & responses, + /// and will panic if it sees anything unexpected + pub struct Mock { + inmsgs: Vec<String>, + outmsgs: Vec<String>, + } + + impl Mock { + pub fn new(convo: &[(&str, &str)]) -> Mock { + let (left, right): (Vec<&str>, Vec<&str>) = convo.iter().copied().rev().unzip(); + Mock { + inmsgs: left.iter().map(|x| x.to_string()).collect(), + outmsgs: right.iter().map(|x| x.to_string()).collect(), + } + } + } + + #[async_trait] + impl RequestResponse for Mock { + async fn req(&mut self, msg: &str) -> Result<String> { + self.req_w_hint(msg, 512).await + } + async fn req_w_hint(&mut self, msg: &str, _hint: usize) -> Result<String> { + assert_eq!(msg, self.inmsgs.pop().unwrap()); + Ok(self.outmsgs.pop().unwrap()) + } + } + + #[tokio::test] + async fn mock_smoke_test() { + let mut mock = Mock::new(&[("ping", "pong"), ("from", "to")]); + assert_eq!(mock.req("ping").await.unwrap(), "pong"); + assert_eq!(mock.req("from").await.unwrap(), "to"); + } + + #[tokio::test] + #[should_panic] + async fn mock_negative_test() { + let mut mock = Mock::new(&[("ping", "pong")]); + assert_eq!(mock.req("ping").await.unwrap(), "pong"); + let _should_panic = mock.req("not there!").await.unwrap(); + } +} + +/// [MPD](https://www.musicpd.org/) connections talk the same +/// [protocol](https://www.musicpd.org/doc/html/protocol.html) over either a TCP or a Unix socket. +/// +/// # Examples +/// +/// Implementations are provided for tokio [UnixStream] and [TcpStream], but [MpdConnection] is a +/// trait that can work in terms of any asynchronous communications channel (so long as it is also +/// [Send] and [Unpin] so async executors can pass them between threads. +/// +/// To create a connection to an `MPD` server over a Unix domain socket: +/// +/// ```no_run +/// use std::path::Path; +/// use tokio::net::UnixStream; +/// use mpdpopm::clients::MpdConnection; +/// let local_conn = MpdConnection::<UnixStream>::connect(Path::new("/var/run/mpd/mpd.sock")); +/// ``` +/// +/// In this example, `local_conn` is a Future that will resolve to a Result containing the +/// [MpdConnection] Unix domain socket implementation once the socket has been established, the MPD +/// server greets us & the protocol version has been parsed. +/// +/// or over a TCP socket: +/// +/// ```no_run +/// use std::net::SocketAddrV4; +/// use tokio::net::{TcpStream, ToSocketAddrs}; +/// use mpdpopm::clients::MpdConnection; +/// let tcp_conn = MpdConnection::<TcpStream>::connect("localhost:6600".parse::<SocketAddrV4>().unwrap()); +/// ``` +/// +/// Here, `tcp_conn` is a Future that will resolve to a Result containing the [MpdConnection] TCP +/// implementation on successful connection to the MPD server (i.e. the connection is established, +/// the server greets us & we parse the protocol version). +/// +/// +pub struct MpdConnection<T: AsyncRead + AsyncWrite + Send + Unpin> { + sock: T, + _protocol_ver: String, +} + +/// MpdConnection implements RequestResponse using the usual (async) socket I/O +/// +/// The callers need not include the trailing newline in their requests; the implementation will +/// append it. +#[async_trait] +impl<T> RequestResponse for MpdConnection<T> +where + T: AsyncRead + AsyncWrite + Send + Unpin, +{ + async fn req(&mut self, msg: &str) -> Result<String> { + self.req_w_hint(msg, 512).await + } + async fn req_w_hint(&mut self, msg: &str, hint: usize) -> Result<String> { + self.sock + .write_all(format!("{}\n", msg).as_bytes()) + .await + .context(IoSnafu)?; + let mut buf = Vec::with_capacity(hint); + + // Given the request/response nature of the MPD protocol, our callers expect a complete + // response. Therefore we need to loop here until we see either "...^OK\n" or + // "...^ACK...\n". + let mut cb = 0; // # bytes read so far + let mut more = true; // true as long as there is more to read + while more { + cb += self.sock.read_buf(&mut buf).await.context(IoSnafu)?; + + // The shortest complete response has three bytes. If the final byte in `buf' is not a + // newline, then don't bother looking further. + if cb > 2 && char::from(buf[cb - 1]) == '\n' { + // If we're here, `buf' *may* contain a complete response. Search backward for the + // previous newline. It may not exist: many responses are of the form "OK\n". + let mut idx = cb - 2; + while idx > 0 { + if char::from(buf[idx]) == '\n' { + idx += 1; + break; + } + idx -= 1; + } + + if (idx + 2 < cb && char::from(buf[idx]) == 'O' && char::from(buf[idx + 1]) == 'K') + || (idx + 3 < cb + && char::from(buf[idx]) == 'A' + && char::from(buf[idx + 1]) == 'C' + && char::from(buf[idx + 2]) == 'K') + { + more = false; + } + } + } + + // Only doing this to trouble-shoot issue 11 + String::from_utf8(buf.clone()).context(EncodingSnafu) + } +} + +/// Utility function to parse the initial response to a connection from mpd +async fn parse_connect_rsp<T>(sock: &mut T) -> Result<String> +where + T: AsyncReadExt + AsyncWriteExt + Send + Unpin, +{ + let mut buf = Vec::with_capacity(32); + let _cb = sock.read_buf(&mut buf).await.context(IoSnafu)?; + // Only doing this to trouble-shoot issue 11 + let text = String::from_utf8(buf.clone()).context(EncodingSnafu)?; + ensure!( + text.starts_with("OK MPD "), + ProtocolSnafu { + op: Operation::Connect, + msg: text.trim() + } + ); + info!("Connected {}.", text[7..].trim()); + Ok(text[7..].trim().to_string()) +} + +impl MpdConnection<TcpStream> { + pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<Box<dyn RequestResponse>> { + let mut sock = TcpStream::connect(addr).await.context(IoSnafu)?; + let proto_ver = parse_connect_rsp(&mut sock).await?; + Ok(Box::new(MpdConnection::<TcpStream> { + sock, + _protocol_ver: proto_ver, + })) + } +} + +impl MpdConnection<UnixStream> { + // NTS: we have to box the return value because a `dyn RequestResponse` isn't Sized. + pub async fn connect<P: AsRef<Path>>(pth: P) -> Result<Box<dyn RequestResponse>> { + let mut sock = UnixStream::connect(pth).await.context(IoSnafu)?; + let proto_ver = parse_connect_rsp(&mut sock).await?; + Ok(Box::new(MpdConnection::<UnixStream> { + sock, + _protocol_ver: proto_ver, + })) + } +} + +/// Quote an argument by backslash-escaping " & \ characters +pub fn quote(text: &str) -> String { + if text.contains(&[' ', '\t', '\'', '"'][..]) { + let mut s = String::from("\""); + for c in text.chars() { + if c == '"' || c == '\\' { + s.push('\\'); + } + s.push(c); + } + s.push('"'); + s + } else { + text.to_string() + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// Client // +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// General-purpose [mpd](https://www.musicpd.org) +/// [client](https://www.musicpd.org/doc/html/protocol.html): "general-purpose" in the sense that we +/// send commands through it; the interface is narrowly scoped to this program's needs. +/// +/// # Introduction +/// +/// This is the primary abstraction of the MPD client protocol, written for the convenience of +/// [mpdpopm](crate). Construct instances with a TCP socket, a Unix socket, or any [RequestResponse] +/// implementation. You can then carry out assorted operations in the MPD client protocol by +/// invoking its methods. +/// +/// ```no_run +/// use std::path::Path; +/// use mpdpopm::clients::Client; +/// let client = Client::open(Path::new("/var/run/mpd.sock")); +/// ``` +/// +/// `client` is now a [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) that +/// resolves to a [Client] instance talking to `/var/run/mpd.sock`. +/// +/// ```no_run +/// use mpdpopm::clients::Client; +/// let client = Client::connect("localhost:6600"); +/// ``` +/// +/// `client` is now a [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) that +/// resolves to a [Client] instance talking TCP to the MPD server on localhost at port 6600. +pub struct Client { + stream: Box<dyn RequestResponse>, +} + +// Thanks to <https://stackoverflow.com/questions/35169259/how-to-make-a-compiled-regexp-a-global-variable> +lazy_static! { + static ref RE_STATE: regex::Regex = Regex::new(r"(?m)^state: (play|pause|stop)$").unwrap(); + static ref RE_SONGID: regex::Regex = Regex::new(r"(?m)^songid: ([0-9]+)$").unwrap(); + static ref RE_ELAPSED: regex::Regex = Regex::new(r"(?m)^elapsed: ([.0-9]+)$").unwrap(); + static ref RE_FILE: regex::Regex = Regex::new(r"(?m)^file: (.*)$").unwrap(); + static ref RE_DURATION: regex::Regex = Regex::new(r"(?m)^duration: (.*)$").unwrap(); +} + +impl Client { + pub async fn connect<A: ToSocketAddrs>(addrs: A) -> Result<Client> { + Self::new(MpdConnection::<TcpStream>::connect(addrs).await?) + } + + pub async fn open<P: AsRef<Path>>(pth: P) -> Result<Client> { + Self::new(MpdConnection::<UnixStream>::connect(pth).await?) + } + + pub fn new(stream: Box<dyn RequestResponse>) -> Result<Client> { + Ok(Client { stream }) + } +} + +impl Client { + /// Retrieve the current server status. + pub async fn status(&mut self) -> Result<PlayerStatus> { + // We begin with sending the "status" command: "Reports the current status of the player and + // the volume level." Per the docs, "MPD may omit lines which have no (known) value", so I + // can't really count on particular lines being there. Tho nothing is said in the docs, I + // also don't want to depend on the order. + let text = self.stream.req("status").await?; + + let proto = || -> Error { + ProtocolSnafu { + op: Operation::Status, + msg: text.to_owned(), + } + .build() + }; + + // I first thought to avoid the use (and cost) of regular expressions by just doing + // sub-string searching on "state: ", but when I realized I needed to only match at the + // beginning of a line I bailed & just went ahead. This makes for more succinct code, since + // I can't count on order, either. + let state = RE_STATE + .captures(&text) + .ok_or_else(proto)? + .get(1) + .ok_or_else(proto)? + .as_str(); + + match state { + "stop" => Ok(PlayerStatus::Stopped), + "play" | "pause" => { + let songid = RE_SONGID + .captures(&text) + .ok_or_else(proto)? + .get(1) + .ok_or_else(proto)? + .as_str() + .parse::<u64>() + .map_err(|err| { + ProtocolConvSnafu { + op: Operation::Status, + } + .into_error(Box::new(err)) + })?; + + let elapsed = RE_ELAPSED + .captures(&text) + .ok_or_else(proto)? + .get(1) + .ok_or_else(proto)? + .as_str() + .parse::<f64>() + .map_err(|err| { + ProtocolConvSnafu { + op: Operation::Status, + } + .into_error(Box::new(err)) + })?; + + // navigate from `songid'-- don't send a "currentsong" message-- the current song + // could have changed + let text = self.stream.req(&format!("playlistid {}", songid)).await?; + + let file = RE_FILE + .captures(&text) + .ok_or_else(proto)? + .get(1) + .ok_or_else(proto)? + .as_str(); + let duration = RE_DURATION + .captures(&text) + .ok_or_else(proto)? + .get(1) + .ok_or_else(proto)? + .as_str() + .parse::<f64>() + .map_err(|err| { + ProtocolConvSnafu { + op: Operation::Status, + } + .into_error(Box::new(err)) + })?; + + let curr = CurrentSong::new(songid, PathBuf::from(file), elapsed, duration); + + if state == "play" { + Ok(PlayerStatus::Play(curr)) + } else { + Ok(PlayerStatus::Pause(curr)) + } + } + _ => ProtocolSnafu { + op: Operation::Status, + msg: state.to_owned(), + } + .fail(), + } + } + + /// Retrieve a song sticker by name + pub async fn get_sticker<T: FromStr>( + &mut self, + file: &str, + sticker_name: &str, + ) -> Result<Option<T>> + where + <T as FromStr>::Err: std::error::Error + Sync + Send + 'static, + { + let msg = format!("sticker get song {} {}", quote(file), quote(sticker_name)); + let text = self.stream.req(&msg).await?; + debug!("Sent message `{}'; got `{}'", &msg, &text); + + let prefix = format!("sticker: {}=", sticker_name); + if text.starts_with(&prefix) { + let s = text[prefix.len()..] + .split('\n') + .next() + .context(ProtocolSnafu { + op: Operation::GetSticker, + msg, + })?; + Ok(Some(T::from_str(s).map_err(|err| { + StickerConversionSnafu { + sticker: sticker_name.to_owned(), + } + .into_error(Box::new(err)) + })?)) + } else { + // ACK_ERROR_NO_EXIST = 50 (Ack.hxx:17) + ensure!( + text.starts_with("ACK [50@0]"), + ProtocolSnafu { + op: Operation::GetSticker, + msg, + } + ); + Ok(None) + } + } + + /// Set a song sticker by name + pub async fn set_sticker<T: std::fmt::Display>( + &mut self, + file: &str, + sticker_name: &str, + sticker_value: &T, + ) -> Result<()> { + let value_as_str = format!("{}", sticker_value); + let msg = format!( + "sticker set song {} {} {}", + quote(file), + quote(sticker_name), + quote(&value_as_str) + ); + let text = self.stream.req(&msg).await?; + debug!("Sent `{}'; got `{}'", &msg, &text); + + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::SetSticker, + msg + } + ); + Ok(()) + } + + /// Send a file to a playlist + pub async fn send_to_playlist(&mut self, file: &str, pl: &str) -> Result<()> { + let msg = format!("playlistadd {} {}", quote(pl), quote(file)); + let text = self.stream.req(&msg).await?; + debug!("Sent `{}'; got `{}'.", &msg, &text); + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::SendToPlaylist, + msg + } + ); + Ok(()) + } + + /// Send an arbitrary message + pub async fn send_message(&mut self, chan: &str, msg: &str) -> Result<()> { + let msg = format!("sendmessage {} {}", chan, quote(msg)); + let text = self.stream.req(&msg).await?; + debug!("Sent `{}'; got `{}'.", &msg, &text); + + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::SendMessage, + msg: text + } + ); + Ok(()) + } + + /// Update a URI + pub async fn update(&mut self, uri: &str) -> Result<u64> { + let msg = format!("update \"{}\"", uri); + let text = self.stream.req(&msg).await?; + debug!("Sent `{}'; got `{}'.", &msg, &text); + + // We expect a response of the form: + // updating_db: JOBID + // OK + // on success, and + // ACK ERR + // on failure. + + let prefix = "updating_db: "; + ensure!( + text.starts_with(prefix), + ProtocolSnafu { + op: Operation::Update, + msg: &text + } + ); + text[prefix.len()..].split('\n').collect::<Vec<&str>>()[0] + .to_string() + .parse::<u64>() + .map_err(|err| { + ProtocolConvSnafu { + op: Operation::Update, + } + .into_error(Box::new(err)) + }) + } + + /// Get the list of stored playlists + pub async fn get_stored_playlists(&mut self) -> Result<std::vec::Vec<String>> { + let text = self.stream.req("listplaylists").await?; + debug!("Sent listplaylists; got `{}'.", &text); + + // We expect a response of the form: + // playlist: a + // Last-Modified: 2020-03-13T17:20:16Z + // playlsit: b + // Last-Modified: 2020-03-13T17:20:16Z + // ... + // OK + // + // or + // + // ACK... + ensure!( + !text.starts_with("ACK"), + ProtocolSnafu { + op: Operation::GetStoredPlaylists, + msg: text + } + ); + Ok(text + .lines() + .filter_map(|x| x.strip_prefix("playlist: ").map(String::from)) + .collect::<Vec<String>>()) + } + + /// Process a search (either find or search) response + fn search_rsp_to_uris(&self, text: &str) -> Result<std::vec::Vec<String>> { + // We expect a response of the form: + // file: P/Pogues, The - A Pistol For Paddy Garcia.mp3 + // Last-Modified: 2007-12-26T19:18:00Z + // Format: 44100:24:2 + // ... + // file: P/Pogues, The - Billy's Bones.mp3 + // ... + // OK + // + // or + // + // ACK... + ensure!( + !text.starts_with("ACK"), + ProtocolSnafu { + op: Operation::RspToUris, + msg: text.to_owned() + } + ); + Ok(text + .lines() + .filter_map(|x| x.strip_prefix("file: ").map(String::from)) + .collect::<Vec<String>>()) + } + + /// Search the database for songs matching filter (unary operator) + /// + /// Set `case` to true to request a case-sensitive search (false yields case-insensitive) + pub async fn find1( + &mut self, + cond: &str, + val: &str, + case: bool, + ) -> Result<std::vec::Vec<String>> { + let cmd = format!( + "{} {}", + if case { "find" } else { "search" }, + quote(&format!("({} {})", cond, val)) + ); + let text = self.stream.req(&cmd).await?; + self.search_rsp_to_uris(&text) + } + + /// Search the database for songs matching filter (case-sensitive, binary operator) + /// + /// Set `case` to true to request a case-sensitive search (false yields case-insensitive) + pub async fn find2( + &mut self, + attr: &str, + op: &str, + val: &str, + case: bool, + ) -> Result<std::vec::Vec<String>> { + let cmd = format!( + "{} {}", + if case { "find" } else { "search" }, + quote(&format!("({} {} {})", attr, op, val)) + ); + debug!("find2 sending ``{}''", cmd); + let text = self.stream.req(&cmd).await?; + self.search_rsp_to_uris(&text) + } + + /// Retrieve all instances of a given sticker under the music directory + /// + /// Return a mapping from song URI to textual sticker value + pub async fn get_stickers(&mut self, sticker: &str) -> Result<HashMap<String, String>> { + let text = self + .stream + .req(&format!("sticker find song \"\" {}", sticker)) + .await?; + + // We expect a response of the form: + // + // file: U-Z/Zafari - Addis Adaba.mp3 + // sticker: unwoundstack.com:rating=64 + // ... + // file: U-Z/Zero 7 - In Time (Album Version).mp3 + // sticker: unwoundstack.com:rating=255 + // OK + // + // or + // + // ACK ... + ensure!( + !text.starts_with("ACK"), + ProtocolSnafu { + op: Operation::GetStickers, + msg: text, + } + ); + let mut m = HashMap::new(); + let mut lines = text.lines(); + loop { + let file = lines.next().context(ProtocolSnafu { + op: Operation::GetStickers, + msg: text.to_owned(), + })?; + if "OK" == file { + break; + } + let val = lines.next().context(ProtocolSnafu { + op: Operation::GetStickers, + msg: text.to_owned(), + })?; + + m.insert( + String::from(&file[6..]), + String::from(&val[10 + sticker.len()..]), + ); + } + Ok(m) + } + + /// Retrieve the song URIs of all songs in the database + /// + /// Returns a vector of String + pub async fn get_all_songs(&mut self) -> Result<std::vec::Vec<String>> { + let text = self.stream.req("find \"(base '')\"").await?; + // We expect a response of the form: + // file: 0-A/A Positive Life - Lighten Up!.mp3 + // Last-Modified: 2020-11-18T22:47:07Z + // Format: 44100:24:2 + // Time: 399 + // duration: 398.550 + // Artist: A Positive Life + // Title: Lighten Up! + // Genre: Electronic + // file: 0-A/A Positive Life - Pleidean Communication.mp3 + // ... + // OK + // + // or "ACK..." + ensure!( + !text.starts_with("ACK"), + ProtocolSnafu { + op: Operation::GetAllSongs, + msg: text, + } + ); + Ok(text + .lines() + .filter_map(|x| x.strip_prefix("file: ").map(String::from)) + .collect::<Vec<String>>()) + } + + pub async fn add(&mut self, uri: &str) -> Result<()> { + let msg = format!("add {}", quote(uri)); + let text = self.stream.req(&msg).await?; + debug!("Sent `{}'; got `{}'.", &msg, &text); + + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::Add, + msg: &text + } + ); + Ok(()) + } +} + +#[cfg(test)] +/// Let's test Client! +mod client_tests { + + use super::test_mock::Mock; + use super::*; + + /// Some basic "smoke" tests + #[tokio::test] + async fn client_smoke_test() { + let mock = Box::new(Mock::new(&[( + "sticker get song foo.mp3 stick", + "sticker: stick=splat\nOK\n", + )])); + let mut cli = Client::new(mock).unwrap(); + let val = cli + .get_sticker::<String>("foo.mp3", "stick") + .await + .unwrap() + .unwrap(); + assert_eq!(val, "splat"); + } + + /// Test the `status' method + #[tokio::test] + async fn test_status() { + let mock = Box::new(Mock::new(&[ + ( + "status", + // When the server is playing or paused, the response will look something like this: + "volume: -1 +repeat: 0 +random: 0 +single: 0 +consume: 0 +playlist: 3 +playlistlength: 87 +mixrampdb: 0.000000 +state: play +song: 14 +songid: 15 +time: 141:250 +bitrate: 128 +audio: 44100:24:2 +nextsong: 15 +nextsongid: 16 +elapsed: 140.585 +OK", + ), + // Should respond with a playlist id request + ( + "playlistid 15", + // Should look something like this: + "file: U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3 +Last-Modified: 2004-12-24T19:26:13Z +Artist: U2 +Title: Who's Gonna RIDE Your WILD HOR +Genre: Pop +Time: 316 +Pos: 41 +Id: 42 +duration: 249.994 +OK", + ), + ( + "status", + // But if the state is "stop", much of that will be missing; it will look more like: + "volume: -1 +repeat: 0 +random: 0 +single: 0 +consume: 0 +playlist: 84 +playlistlength: 27 +mixrampdb: 0.000000 +state: stop +OK", + ), + // Finally, let's simulate something being really wrong + ( + "status", + "volume: -1 +repeat: 0 +state: no-idea!?", + ), + ])); + let mut cli = Client::new(mock).unwrap(); + let stat = cli.status().await.unwrap(); + match stat { + PlayerStatus::Play(curr) => { + assert_eq!(curr.songid, 15); + assert_eq!( + curr.file.to_str().unwrap(), + "U-Z/U2 - Who's Gonna RIDE Your WILD HORSES.mp3" + ); + assert_eq!(curr.elapsed, 140.585); + assert_eq!(curr.duration, 249.994); + } + _ => panic!(), + } + + let stat = cli.status().await.unwrap(); + match stat { + PlayerStatus::Stopped => (), + _ => panic!(), + } + + let stat = cli.status().await; + match stat { + Err(_) => (), + Ok(_) => panic!(), + } + } + + /// Test the `get_sticker' method + #[tokio::test] + async fn test_get_sticker() { + let mock = Box::new(Mock::new(&[ + ( + "sticker get song foo.mp3 stick", + // On success, should get something like this... + "sticker: stick=2\nOK\n", + ), + ( + "sticker get song foo.mp3 stick", + // and on failure, something like this: + "ACK [50@0] {sticker} no such sticker\n", + ), + ( + "sticker get song foo.mp3 stick", + // Finally, let's try something nuts + "", + ), + ( + "sticker get song \"filename_with\\\"doublequotes\\\".flac\" unwoundstack.com:playcount", + "sticker: unwoundstack.com:playcount=11\nOK\n", + ), + ])); + let mut cli = Client::new(mock).unwrap(); + let val = cli + .get_sticker::<String>("foo.mp3", "stick") + .await + .unwrap() + .unwrap(); + assert_eq!(val, "2"); + let _val = cli + .get_sticker::<String>("foo.mp3", "stick") + .await + .unwrap() + .is_none(); + let _val = cli + .get_sticker::<String>("foo.mp3", "stick") + .await + .unwrap_err(); + let val = cli + .get_sticker::<String>( + "filename_with\"doublequotes\".flac", + "unwoundstack.com:playcount", + ) + .await + .unwrap() + .unwrap(); + assert_eq!(val, "11"); + } + + /// Test the `set_sticker' method + #[tokio::test] + async fn test_set_sticker() { + let mock = Box::new(Mock::new(&[ + ("sticker set song foo.mp3 stick 2", "OK\n"), + ( + "sticker set song foo.mp3 stick 2", + "ACK [50@0] {sticker} some error", + ), + ( + "sticker set song foo.mp3 stick 2", + "this makes no sense as a response", + ), + ])); + let mut cli = Client::new(mock).unwrap(); + let () = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap(); + let _val = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap_err(); + let _val = cli.set_sticker("foo.mp3", "stick", &"2").await.unwrap_err(); + } + + /// Test the `send_to_playlist' method + #[tokio::test] + async fn test_send_to_playlist() { + let mock = Box::new(Mock::new(&[ + ("playlistadd foo.m3u foo.mp3", "OK\n"), + ( + "playlistadd foo.m3u foo.mp3", + "ACK [101@0] {playlist} some error\n", + ), + ])); + let mut cli = Client::new(mock).unwrap(); + let () = cli.send_to_playlist("foo.mp3", "foo.m3u").await.unwrap(); + let _val = cli + .send_to_playlist("foo.mp3", "foo.m3u") + .await + .unwrap_err(); + } + + /// Test the `update' method + #[tokio::test] + async fn test_update() { + let mock = Box::new(Mock::new(&[ + ("update \"foo.mp3\"", "updating_db: 2\nOK\n"), + ("update \"foo.mp3\"", "ACK [50@0] {update} blahblahblah"), + ("update \"foo.mp3\"", "this makes no sense as a response"), + ])); + let mut cli = Client::new(mock).unwrap(); + let _val = cli.update("foo.mp3").await.unwrap(); + let _val = cli.update("foo.mp3").await.unwrap_err(); + let _val = cli.update("foo.mp3").await.unwrap_err(); + } + + /// Test retrieving stored playlists + #[tokio::test] + async fn test_get_stored_playlists() { + let mock = Box::new(Mock::new(&[ + ( + "listplaylists", + "playlist: saturday-afternoons-in-santa-cruz +Last-Modified: 2020-03-13T17:20:16Z +playlist: gaelic-punk +Last-Modified: 2020-05-24T00:36:02Z +playlist: morning-coffee +Last-Modified: 2020-03-13T17:20:16Z +OK +", + ), + ("listplaylists", "ACK [1@0] {listplaylists} blahblahblah"), + ])); + + let mut cli = Client::new(mock).unwrap(); + let val = cli.get_stored_playlists().await.unwrap(); + assert_eq!( + val, + vec![ + String::from("saturday-afternoons-in-santa-cruz"), + String::from("gaelic-punk"), + String::from("morning-coffee") + ] + ); + let _val = cli.get_stored_playlists().await.unwrap_err(); + } +} + +//////////////////////////////////////////////////////////////////////////////////////////////////// +// IdleClient // +//////////////////////////////////////////////////////////////////////////////////////////////////// + +#[non_exhaustive] +#[derive(Debug, PartialEq, Eq)] +pub enum IdleSubSystem { + Player, + Message, +} + +impl TryFrom<&str> for IdleSubSystem { + type Error = Error; + fn try_from(text: &str) -> std::result::Result<Self, Self::Error> { + let x = text.to_lowercase(); + if x == "player" { + Ok(IdleSubSystem::Player) + } else if x == "message" { + Ok(IdleSubSystem::Message) + } else { + Err(IdleSubSystemSnafu { + text: String::from(text), + } + .build()) + } + } +} + +impl fmt::Display for IdleSubSystem { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + IdleSubSystem::Player => write!(f, "Player"), + IdleSubSystem::Message => write!(f, "Message"), + } + } +} + +/// [MPD](https://www.musicpd.org) client for "idle" connections. +/// +/// # Introduction +/// +/// This is an MPD client designed to "idle": it opens a long-lived connection to the MPD server and +/// waits for MPD to respond with a message indicating that there's been a change to a subsystem of +/// interest. At present, there are only two subsystems in which [mpdpopm](crate) is interested: the player +/// & messages (cf. [IdleSubSystem]). +/// +/// ```no_run +/// use std::path::Path; +/// use tokio::runtime::Runtime; +/// use mpdpopm::clients::IdleClient; +/// +/// let mut rt = Runtime::new().unwrap(); +/// rt.block_on( async { +/// let mut client = IdleClient::open(Path::new("/var/run/mpd.sock")).await.unwrap(); +/// client.subscribe("player").await.unwrap(); +/// client.idle().await.unwrap(); +/// // Arrives here when the player's state changes +/// }) +/// ``` +/// +/// `client` is now a [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) that +/// resolves to an [IdleClient] instance talking to `/var/run/mpd.sock`. +/// +pub struct IdleClient { + conn: Box<dyn RequestResponse>, +} + +impl IdleClient { + /// Create a new [mpdpopm::client::IdleClient][IdleClient] instance from something that + /// implements [ToSocketAddrs] + pub async fn connect<A: ToSocketAddrs>(addr: A) -> Result<IdleClient> { + Self::new(MpdConnection::<TcpStream>::connect(addr).await?) + } + + pub async fn open<P: AsRef<Path>>(pth: P) -> Result<IdleClient> { + Self::new(MpdConnection::<UnixStream>::connect(pth).await?) + } + + pub fn new(stream: Box<dyn RequestResponse>) -> Result<IdleClient> { + Ok(IdleClient { conn: stream }) + } + + /// Subscribe to an mpd channel + pub async fn subscribe(&mut self, chan: &str) -> Result<()> { + let text = self.conn.req(&format!("subscribe {}", chan)).await?; + debug!("Sent subscribe message for {}; got `{}'.", chan, text); + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::Connect, + msg: &text + } + ); + debug!("Subscribed to {}.", chan); + Ok(()) + } + + /// Enter idle state-- return the subsystem that changed, causing the connection to return. NB + /// this may block for some time. + pub async fn idle(&mut self) -> Result<IdleSubSystem> { + let text = self.conn.req("idle player message").await?; + debug!("Sent idle message; got `{}'.", text); + + // If the player state changes, we'll get: "changed: player\nOK\n" + // + // If a ratings message is sent, we'll get: "changed: message\nOK\n", to which we respond + // "readmessages", which should give us something like: + // + // channel: ratings + // message: 255 + // OK + // + // We remain subscribed, but we need to send a new idle message. + + ensure!( + text.starts_with("changed: "), + ProtocolSnafu { + op: Operation::Idle, + msg: &text + } + ); + let idx = text.find('\n').context(ProtocolSnafu { + op: Operation::Idle, + msg: text.to_owned(), + })?; + + let result = IdleSubSystem::try_from(&text[9..idx])?; + let text = text[idx + 1..].to_string(); + ensure!( + text.starts_with("OK"), + ProtocolSnafu { + op: Operation::Idle, + msg: &text + } + ); + + Ok(result) + } + + /// This method simply returns the results of a "readmessages" as a HashMap of channel name to + /// Vec of (String) messages for that channel + pub async fn get_messages(&mut self) -> Result<HashMap<String, Vec<String>>> { + let text = self.conn.req("readmessages").await?; + debug!("Sent readmessages; got `{}'.", text); + + // We expect something like: + // + // channel: ratings + // message: 255 + // OK + // + // We remain subscribed, but we need to send a new idle message. + + let mut m: HashMap<String, Vec<String>> = HashMap::new(); + + // Populate `m' with a little state machine: + enum State { + Init, + Running, + Finished, + } + let mut state = State::Init; + let mut chan = String::new(); + let mut msgs: Vec<String> = Vec::new(); + for line in text.lines() { + match state { + State::Init => { + ensure!( + line.starts_with("channel: "), + ProtocolSnafu { + op: Operation::GetMessages, + msg: line.to_owned() + } + ); + chan = String::from(&line[9..]); + state = State::Running; + } + State::Running => { + if let Some(stripped) = line.strip_prefix("message: ") { + msgs.push(String::from(stripped)); + } else if let Some(stripped) = line.strip_prefix("channel: ") { + match m.get_mut(&chan) { + Some(v) => v.append(&mut msgs), + None => { + m.insert(chan.clone(), msgs.clone()); + } + } + chan = String::from(stripped); + msgs = Vec::new(); + } else if line == "OK" { + match m.get_mut(&chan) { + Some(v) => v.append(&mut msgs), + None => { + m.insert(chan.clone(), msgs.clone()); + } + } + state = State::Finished; + } else { + return Err(ProtocolSnafu { + op: Operation::GetMessages, + msg: text, + } + .build()); + } + } + State::Finished => { + // Should never be here! + return Err(ProtocolSnafu { + op: Operation::GetMessages, + msg: String::from(line), + } + .build()); + } + } + } + + Ok(m) + } +} + +#[cfg(test)] +/// Let's test IdleClient! +mod idle_client_tests { + + use super::test_mock::Mock; + use super::*; + + /// Some basic "smoke" tests + #[tokio::test] + async fn test_get_messages() { + let mock = Box::new(Mock::new(&[( + "readmessages", + // If a ratings message is sent, we'll get: "changed: message\nOK\n", to which we + // respond "readmessages", which should give us something like: + // + // channel: ratings + // message: 255 + // OK + // + // We remain subscribed, but we need to send a new idle message. + "channel: ratings +message: 255 +message: 128 +channel: send-to-playlist +message: foo.m3u +OK +", + )])); + let mut cli = IdleClient::new(mock).unwrap(); + let hm = cli.get_messages().await.unwrap(); + let val = hm.get("ratings").unwrap(); + assert_eq!(val.len(), 2); + let val = hm.get("send-to-playlist").unwrap(); + assert!(val.len() == 1); + } + + /// Test issue #1 + #[tokio::test] + async fn test_issue_1() { + let mock = Box::new(Mock::new(&[( + "readmessages", + "channel: playcounts +message: a +channel: playcounts +message: b +OK +", + )])); + let mut cli = IdleClient::new(mock).unwrap(); + let hm = cli.get_messages().await.unwrap(); + let val = hm.get("playcounts").unwrap(); + assert_eq!(val.len(), 2); + } +} |
