Commit Diff


commit - e0de93c7136c4b0ea0e9885640d2f98085c796d8
commit + dfb712fd57727307d7ef7320fb39a0f68614812b
blob - ddcfc5149177be9bf422109cfe96d795ec3d0130
blob + 23ae82d210d370184d665daeb30008eeddf8ab2d
--- Cargo.lock
+++ Cargo.lock
@@ -38,6 +38,17 @@ source = "registry+https://github.com/rust-lang/crates
 checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8"
 
 [[package]]
+name = "async-trait"
+version = "0.1.58"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
+[[package]]
 name = "atoi"
 version = "1.0.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -480,6 +491,12 @@ dependencies = [
 ]
 
 [[package]]
+name = "futures-io"
+version = "0.3.25"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
+
+[[package]]
 name = "futures-sink"
 version = "0.3.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -498,8 +515,10 @@ source = "registry+https://github.com/rust-lang/crates
 checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
 dependencies = [
  "futures-core",
+ "futures-io",
  "futures-sink",
  "futures-task",
+ "memchr",
  "pin-project-lite",
  "pin-utils",
  "slab",
@@ -755,10 +774,13 @@ version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2eabca5e0b4d0e98e7f2243fb5b7520b6af2b65d8f87bcc86f2c75185a6ff243"
 dependencies = [
+ "async-trait",
  "base64",
  "email-encoding",
  "email_address",
  "fastrand",
+ "futures-io",
+ "futures-util",
  "httpdate",
  "idna",
  "mime",
@@ -768,6 +790,8 @@ dependencies = [
  "rustls",
  "rustls-pemfile",
  "socket2",
+ "tokio",
+ "tokio-rustls",
  "webpki-roots",
 ]
 
@@ -1254,6 +1278,15 @@ dependencies = [
 ]
 
 [[package]]
+name = "signal-hook-registry"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0"
+dependencies = [
+ "libc",
+]
+
+[[package]]
 name = "slab"
 version = "0.4.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1495,6 +1528,7 @@ dependencies = [
  "mio",
  "num_cpus",
  "pin-project-lite",
+ "signal-hook-registry",
  "socket2",
  "tokio-macros",
  "winapi",
@@ -1575,9 +1609,9 @@ dependencies = [
 
 [[package]]
 name = "tracing-core"
-version = "0.1.29"
+version = "0.1.30"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7"
+checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
 dependencies = [
  "once_cell",
 ]
blob - 299cf0f638fb2fefa23bf2310735930e3a319a98
blob + 3831aa3aa39a055c6420b327922cacb32b448b5f
--- Cargo.toml
+++ Cargo.toml
@@ -14,7 +14,7 @@ reqwest = {version = "0.11", default-features = false,
 clap = { version = "3", features = ["derive"] }
 chrono = "0.4"
 toml = "0.5.8"
-lettre = { version = "0.10.1", default-features = false, features = ["rustls", "rustls-tls", "builder", "smtp-transport"] }
+lettre = { version = "0.10.1", default-features = false, features = ["rustls", "rustls-tls", "builder", "smtp-transport", "pool", "tokio1", "tokio1-rustls-tls"] }
 serde = { version = "1.0", features = ["derive"] }
 directories = "4.0.1"
 log = "0.4.17"
blob - 34e1430a519195e4298fa3956d7a002f5bc85c90
blob + d895b9f7effb754fc346479f3f8b44070dd22096
--- src/mail.rs
+++ src/mail.rs
@@ -1,17 +1,17 @@
 use crate::config::Config;
 use lettre::{
     message::Message,
-    transport::smtp::{authentication::Credentials, SmtpTransport},
-    Transport,
+    transport::smtp::{authentication::Credentials, AsyncSmtpTransport},
+    AsyncTransport, Tokio1Executor,
 };
 
-pub fn get_mailer(config: &Config) -> anyhow::Result<SmtpTransport> {
+pub fn get_mailer(config: &Config) -> anyhow::Result<AsyncSmtpTransport<Tokio1Executor>> {
     let creds = Credentials::new(
         config.smtp.user.to_string(),
         config.smtp.password.to_string(),
     );
 
-    let mailer = SmtpTransport::relay(&config.smtp.server)?
+    let mailer = AsyncSmtpTransport::<Tokio1Executor>::relay(&config.smtp.server)?
         .credentials(creds)
         .port(config.smtp.port)
         .build();
@@ -19,11 +19,11 @@ pub fn get_mailer(config: &Config) -> anyhow::Result<S
     Ok(mailer)
 }
 
-pub fn send_email<S, I>(
+pub async fn send_email<S, I>(
     config: &Config,
     subject: S,
     body: I,
-    mailer: &SmtpTransport,
+    mailer: AsyncSmtpTransport<Tokio1Executor>,
 ) -> anyhow::Result<()>
 where
     S: AsRef<str>,
@@ -35,7 +35,7 @@ where
         .subject(subject.as_ref())
         .body(body.into())?;
 
-    mailer.send(&email)?;
+    mailer.send(email).await?;
 
     Ok(())
 }
blob - c4a56aaabbfafc555d4dd6fa03b7cd320dd536d1
blob + 9d30382720404549952280e0f4482a6b6ddfe4c2
--- src/main.rs
+++ src/main.rs
@@ -13,12 +13,14 @@ pub mod models;
 use crate::mail::{get_mailer, send_email};
 use anyhow::Context;
 use config::Config;
+use lettre::{AsyncSmtpTransport, Tokio1Executor};
 use std::{
     fs::File,
     io::{BufRead, BufReader},
+    sync::Arc,
 };
 
-use sqlx::{pool::PoolConnection, sqlite::SqlitePoolOptions, Sqlite};
+use sqlx::{sqlite::SqlitePoolOptions, Sqlite};
 use tokio::task::JoinSet;
 
 const VERSION: &str = env!("CARGO_PKG_VERSION");
@@ -28,9 +30,11 @@ const AUTHORS: &str = env!("CARGO_PKG_AUTHORS");
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
 
+    // NOTE: `build_app` ensures all `Option`s in `args` to be `Some`, so unwrapping the values
+    // should be fine
     let args = cli::Cli::build_app()?;
 
-    let config = Config::new(args.config_path.unwrap())?;
+    let config = Arc::new(Config::new(args.config_path.unwrap())?);
 
     let urls = BufReader::new(
         File::open(args.urls_path.as_ref().unwrap()).context(format!(
@@ -74,32 +78,49 @@ async fn main() -> anyhow::Result<()> {
     .fetch_all(&pool)
     .await?;
 
-    send_posts(pool.acquire().await?, &config, results, args.dry_run).await?;
+    let mailer = get_mailer(&config)?;
+    let mut handles = JoinSet::new();
+    for result in results {
+        let mut conn = pool.acquire().await?;
+        let mailer = mailer.clone();
+        let config = config.clone();
 
+        handles
+            .spawn(async move { send_post(&mut conn, mailer, config, result, args.dry_run).await });
+    }
+
+    while let Some(handle) = handles.join_next().await {
+        // TODO: retry sending mail instead? user should specify number of retries in config
+        if let Err(e) = handle? {
+            log::error!("An error occured while sending an E-Mail: {}", e);
+        }
+    }
+
     Ok(())
 }
 
-async fn send_posts(
-    mut conn: PoolConnection<Sqlite>,
-    config: &Config,
-    items: Vec<models::Post>,
+async fn send_post<'a, E>(
+    conn: E,
+    mailer: AsyncSmtpTransport<Tokio1Executor>,
+    config: Arc<Config>,
+    post: models::Post,
     dry_run: bool,
-) -> anyhow::Result<()> {
-    let mailer = get_mailer(config)?;
-    for post in items {
-        if !dry_run {
-            let subject = post.title.unwrap_or_else(|| "No title found".to_string());
-            let body = match post.content {
-                Some(c) => c + "\n\n" + &post.url.unwrap(),
-                None => post.url.unwrap(),
-            };
-            send_email(config, subject, body, &mailer)?;
-        }
-
-        sqlx::query!("update posts set sent = true where guid = ?", post.guid)
-            .execute(&mut conn)
-            .await?;
+) -> anyhow::Result<()>
+where
+    E: sqlx::Executor<'a, Database = Sqlite>,
+{
+    if !dry_run {
+        let subject = post.title.unwrap_or_else(|| "No title found".to_string());
+        let body = match post.content {
+            Some(c) => c + "\n\n" + &post.url.unwrap(),
+            None => post.url.unwrap(),
+        };
+        send_email(&config, subject, body, mailer).await?;
     }
 
+    sqlx::query!("update posts set sent = true where guid = ?", post.guid)
+        .execute(conn)
+        .await?;
+
     Ok(())
 }