about summary refs log blame commit diff stats
path: root/sys/nixpkgs/pkgs/lf-make-map/src/generator/mod.rs.old
blob: 406b1996681c2e0264247cde6ccf27cf5cbeb822 (plain) (tree)



































































































































































                                                                                                    
use std::{
    collections::HashMap,
    path::{Path, PathBuf},
};

use anyhow::{bail, Context, Result};
use futures::{Stream, StreamExt, TryStreamExt};
use log::info;
use tokio::{
    fs::{self, DirEntry},
    io,
    sync::mpsc::{self, Receiver, Sender},
    task::JoinHandle,
};
use tokio_stream::wrappers::{ReadDirStream, ReceiverStream};

/// State of a mapping-generation run.
///
/// Built by [`MappingGenerator::new`], which starts with an empty mapping table
/// and the list of directories that still have to be processed.
#[derive(Debug, Clone)]
pub struct MappingGenerator {
    /// Mappings generated so far; starts out empty.
    // NOTE(review): presumably key sequence -> target directory; nothing in the
    // visible code fills this map yet, so confirm the key semantics.
    current_mappings: HashMap<String, PathBuf>,
    /// Directories that were discovered and are waiting to be processed.
    paths_to_process: Vec<PathBuf>,
}

/// Handle to a background task that receives directories over a channel.
///
/// NOTE(review): this type looks unfinished; see the notes on the individual
/// fields before relying on it.
pub struct MappingGeneratorHelper {
    /// Channel half stored by `new`; declared to carry a path together with a
    /// `oneshotSender<PathBuf>`.
    // NOTE(review): `oneshotSender` is neither imported nor defined in the
    // visible part of this file -- presumably `tokio::sync::oneshot::Sender`
    // was meant (a per-request reply channel); confirm.
    tx: Sender<(PathBuf, oneshotSender<PathBuf>)>,
    /// Join handle of the task spawned in `new`.
    handle: JoinHandle<()>,
    // NOTE(review): never initialised or read in the visible code; presumably
    // the directories that were already processed -- confirm.
    done: Vec<PathBuf>,
}

// NOTE(review): this impl looks unfinished and does not agree with the struct
// definition; the concrete problems are flagged inline.
impl MappingGeneratorHelper {
    /// Creates a channel with capacity 100 and spawns a task that logs every
    /// received item and passes it to `get_dir`.
    pub fn new() -> Self {
        // NOTE(review): the names are swapped compared to the `(tx, rx)` order
        // used in `get_dir`: here `rx` is bound to the first (sending) half and
        // `tx` to the second (receiving) half, so `rx.recv()` below and the
        // `tx` stored in `Self` both refer to the wrong half.
        let (rx, tx) = mpsc::channel(100);

        let handle = tokio::spawn(async move {
            while let Some(dir) = rx.recv().await {
                info!("processing '{}'..", dir.display());
                // NOTE(review): `get_dir` is an `async fn` taking
                // `(dir, current_depth, max_depth)`; this call passes a single
                // argument and the returned future is dropped without being
                // awaited, so its result is never used.
                get_dir(dir);
            }
        });

        // NOTE(review): the `done` field of the struct is not initialised here.
        Self { tx, handle }
    }

    /// Sends `dir` over the channel.
    ///
    /// NOTE(review): the struct declares the channel item as a
    /// `(PathBuf, oneshotSender<PathBuf>)` tuple, but only the bare path is sent.
    pub async fn process(&self, dir: PathBuf) -> Result<()> {
        // NOTE(review): this `let` has no initializer on its own line, so it
        // continues onto the next line and destructures the result of `send`.
        // Presumably a reply channel was meant to be created here -- confirm.
        let (tx, rx) =
        self.tx.send(dir).await?;
        Ok(())
    }
}

impl MappingGenerator {
    pub async fn new(directories_to_scan: Vec<PathBuf>, depth: usize) -> Result<Self> {
        let cleaned_directories: Vec<PathBuf> = directories_to_scan
            .into_iter()
            .map(check_dir)
            .collect::<Result<_>>()?;

        let helper = MappingGeneratorHelper::new();

        cleaned_directories
            .into_iter()
            .for_each(|dir| helper.process(dir));

        info!(
            "Will process:\n  {}",
            all_directories
                .iter()
                .map(|pat| pat.display().to_string())
                .collect::<Vec<_>>()
                .join("\n  ")
        );
        Ok(Self {
            current_mappings: HashMap::new(),
            paths_to_process: all_directories,
        })
    }
}

/// Hands `dir` back unchanged if its metadata can be read.
///
/// # Errors
///
/// Returns an error naming the path and the underlying I/O error when the
/// metadata lookup fails.
fn check_dir(dir: PathBuf) -> Result<PathBuf> {
    // Guard clause: reject the path as soon as the lookup fails.
    if let Err(io_error) = dir.metadata() {
        bail!(
            "'{}' is not a valid path; Error was: '{}'",
            dir.display(),
            io_error
        );
    }

    Ok(dir)
}

