diff options
Diffstat (limited to '')
| -rw-r--r-- | pkgs/by-name/mp/mpdpopm/src/messages.rs | 732 |
1 files changed, 732 insertions, 0 deletions
diff --git a/pkgs/by-name/mp/mpdpopm/src/messages.rs b/pkgs/by-name/mp/mpdpopm/src/messages.rs new file mode 100644 index 00000000..85b24470 --- /dev/null +++ b/pkgs/by-name/mp/mpdpopm/src/messages.rs @@ -0,0 +1,732 @@ +// Copyright (C) 2020-2025 Michael herstine <sp1ff@pobox.com> +// +// This file is part of mpdpopm. +// +// mpdpopm is free software: you can redistribute it and/or modify it under the terms of the GNU +// General Public License as published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// mpdpopm is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even +// the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General +// Public License for more details. +// +// You should have received a copy of the GNU General Public License along with mpdpopm. If not, +// see <http://www.gnu.org/licenses/>. + +//! # messages +//! +//! Process incoming messages to the [mpdpopm](https://github.com/sp1ff/mpdpopm) daemon. +//! +//! # Introduction +//! +//! The [mpdpopm](https://github.com/sp1ff/mpdpopm) daemon accepts commands over a dedicated +//! [channel](https://www.musicpd.org/doc/html/protocol.html#client-to-client). It also provides for +//! a generalized framework in which the [mpdpopm](https://github.com/sp1ff/mpdpopm) administrator +//! can define new commands backed by arbitrary command execution server-side. +//! +//! # Commands +//! +//! The following commands are built-in: +//! +//! - set rating: `rate RATING( TRACK)?` +//! - set playcount: `setpc PC( TRACK)?` +//! - set lastplayed: `setlp TIMESTAMP( TRACK)?` +//! +//! There is no need to provide corresponding accessors since this functionality is already provided +//! via "sticker get". Dedicated accessors could provide the same functionality with slightly more +//! convenience since the sticker name would not have to be specified (as with "sticker get") & may +//! be added at a later date. +//! +//! I'm expanding the MPD filter functionality to include attributes tracked by mpdpopm: +//! +//! - findadd replacement: `findadd FILTER [sort TYPE] [window START:END]` +//! (cf. [here](https://www.musicpd.org/doc/html/protocol.html#the-music-database)) +//! +//! - searchadd replacement: `searchadd FILTER [sort TYPE] [window START:END]` +//! (cf. [here](https://www.musicpd.org/doc/html/protocol.html#the-music-database)) +//! +//! Additional commands may be added through the +//! [generalized commands](crate::commands#the-generalized-command-framework) feature. + +use crate::{ + clients::{Client, IdleClient, PlayerStatus}, + filters::ExpressionParser, + filters_ast::{FilterStickerNames, evaluate}, + ratings::{RatedTrack, RatingRequest}, + storage::{self, last_played, play_count, rating_count}, +}; + +use backtrace::Backtrace; +use boolinator::Boolinator; +use tracing::debug; + +use std::collections::VecDeque; +use std::convert::TryFrom; +use std::path::PathBuf; + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub enum Error { + BadPath { + pth: PathBuf, + }, + FilterParseError { + msg: String, + }, + InvalidChar { + c: u8, + }, + NoClosingQuotes, + NoCommand, + NotImplemented { + feature: String, + }, + PlayerStopped, + TrailingBackslash, + UnknownChannel { + chan: String, + back: Backtrace, + }, + UnknownCommand { + name: String, + back: Backtrace, + }, + Client { + source: crate::clients::Error, + back: Backtrace, + }, + Ratings { + source: crate::storage::Error, + back: Backtrace, + }, + Playcount { + source: crate::storage::Error, + back: Backtrace, + }, + Filter { + source: crate::filters_ast::Error, + back: Backtrace, + }, + Utf8 { + source: std::str::Utf8Error, + buf: Vec<u8>, + back: Backtrace, + }, + ExpectedInt { + source: std::num::ParseIntError, + text: String, + back: Backtrace, + }, +} + +impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + match self { + Error::BadPath { pth } => write!(f, "Bad path: {:?}", pth), + Error::FilterParseError { msg } => write!(f, "Parse error: ``{}''", msg), + Error::InvalidChar { c } => write!(f, "Invalid unquoted character {}", c), + Error::NoClosingQuotes => write!(f, "Missing closing quotes"), + Error::NoCommand => write!(f, "No command specified"), + Error::NotImplemented { feature } => write!(f, "`{}' not implemented, yet", feature), + Error::PlayerStopped => write!( + f, + "Can't operate on the current track when the player is stopped" + ), + Error::TrailingBackslash => write!(f, "Trailing backslash"), + Error::UnknownChannel { chan, back: _ } => write!( + f, + "We received messages for an unknown channel `{}'; this is likely a bug; please consider filing a report to sp1ff@pobox.com", + chan + ), + Error::UnknownCommand { name, back: _ } => { + write!(f, "We received an unknown message ``{}''", name) + } + Error::Client { source, back: _ } => write!(f, "Client error: {}", source), + Error::Ratings { source, back: _ } => write!(f, "Ratings eror: {}", source), + Error::Playcount { source, back: _ } => write!(f, "Playcount error: {}", source), + Error::Filter { source, back: _ } => write!(f, "Filter error: {}", source), + Error::Utf8 { + source, + buf, + back: _, + } => write!(f, "UTF8 error {} ({:#?})", source, buf), + Error::ExpectedInt { + source, + text, + back: _, + } => write!(f, "``{}''L {}", source, text), + } + } +} + +pub type Result<T> = std::result::Result<T, Error>; + +//////////////////////////////////////////////////////////////////////////////////////////////////// + +/// Break `buf` up into individual tokens while removing MPD-style quoting. +/// +/// When a client sends a command to [mpdpopm](crate), it will look like this on the wire: +/// +/// ```text +/// sendmessage ${CHANNEL} "some-command \"with space\" simple \"'with single' and \\\\\"" +/// ``` +/// +/// In other words, the MPD "sendmessage" command takes two parameters: the channel and the +/// message. The recipient (i.e. us) is responsible for breaking up the message into its constituent +/// parts (a command name & its arguments in our case). +/// +/// The message will perforce be quoted according ot the MPD rules: +/// +/// 1. an un-quoted token may contain any printable ASCII character except space, tab, ' & " +/// +/// 2. to include spaces, tabs, '-s or "-s, the token must be enclosed in "-s, and any "-s or \\-s +/// therein must be backslash escaped +/// +/// When the messages is delivered to us, it has already been un-escaped; i.e. we will see the +/// string: +/// +/// ```text +/// some-command "with space" simple "'with single' and \\" +/// ``` +/// +/// This function will break that string up into individual tokens with one more level +/// of escaping removed; i.e. it will return an iterator that will yield the four tokens: +/// +/// 1. some-command +/// 2. with space +/// 3. simple +/// 4. 'with single' and \\ +/// +/// [MPD](https://github.com/MusicPlayerDaemon/MPD) has a nice +/// [implementation](https://github.com/MusicPlayerDaemon/MPD/blob/master/src/util/Tokenizer.cxx#L170) +/// that modifies the string in place by copying subsequent characters on top of escape characters +/// in the same buffer, inserting nulls in between the resulting tokens,and then working in terms of +/// pointers to the resulting null-terminated strings. +/// +/// Once I realized that I could split slices I saw how to implement an Iterator that do the same +/// thing (an idiomatic interface to the tokenization backed by a zero-copy implementation). I was +/// inspired by [My Favorite Rust Function +/// Signature](<https://www.brandonsmith.ninja/blog/favorite-rust-function>). +/// +/// NB. This method works in terms of a slice of [`u8`] because we can't index into Strings in +/// Rust, and MPD deals only in terms of ASCII at any rate. +pub fn tokenize(buf: &mut [u8]) -> impl Iterator<Item = Result<&[u8]>> { + TokenIterator::new(buf) +} + +struct TokenIterator<'a> { + /// The slice on which we operate; modified in-place as we yield tokens + slice: &'a mut [u8], + /// Index into [`slice`] of the first non-whitespace character + input: usize, +} + +impl<'a> TokenIterator<'a> { + pub fn new(slice: &'a mut [u8]) -> TokenIterator<'a> { + let input = match slice.iter().position(|&x| x > 0x20) { + Some(n) => n, + None => slice.len(), + }; + TokenIterator { slice, input } + } +} + +impl<'a> Iterator for TokenIterator<'a> { + type Item = Result<&'a [u8]>; + + fn next(&mut self) -> Option<Self::Item> { + let nslice = self.slice.len(); + if self.slice.is_empty() || self.input == nslice { + None + } else if '"' == self.slice[self.input] as char { + // This is NextString in MPD: walk self.slice, un-escaping characters, until we find + // a closing ". Note that we un-escape by moving characters forward in the slice. + let mut inp = self.input + 1; + let mut out = self.input; + while self.slice[inp] as char != '"' { + if '\\' == self.slice[inp] as char { + inp += 1; + if inp == nslice { + return Some(Err(Error::TrailingBackslash)); + } + } + self.slice[out] = self.slice[inp]; + out += 1; + inp += 1; + if inp == nslice { + return Some(Err(Error::NoClosingQuotes)); + } + } + // The next token is in self.slice[self.input..out] and self.slice[inp] is " + let tmp = std::mem::take(&mut self.slice); + let (_, tmp) = tmp.split_at_mut(self.input); + let (result, new_slice) = tmp.split_at_mut(out - self.input); + self.slice = new_slice; + // strip any leading whitespace + self.input = inp - out + 1; // +1 to skip the closing " + while self.input < self.slice.len() && self.slice[self.input] as char == ' ' { + self.input += 1; + } + Some(Ok(result)) + } else { + // This is NextUnquoted in MPD; walk self.slice, validating characters until the end + // or the next whitespace + let mut i = self.input; + while i < nslice { + if 0x20 >= self.slice[i] { + break; + } + if self.slice[i] as char == '"' || self.slice[i] as char == '\'' { + return Some(Err(Error::InvalidChar { c: self.slice[i] })); + } + i += 1; + } + // The next token is in self.slice[self.input..i] & self.slice[i] is either one- + // past-the end or whitespace. + let tmp = std::mem::take(&mut self.slice); + let (_, tmp) = tmp.split_at_mut(self.input); + let (result, new_slice) = tmp.split_at_mut(i - self.input); + self.slice = new_slice; + // strip any leading whitespace + self.input = match self.slice.iter().position(|&x| x > 0x20) { + Some(n) => n, + None => self.slice.len(), + }; + Some(Ok(result)) + } + } +} + +/// Collective state needed for processing messages, both built-in & generalized +#[derive(Default)] +pub struct MessageProcessor {} + +impl MessageProcessor { + /// Whip up a new instance; other than cloning the iterators, should just hold references in the + /// enclosing scope + pub fn new() -> MessageProcessor { + Self::default() + } + + /// Read messages off the commands channel & dispatch 'em + pub async fn check_messages<'a>( + &self, + client: &mut Client, + idle_client: &mut IdleClient, + state: PlayerStatus, + command_chan: &str, + stickers: &FilterStickerNames<'a>, + ) -> Result<()> { + let m = idle_client + .get_messages() + .await + .map_err(|err| Error::Client { + source: err, + back: Backtrace::new(), + })?; + + for (chan, msgs) in m { + // Only supporting a single channel, ATM + (chan == command_chan).ok_or_else(|| Error::UnknownChannel { + chan, + back: Backtrace::new(), + })?; + for msg in msgs { + self.process(msg, client, &state, stickers).await?; + } + } + + Ok(()) + } + + /// Process a single command + pub async fn process<'a>( + &self, + msg: String, + client: &mut Client, + state: &PlayerStatus, + stickers: &FilterStickerNames<'a>, + ) -> Result<()> { + if let Some(stripped) = msg.strip_prefix("rate ") { + self.rate(stripped, client, state).await + } else if let Some(stripped) = msg.strip_prefix("inc-rate ") { + self.inc_rate(stripped, client, state).await + } else if let Some(stripped) = msg.strip_prefix("setpc ") { + self.setpc(stripped, client, state).await + } else if let Some(stripped) = msg.strip_prefix("setlp ") { + self.setlp(stripped, client, state).await + } else if let Some(stripped) = msg.strip_prefix("findadd ") { + self.findadd(stripped.to_string(), client, stickers, state) + .await + } else if let Some(stripped) = msg.strip_prefix("searchadd ") { + self.searchadd(stripped.to_string(), client, stickers, state) + .await + } else { + unreachable!("Unkonwn command") + } + } + + /// Handle rating message: "RATING( TRACK)?" + async fn rate(&self, msg: &str, client: &mut Client, state: &PlayerStatus) -> Result<()> { + let req = RatingRequest::try_from(msg).map_err(|err| Error::Ratings { + source: storage::Error::Rating { + source: err, + back: Backtrace::new(), + }, + back: Backtrace::new(), + })?; + + let pathb = match req.track { + RatedTrack::Current => match state { + PlayerStatus::Stopped => { + return Err(Error::PlayerStopped {}); + } + PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => curr.file.clone(), + }, + RatedTrack::File(p) => p, + RatedTrack::Relative(_i) => { + return Err(Error::NotImplemented { + feature: String::from("Relative track position"), + }); + } + }; + let path: &str = pathb + .to_str() + .ok_or_else(|| Error::BadPath { pth: pathb.clone() })?; + + debug!("Setting a rating of {} for `{}'.", req.rating, path); + + rating_count::set(client, path, req.rating) + .await + .map_err(|err| Error::Ratings { + source: err, + back: Backtrace::new(), + })?; + + Ok(()) + } + + /// Handle inc-rating message: "( TRACK)?" + async fn inc_rate(&self, msg: &str, client: &mut Client, state: &PlayerStatus) -> Result<()> { + let pathb = if msg.is_empty() { + // We rate the current track + match state { + PlayerStatus::Stopped => { + return Err(Error::PlayerStopped {}); + } + PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => curr.file.clone(), + } + } else { + PathBuf::from(msg) + }; + + let path: &str = pathb + .to_str() + .ok_or_else(|| Error::BadPath { pth: pathb.clone() })?; + + let rating = rating_count::get(client, path) + .await + .map_err(|err| Error::Ratings { + source: err, + back: Backtrace::new(), + })? + .unwrap_or(0) + .saturating_add(1); + + debug!( + "Incrementing a rating for `{}' (new value: {}).", + path, rating + ); + + rating_count::set(client, path, rating) + .await + .map_err(|err| Error::Ratings { + source: err, + back: Backtrace::new(), + })?; + + Ok(()) + } + + /// Handle `setpc': "PC( TRACK)?" + async fn setpc(&self, msg: &str, client: &mut Client, state: &PlayerStatus) -> Result<()> { + let text = msg.trim(); + let (pc, track) = match text.find(char::is_whitespace) { + Some(idx) => ( + text[..idx] + .parse::<usize>() + .map_err(|err| Error::ExpectedInt { + source: err, + text: String::from(text), + back: Backtrace::new(), + })?, + &text[idx + 1..], + ), + None => ( + text.parse::<usize>().map_err(|err| Error::ExpectedInt { + source: err, + text: String::from(text), + back: Backtrace::new(), + })?, + "", + ), + }; + let file = if track.is_empty() { + match state { + PlayerStatus::Stopped => { + return Err(Error::PlayerStopped {}); + } + PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => curr + .file + .to_str() + .ok_or_else(|| Error::BadPath { + pth: curr.file.clone(), + })? + .to_string(), + } + } else { + track.to_string() + }; + + play_count::set(client, &file, pc) + .await + .map_err(|err| Error::Playcount { + source: err, + back: Backtrace::new(), + })?; + + Ok(()) + } + + /// Handle `setlp': "LASTPLAYED( TRACK)?" + async fn setlp(&self, msg: &str, client: &mut Client, state: &PlayerStatus) -> Result<()> { + let text = msg.trim(); + let (lp, track) = match text.find(char::is_whitespace) { + Some(idx) => ( + text[..idx] + .parse::<u64>() + .map_err(|err| Error::ExpectedInt { + source: err, + text: String::from(text), + back: Backtrace::new(), + })?, + &text[idx + 1..], + ), + None => ( + text.parse::<u64>().map_err(|err| Error::ExpectedInt { + source: err, + text: String::from(text), + back: Backtrace::new(), + })?, + "", + ), + }; + + let file = if track.is_empty() { + match state { + PlayerStatus::Stopped => { + return Err(Error::PlayerStopped {}); + } + PlayerStatus::Play(curr) | PlayerStatus::Pause(curr) => curr + .file + .to_str() + .ok_or_else(|| Error::BadPath { + pth: curr.file.clone(), + })? + .to_string(), + } + } else { + track.to_string() + }; + + last_played::set(client, &file, lp) + .await + .map_err(|err| Error::Playcount { + source: err, + back: Backtrace::new(), + })?; + + Ok(()) + } + + /// Handle `findadd': "FILTER [sort TYPE] [window START:END]" + async fn findadd<'a>( + &self, + msg: String, + client: &mut Client, + stickers: &FilterStickerNames<'a>, + _state: &PlayerStatus, + ) -> Result<()> { + let mut buf = msg.into_bytes(); + let args: VecDeque<&str> = tokenize(&mut buf) + .map(|r| match r { + Ok(buf) => Ok(std::str::from_utf8(buf).map_err(|err| Error::Utf8 { + source: err, + buf: buf.to_vec(), + back: Backtrace::new(), + })?), + Err(err) => Err(err), + }) + .collect::<Result<VecDeque<&str>>>()?; + + debug!("findadd arguments: {:#?}", args); + + // there should be 1, 3 or 5 arguments. `sort' & `window' are not supported, yet. + + // ExpressionParser's not terribly ergonomic: it returns a ParesError<L, T, E>; T is the + // offending token, which has the same lifetime as our input, which makes it tough to + // capture. Nor is there a convenient way in which to treat all variants other than the + // Error Trait. + let ast = match ExpressionParser::new().parse(args[0]) { + Ok(ast) => ast, + Err(err) => { + return Err(Error::FilterParseError { + msg: format!("{}", err), + }); + } + }; + + debug!("ast: {:#?}", ast); + + let mut results = Vec::new(); + for song in evaluate(&ast, true, client, stickers) + .await + .map_err(|err| Error::Filter { + source: err, + back: Backtrace::new(), + })? + { + results.push(client.add(&song).await); + } + match results + .into_iter() + .collect::<std::result::Result<Vec<()>, crate::clients::Error>>() + { + Ok(_) => Ok(()), + Err(err) => Err(Error::Client { + source: err, + back: Backtrace::new(), + }), + } + } + + /// Handle `searchadd': "FILTER [sort TYPE] [window START:END]" + async fn searchadd<'a>( + &self, + msg: String, + client: &mut Client, + stickers: &FilterStickerNames<'a>, + _state: &PlayerStatus, + ) -> Result<()> { + // Tokenize the message + let mut buf = msg.into_bytes(); + let args: VecDeque<&str> = tokenize(&mut buf) + .map(|r| match r { + Ok(buf) => Ok(std::str::from_utf8(buf).map_err(|err| Error::Utf8 { + source: err, + buf: buf.to_vec(), + back: Backtrace::new(), + })?), + Err(err) => Err(err), + }) + .collect::<Result<VecDeque<&str>>>()?; + + debug!("searchadd arguments: {:#?}", args); + + // there should be 1, 3 or 5 arguments. `sort' & `window' are not supported, yet. + + // ExpressionParser's not terribly ergonomic: it returns a ParesError<L, T, E>; T is the + // offending token, which has the same lifetime as our input, which makes it tough to + // capture. Nor is there a convenient way in which to treat all variants other than the + // Error Trait. + let ast = match ExpressionParser::new().parse(args[0]) { + Ok(ast) => ast, + Err(err) => { + return Err(Error::FilterParseError { + msg: format!("{}", err), + }); + } + }; + + debug!("ast: {:#?}", ast); + + let mut results = Vec::new(); + for song in evaluate(&ast, false, client, stickers) + .await + .map_err(|err| Error::Filter { + source: err, + back: Backtrace::new(), + })? + { + results.push(client.add(&song).await); + } + match results + .into_iter() + .collect::<std::result::Result<Vec<()>, crate::clients::Error>>() + { + Ok(_) => Ok(()), + Err(err) => Err(Error::Client { + source: err, + back: Backtrace::new(), + }), + } + } +} + +#[cfg(test)] +mod tokenize_tests { + use super::Result; + use super::tokenize; + + #[test] + fn tokenize_smoke() { + let mut buf1 = String::from("some-command").into_bytes(); + let x1: Vec<&[u8]> = tokenize(&mut buf1).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x1[0], b"some-command"); + + let mut buf2 = String::from("a b").into_bytes(); + let x2: Vec<&[u8]> = tokenize(&mut buf2).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x2[0], b"a"); + assert_eq!(x2[1], b"b"); + + let mut buf3 = String::from("a \"b c\"").into_bytes(); + let x3: Vec<&[u8]> = tokenize(&mut buf3).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x3[0], b"a"); + assert_eq!(x3[1], b"b c"); + + let mut buf4 = String::from("a \"b c\" d").into_bytes(); + let x4: Vec<&[u8]> = tokenize(&mut buf4).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x4[0], b"a"); + assert_eq!(x4[1], b"b c"); + assert_eq!(x4[2], b"d"); + + let mut buf5 = String::from("simple-command \"with space\" \"with '\"").into_bytes(); + let x5: Vec<&[u8]> = tokenize(&mut buf5).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x5[0], b"simple-command"); + assert_eq!(x5[1], b"with space"); + assert_eq!(x5[2], b"with '"); + + let mut buf6 = String::from("cmd \"with\\\\slash and space\"").into_bytes(); + let x6: Vec<&[u8]> = tokenize(&mut buf6).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x6[0], b"cmd"); + assert_eq!(x6[1], b"with\\slash and space"); + + let mut buf7 = String::from(" cmd \"with\\\\slash and space\" ").into_bytes(); + let x7: Vec<&[u8]> = tokenize(&mut buf7).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(x7[0], b"cmd"); + assert_eq!(x7[1], b"with\\slash and space"); + } + + #[test] + fn tokenize_filter() { + let mut buf1 = String::from(r#""(artist =~ \"foo\\\\bar\\\"\")""#).into_bytes(); + let x1: Vec<&[u8]> = tokenize(&mut buf1).collect::<Result<Vec<&[u8]>>>().unwrap(); + assert_eq!(1, x1.len()); + eprintln!("x1[0] is ``{}''", std::str::from_utf8(x1[0]).unwrap()); + assert_eq!( + std::str::from_utf8(x1[0]).unwrap(), + r#"(artist =~ "foo\\bar\"")"# + ); + } +} |
