use std::{
collections::HashMap,
path::{Path, PathBuf},
};
use anyhow::{bail, Context, Result};
use futures::{Stream, StreamExt, TryStreamExt};
use log::info;
use tokio::{
fs::{self, DirEntry},
io,
sync::mpsc::{self, Receiver, Sender},
task::JoinHandle,
};
use tokio_stream::wrappers::{ReadDirStream, ReceiverStream};
/// State built up while generating mappings for a set of directories.
///
/// Instances are created through `MappingGenerator::new`.
pub struct MappingGenerator {
/// Mappings collected so far, keyed by a string and pointing at a path.
/// Starts out empty. NOTE(review): what the key represents is not visible
/// in this file — confirm against the code that fills this map.
current_mappings: HashMap<String, PathBuf>,
/// Paths that are still to be processed; populated by the constructor.
paths_to_process: Vec<PathBuf>,
}
pub struct MappingGeneratorHelper {
tx: Sender<(PathBuf, oneshotSender<PathBuf>)>,
handle: JoinHandle<()>,
done: Vec<PathBuf>,
}
impl MappingGeneratorHelper {
pub fn new() -> Self {
let (rx, tx) = mpsc::channel(100);
let handle = tokio::spawn(async move {
while let Some(dir) = rx.recv().await {
info!("processing '{}'..", dir.display());
get_dir(dir);
}
});
Self { tx, handle }
}
pub async fn process(&self, dir: PathBuf) -> Result<()> {
let (tx, rx) =
self.tx.send(dir).await?;
Ok(())
}
}
impl MappingGenerator {
pub async fn new(directories_to_scan: Vec<PathBuf>, depth: usize) -> Result<Self> {
let cleaned_directories: Vec<PathBuf> = directories_to_scan
.into_iter()
.map(check_dir)
.collect::<Result<_>>()?;
let helper = MappingGeneratorHelper::new();
cleaned_directories
.into_iter()
.for_each(|dir| helper.process(dir));
info!(
"Will process:\n {}",
all_directories
.iter()
.map(|pat| pat.display().to_string())
.collect::<Vec<_>>()
.join("\n ")
);
Ok(Self {
current_mappings: HashMap::new(),
paths_to_process: all_directories,
})
}
}
/// Verifies that metadata can be read for `dir` and hands the path back
/// unchanged.
///
/// # Errors
///
/// Returns an error naming the path and the underlying I/O failure when the
/// metadata lookup does not succeed.
fn check_dir(dir: PathBuf) -> Result<PathBuf> {
    // Guard clause: reject the path as soon as the lookup fails.
    if let Err(e) = dir.metadata() {
        bail!(
            "'{}' is not a valid path; Error was: '{}'",
            dir.display(),
            e
        );
    }
    Ok(dir)
}
/// Collects `dir` and the directories below it into a vector.
///
/// The traversal runs in a spawned task and reports every directory it
/// accepts through a channel, which is drained here. Because sub-directories
/// are visited concurrently, the order of the returned paths is not
/// deterministic.
///
/// `current_depth` is the depth assigned to `dir` itself; the traversal does
/// not report or descend into anything whose depth equals `max_depth`.
///
/// # Errors
///
/// Returns an error if the traversal task panicked or was cancelled, or if
/// the traversal itself reported a failure.
///
/// # Panics
///
/// Panics when called outside of a Tokio runtime.
pub async fn get_dir(dir: PathBuf, current_depth: usize, max_depth: usize) -> Result<Vec<PathBuf>> {
    let (tx, rx) = mpsc::channel(100);
    // The inner future must be awaited inside the task. Otherwise the task
    // would return the un-run future, which still owns `tx`, and the receiver
    // below would wait forever for the channel to close.
    let handle = tokio::spawn(async move {
        get_dir_recursive(dir, current_depth, max_depth, tx).await
    });
    // Ends once every sender clone held by the traversal has been dropped.
    let out = ReceiverStream::new(rx).collect::<Vec<PathBuf>>().await;
    // Outer `?`: the task did not complete. Inner `?`: the traversal failed.
    handle.await??;
    Ok(out)
}
/// Sends `dir` on `tx` and then visits every entry of `dir` in its own task.
///
/// Nothing is sent, and `Ok(())` is returned, when `current_depth` equals
/// `max_depth` or when `dir` is not a directory (including the case where its
/// metadata cannot be read). Children are visited with `current_depth + 1`.
///
/// This is a plain function returning a boxed future rather than an
/// `async fn`: it spawns itself through `tokio::spawn`, which needs the future
/// to be `Send`, and the compiler cannot establish that for the opaque return
/// type of a function that is still being type-checked. Call sites are
/// unaffected and keep using `.await`.
///
/// # Errors
///
/// Fails if the receiving side of `tx` is gone, if the directory cannot be
/// opened or iterated, if a child task panics or is cancelled, or if a child
/// traversal fails.
fn get_dir_recursive(
    dir: PathBuf,
    current_depth: usize,
    max_depth: usize,
    tx: Sender<PathBuf>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> {
    Box::pin(async move {
        // Cheap check first so that no file-system call is made at the limit.
        if current_depth == max_depth {
            return Ok(());
        }
        // Non-blocking replacement for `Path::is_dir`: an unreadable path
        // counts as "not a directory".
        let is_directory = fs::metadata(&dir)
            .await
            .map(|meta| meta.is_dir())
            .unwrap_or(false);
        if !is_directory {
            return Ok(());
        }

        // `dir` is still needed below, so a copy goes into the channel.
        tx.send(dir.clone()).await?;

        let mut entries = match fs::read_dir(&dir).await {
            Ok(entries) => entries,
            Err(e) => bail!(
                "Unable to read directory {}, skipping; error: {}",
                dir.display(),
                e
            ),
        };

        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::new();
        while let Some(entry) = entries
            .next_entry()
            .await
            .with_context(|| format!("Failed to read directory: '{}'", dir.display()))?
        {
            // Each task gets its own copies; moving `dir` or `tx` themselves
            // would make them unavailable for the next entry.
            let child = entry.path();
            let parent = dir.clone();
            let child_tx = tx.clone();
            handles.push(tokio::spawn(async move {
                get_dir_recursive(child, current_depth + 1, max_depth, child_tx)
                    .await
                    .with_context(|| {
                        format!("Failed to get child directories to '{}'", parent.display())
                    })
            }));
        }

        // Outer `?`: the task panicked or was cancelled. Inner `?`: the child
        // traversal returned an error.
        for handle in handles {
            handle.await??;
        }
        Ok(())
    })
}
#[cfg(test)]
mod test {
    use std::fs;

    use super::get_dir;

    /// Scans a throw-away directory tree instead of relying on the layout of
    /// the developer's home directory.
    ///
    /// Layout: `<root>/rust/src` plus a regular file `<root>/notes.txt`.
    /// With `current_depth = 0` and `max_depth = 2` the root (depth 0) and
    /// `rust` (depth 1) are reported, `src` (depth 2) is cut off by the depth
    /// limit, and the regular file is ignored.
    #[test]
    fn test_get_dir() {
        let root = std::env::temp_dir().join(format!("get_dir_test_{}", std::process::id()));
        // Leftovers from an aborted earlier run must not influence the result.
        let _ = fs::remove_dir_all(&root);
        fs::create_dir_all(root.join("rust").join("src")).expect("failed to create fixture");
        fs::write(root.join("notes.txt"), b"").expect("failed to create fixture file");

        // `get_dir` is async and spawns tasks, so it needs a runtime to run on.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .build()
            .expect("failed to build runtime");
        let result = runtime.block_on(get_dir(root.clone(), 0, 2));

        // Clean up before asserting so a failure does not leave files behind.
        let _ = fs::remove_dir_all(&root);

        // Sub-directories are visited concurrently; sort for a stable order.
        let mut dirs = result.expect("scanning the fixture should succeed");
        dirs.sort();
        let expected_dirs = vec![root.clone(), root.join("rust")];
        assert_eq!(dirs, expected_dirs);
    }
}