/// Collects the directories reported by `get_dir_recursive` for `dir`.
///
/// The walk runs in a spawned task and streams its findings through a channel.
/// Because sub-directories are visited concurrently, the order of the returned
/// paths is not deterministic.
///
/// * `dir` - directory to start from.
/// * `current_depth` - depth assigned to `dir` itself.
/// * `max_depth` - depth at which the walk stops descending.
///
/// # Errors
///
/// Fails if the walker task panics or is cancelled, or if the walk itself
/// returns an error.
pub async fn get_dir(dir: PathBuf, current_depth: usize, max_depth: usize) -> Result<Vec<PathBuf>> {
    let (tx, rx) = mpsc::channel(100);

    // The inner future must be awaited inside the task. Returning it unawaited
    // would walk nothing and keep `tx` alive inside the task's output, so the
    // `collect` below would never see the channel close.
    let handle =
        tokio::spawn(
            async move { get_dir_recursive(dir, current_depth, max_depth, tx).await },
        );

    // Finishes once every sender clone has been dropped, i.e. when the walk is over.
    let out = ReceiverStream::new(rx).collect::<Vec<PathBuf>>().await;

    // First `?`: the task failed to complete; second `?`: the walk returned an error.
    handle.await??;
    Ok(out)
}

/// Sends `dir` and its sub-directories through `tx`, descending until
/// `max_depth` is reached.
///
/// A path is reported only if it is a directory and `current_depth` is below
/// `max_depth`; anything else is silently ignored. Every entry of a reported
/// directory is handled in its own spawned task with `current_depth + 1`.
/// Symbolic links are followed, so the depth limit is what bounds the walk.
///
/// The future is returned boxed: the function calls itself from inside
/// `tokio::spawn`, and a plain recursive `async fn` cannot be compiled that way.
/// Callers still just `.await` the result.
///
/// # Errors
///
/// Fails if the receiving side of `tx` is gone, if a directory cannot be read,
/// or if any child task fails or panics.
fn get_dir_recursive(
    dir: PathBuf,
    current_depth: usize,
    max_depth: usize,
    tx: Sender<PathBuf>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> {
    Box::pin(async move {
        // Async metadata lookup instead of the blocking `Path::is_dir`; like
        // `is_dir`, a failed lookup counts as "not a directory".
        let is_dir = fs::metadata(&dir)
            .await
            .map(|meta| meta.is_dir())
            .unwrap_or(false);

        // `>=` rather than `==`, so a start depth beyond `max_depth` also stops.
        if !is_dir || current_depth >= max_depth {
            return Ok(());
        }

        // `dir` is still needed below, so a copy is sent.
        tx.send(dir.clone()).await?;

        // NOTE(review): the message says "skipping", but the error aborts the
        // whole walk; confirm which behavior is wanted.
        let mut entries = match fs::read_dir(&dir).await {
            Ok(entries) => entries,
            Err(e) => bail!(
                "Unable to read directory {}, skipping; error: {}",
                dir.display(),
                e
            ),
        };

        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::new();
        while let Some(entry) = entries
            .next_entry()
            .await
            .with_context(|| format!("Failed to read directory: '{}'", dir.display()))?
        {
            let child = entry.path();
            // Each task owns its own copy of the parent path for error context.
            let parent = dir.clone();
            let tx_new = tx.clone();
            handles.push(tokio::spawn(async move {
                get_dir_recursive(child, current_depth + 1, max_depth, tx_new)
                    .await
                    .with_context(|| {
                        format!("Failed to get child directories to '{}'", parent.display())
                    })
            }));
        }

        // First `?`: the child task panicked or was cancelled; second `?`: its walk failed.
        for handle in handles {
            handle.await??;
        }

        Ok(())
    })
}

#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use super::get_dir;

    /// Walks a freshly created temporary tree instead of a directory that only
    /// exists on one developer's machine (`~` is not expanded by Rust anyway).
    #[test]
    fn test_get_dir() {
        let root: PathBuf = std::env::temp_dir().join(format!(
            "lf-make-map-test-get-dir-{}",
            std::process::id()
        ));
        let child = root.join("rust");
        std::fs::create_dir_all(&child).expect("failed to create the test directory tree");

        // `get_dir` is async and spawns tasks, so it has to run inside a runtime.
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("failed to build a tokio runtime");
        let result = runtime.block_on(get_dir(root.clone(), 0, 2));

        // Clean up before asserting so that a failure does not leave the tree behind.
        let _ = std::fs::remove_dir_all(&root);

        // The walk is concurrent, so sort before comparing.
        let mut dirs = result.expect("get_dir returned an error");
        dirs.sort();

        let expected_dirs = vec![root, child];
        assert_eq!(dirs, expected_dirs);
    }
}