commit 771900b15631ac2a32cf60bf9bd16eaf88815c0c from: Thomas Böhler date: Thu Oct 23 18:48:59 2025 UTC fix: handle errors from tasks in JoinSet Check all task errors instead of whether joining them was successful. Previously the actual errors were ignored because it was only checked whether joining all tasks in the JoinSet was successful. Signed-off-by: Thomas Böhler commit - 6f8eb2710134cde1501dfca708fda3d16d9ab578 commit + 771900b15631ac2a32cf60bf9bd16eaf88815c0c blob - 4edbfe45d73e6a659b7ef084fc113a6a6fbd6406 blob + ab43d68d2c1f5a8b559c74aabf188be3304b7842 --- src/main.rs +++ src/main.rs @@ -60,7 +60,10 @@ pub enum Error { #[tokio::main] async fn main() { if let Err(e) = app_main().await { - eprintln!("{e}"); + eprintln!("The following errors were encountered:"); + for err in e { + eprintln!("{err}"); + } std::process::exit(1); } } @@ -199,18 +202,21 @@ async fn sender( Ok(()) } -async fn app_main() -> Result<(), Error> { - let config = AppConfig::new().context(ConfigSnafu)?; - let urls: Vec<String> = - BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu { - path: &config.urls_path, - })?) 
- .lines() - .map_while(Result::ok) - .filter(|l| !l.starts_with('#')) - .collect(); +async fn app_main() -> Result<(), Vec<Error>> { + let config = AppConfig::new().context(ConfigSnafu).map_err(|e| vec![e])?; + let urls: Vec<String> = BufReader::new( + File::open(config.urls_path.as_str()) + .context(IoSnafu { + path: &config.urls_path, + }) + .map_err(|e| vec![e])?, + ) + .lines() + .map_while(Result::ok) + .filter(|l| !l.starts_with('#')) + .collect(); - let pool = init_db(&config.database_path).await?; + let pool = init_db(&config.database_path).await.map_err(|e| vec![e])?; let (fetch_tx, acc_rx) = mpsc::channel::(100); let (acc_tx, send_rx) = mpsc::channel::(10); @@ -226,18 +232,40 @@ async fn app_main() -> Result<(), Error> { if config.dry_run { log::info!("Not sending any emails as \"--dry-run\" has been passed"); } - let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; + let conn = pool + .acquire() + .await + .context(GeneralDatabaseSnafu) + .map_err(|e| vec![e])?; join_set.spawn(async move { accumulator(conn, acc_rx, acc_tx).await }); - let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; + let conn = pool + .acquire() + .await + .context(GeneralDatabaseSnafu) + .map_err(|e| vec![e])?; join_set.spawn(async move { sender(&config, conn, send_rx, config.dry_run).await }); + let mut errors = Vec::new(); while let Some(task) = join_set.join_next().await { - if let Err(e) = task { - log::error!("Task failed: {e}"); + match task.context(JoinSetSnafu) { + Ok(res) => { + if let Err(e) = res { + log::error!("Error during task: {e}"); + errors.push(e); + } + } + Err(e) => { + log::error!("Error joining tasks: {e}"); + errors.push(e); + } } } - Ok(()) + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } } async fn send_post<'a>(