From 1d030b9d32f539fd38f5ff3335234c5111c3303f Mon Sep 17 00:00:00 2001 From: Conrad Ludgate Date: Mon, 9 May 2022 07:46:52 +0100 Subject: Importer V3 (#395) * start of importer refactor * fish * resh * zsh --- src/command/client/import.rs | 149 ++++++++++++++----------------------------- 1 file changed, 48 insertions(+), 101 deletions(-) (limited to 'src/command/client/import.rs') diff --git a/src/command/client/import.rs b/src/command/client/import.rs index c70446d5..580e4b0e 100644 --- a/src/command/client/import.rs +++ b/src/command/client/import.rs @@ -1,13 +1,14 @@ -use std::{env, path::PathBuf}; +use std::env; +use async_trait::async_trait; use clap::Parser; -use eyre::{eyre, Result}; +use eyre::Result; use indicatif::ProgressBar; use atuin_client::{ database::Database, history::History, - import::{bash::Bash, fish::Fish, resh::Resh, zsh::Zsh, Importer}, + import::{bash::Bash, fish::Fish, resh::Resh, zsh::Zsh, Importer, Loader}, }; #[derive(Parser)] @@ -18,13 +19,10 @@ pub enum Cmd { /// Import history from the zsh history file Zsh, - /// Import history from the bash history file Bash, - /// Import history from the resh history file Resh, - /// Import history from the fish history file Fish, } @@ -32,7 +30,7 @@ pub enum Cmd { const BATCH_SIZE: usize = 100; impl Cmd { - pub async fn run(&self, db: &mut (impl Database + Send + Sync)) -> Result<()> { + pub async fn run(&self, db: &mut DB) -> Result<()> { println!(" Atuin "); println!("======================"); println!(" \u{1f30d} "); @@ -47,124 +45,73 @@ impl Cmd { if shell.ends_with("/zsh") { println!("Detected ZSH"); - import::, _>(db, BATCH_SIZE).await + import::(db).await } else if shell.ends_with("/fish") { println!("Detected Fish"); - import::, _>(db, BATCH_SIZE).await + import::(db).await } else if shell.ends_with("/bash") { println!("Detected Bash"); - import::, _>(db, BATCH_SIZE).await + import::(db).await } else { println!("cannot import {} history", shell); Ok(()) } } - Self::Zsh => import::, _>(db, BATCH_SIZE).await, - Self::Bash => import::, _>(db, BATCH_SIZE).await, - Self::Resh => import::(db, BATCH_SIZE).await, - Self::Fish => import::, _>(db, BATCH_SIZE).await, + Self::Zsh => import::(db).await, + Self::Bash => import::(db).await, + Self::Resh => import::(db).await, + Self::Fish => import::(db).await, } } } -async fn import( - db: &mut DB, - buf_size: usize, -) -> Result<()> -where - I::IntoIter: Send, -{ - println!("Importing history from {}", I::NAME); - - let histpath = get_histpath::()?; - let contents = I::parse(histpath)?; - - let iter = contents.into_iter(); - let progress = if let (_, Some(upper_bound)) = iter.size_hint() { - ProgressBar::new(upper_bound as u64) - } else { - ProgressBar::new_spinner() - }; - - let mut buf = Vec::::with_capacity(buf_size); - let mut iter = progress.wrap_iter(iter); - loop { - // fill until either no more entries - // or until the buffer is full - let done = fill_buf(&mut buf, &mut iter); - - // flush - db.save_bulk(&buf).await?; +pub struct HistoryImporter<'db, DB: Database> { + pb: ProgressBar, + buf: Vec, + db: &'db mut DB, +} - if done { - break; +impl<'db, DB: Database> HistoryImporter<'db, DB> { + fn new(db: &'db mut DB, len: usize) -> Self { + Self { + pb: ProgressBar::new(len as u64), + buf: Vec::with_capacity(BATCH_SIZE), + db, } } - println!("Import complete!"); - - Ok(()) -} - -fn get_histpath() -> Result { - if let Ok(p) = env::var("HISTFILE") { - is_file(PathBuf::from(p)) - } else { - is_file(I::histpath()?) + async fn flush(self) -> Result<()> { + if !self.buf.is_empty() { + self.db.save_bulk(&self.buf).await?; + } + self.pb.finish(); + Ok(()) } } -fn is_file(p: PathBuf) -> Result { - if p.is_file() { - Ok(p) - } else { - Err(eyre!( - "Could not find history file {:?}. Try setting $HISTFILE", - p - )) +#[async_trait] +impl<'db, DB: Database> Loader for HistoryImporter<'db, DB> { + async fn push(&mut self, hist: History) -> Result<()> { + self.pb.inc(1); + self.buf.push(hist); + if self.buf.len() == self.buf.capacity() { + self.db.save_bulk(&self.buf).await?; + self.buf.clear(); + } + Ok(()) } } -fn fill_buf(buf: &mut Vec, iter: &mut impl Iterator>) -> bool { - buf.clear(); - loop { - match iter.next() { - Some(Ok(t)) => buf.push(t), - Some(Err(_)) => (), - None => break true, - } +async fn import(db: &mut DB) -> Result<()> { + println!("Importing history from {}", I::NAME); - if buf.len() == buf.capacity() { - break false; - } - } -} + let mut importer = I::new().await?; + let len = importer.entries().await.unwrap(); + let mut loader = HistoryImporter::new(db, len); + importer.load(&mut loader).await?; + loader.flush().await?; -#[cfg(test)] -mod tests { - use super::fill_buf; - - #[test] - fn test_fill_buf() { - let mut buf = Vec::with_capacity(4); - let mut iter = vec![ - Ok(1), - Err(2), - Ok(3), - Ok(4), - Err(5), - Ok(6), - Ok(7), - Err(8), - Ok(9), - ] - .into_iter(); - - assert!(!fill_buf(&mut buf, &mut iter)); - assert_eq!(buf, vec![1, 3, 4, 6]); - - assert!(fill_buf(&mut buf, &mut iter)); - assert_eq!(buf, vec![7, 9]); - } + println!("Import complete!"); + Ok(()) } -- cgit v1.3.1