From 8f9777ce7aecfe1a163a915e3245466b9dd9ac2e Mon Sep 17 00:00:00 2001 From: Ellie Huxtable Date: Mon, 16 Mar 2026 15:22:49 -0700 Subject: Squashed 'crates/atuin-nucleo/' content from commit 4253de9f git-subtree-dir: crates/atuin-nucleo git-subtree-split: 4253de9faabb4e5c6d81d946a5e35a90f87347ee --- src/lib.rs | 462 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 462 insertions(+) create mode 100644 src/lib.rs (limited to 'src/lib.rs') diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..7ddb7407 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,462 @@ +/*! +`nucleo` is a high level crate that provides a high level matcher API that +provides a highly effective (parallel) matcher worker. It's designed to allow +quickly plugging a fully featured (and faster) fzf/skim like fuzzy matcher into +your TUI application. + +It's designed to run matching on a background threadpool while providing a +snapshot of the last complete match. That means the matcher can update the +results live while the user is typing while never blocking the main UI thread +(beyond a user provided timeout). Nucleo also supports fully concurrent lock-free +(and wait-free) streaming of input items. + +The [`Nucleo`] struct serves as the main API entrypoint for this crate. + +# Status + +Nucleo is used in the helix-editor and therefore has a large user base with lots +or real world testing. The core matcher implementation is considered complete +and is unlikely to see major changes. The `nucleo-matcher` crate is finished and +ready for widespread use, breaking changes should be very rare (a 1.0 release +should not be far away). + +While the high level `nucleo` crate also works well (and is also used in helix), +there are still additional features that will be added in the future. The high +level crate also need better documentation and will likely see a few minor API +changes in the future. + +*/ +use std::ops::{Bound, RangeBounds}; +use std::sync::atomic::{self, AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use parking_lot::Mutex; +use rayon::ThreadPool; + +use crate::pattern::MultiPattern; +use crate::worker::Worker; +pub use nucleo_matcher::{chars, Config, Matcher, Utf32Str, Utf32String}; + +mod boxcar; +mod par_sort; +pub mod pattern; +mod worker; + +#[cfg(test)] +mod tests; + +/// A match candidate stored in a [`Nucleo`] worker. +pub struct Item<'a, T> { + pub data: &'a T, + pub matcher_columns: &'a [Utf32String], +} + +/// A handle that allows adding new items to a [`Nucleo`] worker. +/// +/// It's internally reference counted and can be cheaply cloned +/// and sent across threads. +pub struct Injector { + items: Arc>, + notify: Arc<(dyn Fn() + Sync + Send)>, +} + +impl Clone for Injector { + fn clone(&self) -> Self { + Injector { + items: self.items.clone(), + notify: self.notify.clone(), + } + } +} + +impl Injector { + /// Appends an element to the list of matched items. + /// This function is lock-free and wait-free. + pub fn push(&self, value: T, fill_columns: impl FnOnce(&T, &mut [Utf32String])) -> u32 { + let idx = self.items.push(value, fill_columns); + (self.notify)(); + idx + } + + /// Appends multiple elements to the list of matched items. + /// This function is lock-free and wait-free. + /// + /// You should favor this function over `push` if at least one of the following is true: + /// - the number of items you're adding can be computed beforehand and is typically larger + /// than 1k + /// - you're able to batch incoming items + /// - you're adding items from multiple threads concurrently (this function results in less + /// contention) + pub fn extend(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String])) + where + I: IntoIterator + ExactSizeIterator, + { + self.items.extend(values, fill_columns); + (self.notify)(); + } + + /// Returns the total number of items injected in the matcher. This might + /// not match the number of items in the match snapshot (if the matcher + /// is still running) + pub fn injected_items(&self) -> u32 { + self.items.count() + } + + /// Returns a reference to the item at the given index. + /// + /// # Safety + /// + /// Item at `index` must be initialized. That means you must have observed + /// `push` returning this value or `get` returning `Some` for this value. + /// Just because a later index is initialized doesn't mean that this index + /// is initialized + pub unsafe fn get_unchecked(&self, index: u32) -> Item<'_, T> { + self.items.get_unchecked(index) + } + + /// Returns a reference to the element at the given index. + pub fn get(&self, index: u32) -> Option> { + self.items.get(index) + } +} + +/// An [item](crate::Item) that was successfully matched by a [`Nucleo`] worker. +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub struct Match { + pub score: u32, + pub idx: u32, +} + +/// That status of a [`Nucleo`] worker after a match. +#[derive(PartialEq, Eq, Debug, Clone, Copy)] +pub struct Status { + /// Whether the current snapshot has changed. + pub changed: bool, + /// Whether the matcher is still processing in the background. + pub running: bool, +} + +/// A snapshot represent the results of a [`Nucleo`] worker after +/// finishing a [`tick`](Nucleo::tick). +pub struct Snapshot { + item_count: u32, + matches: Vec, + pattern: MultiPattern, + items: Arc>, +} + +impl Snapshot { + fn clear(&mut self, new_items: Arc>) { + self.item_count = 0; + self.matches.clear(); + self.items = new_items + } + + fn update(&mut self, worker: &Worker) { + self.item_count = worker.item_count(); + self.pattern.clone_from(&worker.pattern); + self.matches.clone_from(&worker.matches); + if !Arc::ptr_eq(&worker.items, &self.items) { + self.items = worker.items.clone() + } + } + + /// Returns that total number of items + pub fn item_count(&self) -> u32 { + self.item_count + } + + /// Returns the pattern which items were matched against + pub fn pattern(&self) -> &MultiPattern { + &self.pattern + } + + /// Returns that number of items that matched the pattern + pub fn matched_item_count(&self) -> u32 { + self.matches.len() as u32 + } + + /// Returns an iterator over the items that correspond to a subrange of + /// all the matches in this snapshot. + /// + /// # Panics + /// Panics if `range` has a range bound that is larger than + /// the matched item count + pub fn matched_items( + &self, + range: impl RangeBounds, + ) -> impl ExactSizeIterator> + DoubleEndedIterator + '_ { + // TODO: use TAIT + let start = match range.start_bound() { + Bound::Included(&start) => start as usize, + Bound::Excluded(&start) => start as usize + 1, + Bound::Unbounded => 0, + }; + let end = match range.end_bound() { + Bound::Included(&end) => end as usize + 1, + Bound::Excluded(&end) => end as usize, + Bound::Unbounded => self.matches.len(), + }; + self.matches[start..end] + .iter() + .map(|&m| unsafe { self.items.get_unchecked(m.idx) }) + } + + /// Returns a reference to the item at the given index. + /// + /// # Safety + /// + /// Item at `index` must be initialized. That means you must have observed a + /// match with the corresponding index in this exact snapshot. Observing + /// a higher index is not enough as item indices can be non-contigously + /// initialized + #[inline] + pub unsafe fn get_item_unchecked(&self, index: u32) -> Item<'_, T> { + self.items.get_unchecked(index) + } + + /// Returns a reference to the item at the given index. + /// + /// Returns `None` if the given `index` is not initialized. This function + /// is only guarteed to return `Some` for item indices that can be found in + /// the `matches` of this struct. Both smaller and larger indices may return + /// `None`. + #[inline] + pub fn get_item(&self, index: u32) -> Option> { + self.items.get(index) + } + + /// Return the matches corresponding to this snapshot. + #[inline] + pub fn matches(&self) -> &[Match] { + &self.matches + } + + /// A convenience function to return the [`Item`] corresponding to the + /// `n`th match. + /// + /// Returns `None` if `n` is greater than or equal to the match count. + #[inline] + pub fn get_matched_item(&self, n: u32) -> Option> { + // SAFETY: A match index is guaranteed to corresponding to a valid global index in this + // snapshot. + unsafe { Some(self.get_item_unchecked(self.matches.get(n as usize)?.idx)) } + } +} + +#[repr(u8)] +#[derive(Clone, Copy, PartialEq, Eq)] +enum State { + Init, + /// items have been cleared but snapshot and items are still outdated + Cleared, + /// items are fresh + Fresh, +} + +impl State { + fn matcher_item_refs(self) -> usize { + match self { + State::Cleared => 1, + State::Init | State::Fresh => 2, + } + } + + fn canceled(self) -> bool { + self != State::Fresh + } + + fn cleared(self) -> bool { + self != State::Fresh + } +} + +/// A high level matcher worker that quickly computes matches in a background +/// threadpool. +pub struct Nucleo { + // the way the API is build we totally don't actually need these to be Arcs + // but this lets us avoid some unsafe + canceled: Arc, + should_notify: Arc, + worker: Arc>>, + pool: ThreadPool, + state: State, + items: Arc>, + notify: Arc<(dyn Fn() + Sync + Send)>, + snapshot: Snapshot, + /// The pattern matched by this matcher. To update the match pattern + /// [`MultiPattern::reparse`](`pattern::MultiPattern::reparse`) should be used. + /// Note that the matcher worker will only become aware of the new pattern + /// after a call to [`tick`](Nucleo::tick). + pub pattern: MultiPattern, +} + +impl Nucleo { + /// Constructs a new `nucleo` worker threadpool with the provided `config`. + /// + /// `notify` is called everytime new information is available and + /// [`tick`](Nucleo::tick) should be called. Note that `notify` is not + /// debounced, that should be handled by the downstream crate (for example + /// debouncing to only redraw at most every 1/60 seconds). + /// + /// If `None` is passed for the number of worker threads, nucleo will use + /// one thread per hardware thread. + /// + /// Nucleo can match items with multiple orthogonal properties. `columns` + /// indicates how many matching columns each item (and the pattern) has. The + /// number of columns cannot be changed after construction. + pub fn new( + config: Config, + notify: Arc<(dyn Fn() + Sync + Send)>, + num_threads: Option, + columns: u32, + ) -> Self { + let (pool, worker) = Worker::new(num_threads, config, notify.clone(), columns); + Self { + canceled: worker.canceled.clone(), + should_notify: worker.should_notify.clone(), + items: worker.items.clone(), + pool, + pattern: MultiPattern::new(columns as usize), + snapshot: Snapshot { + matches: Vec::with_capacity(2 * 1024), + pattern: MultiPattern::new(columns as usize), + item_count: 0, + items: worker.items.clone(), + }, + worker: Arc::new(Mutex::new(worker)), + state: State::Init, + notify, + } + } + + /// Returns the total number of active injectors + pub fn active_injectors(&self) -> usize { + Arc::strong_count(&self.items) + - self.state.matcher_item_refs() + - (Arc::ptr_eq(&self.snapshot.items, &self.items)) as usize + } + + /// Returns a snapshot of the current matcher state. + pub fn snapshot(&self) -> &Snapshot { + &self.snapshot + } + + /// Returns an injector that can be used for adding candidates to the matcher. + pub fn injector(&self) -> Injector { + Injector { + items: self.items.clone(), + notify: self.notify.clone(), + } + } + + /// Restart the the item stream. Removes all items and disconnects all + /// previously created injectors from this instance. If `clear_snapshot` + /// is `true` then all items and matched are removed from the [`Snapshot`] + /// immediately. Otherwise the snapshot will keep the current matches until + /// the matcher has run again. + /// + /// # Note + /// + /// The injectors will continue to function but they will not affect this + /// instance anymore. The old items will only be dropped when all injectors + /// were dropped. + pub fn restart(&mut self, clear_snapshot: bool) { + self.canceled.store(true, Ordering::Relaxed); + self.items = Arc::new(boxcar::Vec::with_capacity(1024, self.items.columns())); + self.state = State::Cleared; + if clear_snapshot { + self.snapshot.clear(self.items.clone()); + } + } + + /// Update the internal configuration. + pub fn update_config(&mut self, config: Config) { + self.worker.lock().update_config(config) + } + + // Set whether the matcher should sort search results by score after + // matching. Defaults to true. + pub fn sort_results(&mut self, sort_results: bool) { + self.worker.lock().sort_results(sort_results) + } + + // Set whether the matcher should reverse the order of the input. + // Defaults to false. + pub fn reverse_items(&mut self, reverse_items: bool) { + self.worker.lock().reverse_items(reverse_items) + } + + /// The main way to interact with the matcher, this should be called + /// regularly (for example each time a frame is rendered). To avoid + /// excessive redraws this method will wait `timeout` milliseconds for the + /// worker thread to finish. It is recommend to set the timeout to 10ms. + pub fn tick(&mut self, timeout: u64) -> Status { + self.should_notify.store(false, atomic::Ordering::Relaxed); + let status = self.pattern.status(); + let canceled = status != pattern::Status::Unchanged || self.state.canceled(); + let mut res = self.tick_inner(timeout, canceled, status); + if !canceled { + return res; + } + self.state = State::Fresh; + let status2 = self.tick_inner(timeout, false, pattern::Status::Unchanged); + res.changed |= status2.changed; + res.running = status2.running; + res + } + + fn tick_inner(&mut self, timeout: u64, canceled: bool, status: pattern::Status) -> Status { + let mut inner = if canceled { + self.pattern.reset_status(); + self.canceled.store(true, atomic::Ordering::Relaxed); + self.worker.lock_arc() + } else { + let Some(worker) = self.worker.try_lock_arc_for(Duration::from_millis(timeout)) else { + self.should_notify.store(true, Ordering::Release); + return Status { + changed: false, + running: true, + }; + }; + worker + }; + + let changed = inner.running; + + let running = canceled || self.items.count() > inner.item_count(); + if inner.running { + inner.running = false; + if !inner.was_canceled && !self.state.canceled() { + self.snapshot.update(&inner) + } + } + if running { + inner.pattern.clone_from(&self.pattern); + self.canceled.store(false, atomic::Ordering::Relaxed); + if !canceled { + self.should_notify.store(true, atomic::Ordering::Release); + } + let cleared = self.state.cleared(); + if cleared { + inner.items = self.items.clone(); + } + self.pool + .spawn(move || unsafe { inner.run(status, cleared) }) + } + Status { changed, running } + } +} + +impl Drop for Nucleo { + fn drop(&mut self) { + // we ensure the worker quits before dropping items to ensure that + // the worker can always assume the items outlive it + self.canceled.store(true, atomic::Ordering::Relaxed); + let lock = self.worker.try_lock_for(Duration::from_secs(1)); + if lock.is_none() { + unreachable!("thread pool failed to shutdown properly") + } + } +} -- cgit v1.3.1