commit dfb712fd57727307d7ef7320fb39a0f68614812b from: witcher date: Sat Dec 3 15:03:17 2022 UTC Send E-Mails asynchronously Thanks to the introduction of tokio in the codebase, E-Mails can now be sent asynchronously as well, speeding up the implementation even more. Implements: https://todo.sr.ht/~witcher/rss-email/16 commit - e0de93c7136c4b0ea0e9885640d2f98085c796d8 commit + dfb712fd57727307d7ef7320fb39a0f68614812b blob - ddcfc5149177be9bf422109cfe96d795ec3d0130 blob + 23ae82d210d370184d665daeb30008eeddf8ab2d --- Cargo.lock +++ Cargo.lock @@ -38,6 +38,17 @@ source = "registry+https://github.com/rust-lang/crates checksum = "508b352bb5c066aac251f6daf6b36eccd03e8a88e8081cd44959ea277a3af9a8" [[package]] +name = "async-trait" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "atoi" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -480,6 +491,12 @@ dependencies = [ ] [[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] name = "futures-sink" version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -498,8 +515,10 @@ source = "registry+https://github.com/rust-lang/crates checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-core", + "futures-io", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -755,10 +774,13 @@ version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eabca5e0b4d0e98e7f2243fb5b7520b6af2b65d8f87bcc86f2c75185a6ff243" dependencies = [ + "async-trait", "base64", "email-encoding", "email_address", "fastrand", + "futures-io", + "futures-util", "httpdate", "idna", "mime", @@ -768,6 +790,8 @@ dependencies = [ "rustls", "rustls-pemfile", "socket2", + "tokio", + "tokio-rustls", "webpki-roots", ] @@ -1254,6 +1278,15 @@ dependencies = [ ] [[package]] +name = "signal-hook-registry" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51e73328dc4ac0c7ccbda3a494dfa03df1de2f46018127f60c693f2648455b0" +dependencies = [ + "libc", +] + +[[package]] name = "slab" version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1495,6 +1528,7 @@ dependencies = [ "mio", "num_cpus", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "winapi", @@ -1575,9 +1609,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", ] blob - 299cf0f638fb2fefa23bf2310735930e3a319a98 blob + 3831aa3aa39a055c6420b327922cacb32b448b5f --- Cargo.toml +++ Cargo.toml @@ -14,7 +14,7 @@ reqwest = {version = "0.11", default-features = false, clap = { version = "3", features = ["derive"] } chrono = "0.4" toml = "0.5.8" -lettre = { version = "0.10.1", default-features = false, features = ["rustls", "rustls-tls", "builder", "smtp-transport"] } +lettre = { version = "0.10.1", default-features = false, features = ["rustls", "rustls-tls", "builder", "smtp-transport", "pool", "tokio1", "tokio1-rustls-tls"] } serde = { version = "1.0", features = ["derive"] } directories = "4.0.1" log = "0.4.17" blob - 34e1430a519195e4298fa3956d7a002f5bc85c90 blob + d895b9f7effb754fc346479f3f8b44070dd22096 --- src/mail.rs +++ src/mail.rs @@ -1,17 +1,17 @@ use crate::config::Config; use lettre::{ message::Message, - transport::smtp::{authentication::Credentials, SmtpTransport}, - Transport, + transport::smtp::{authentication::Credentials, AsyncSmtpTransport}, + AsyncTransport, Tokio1Executor, }; -pub fn get_mailer(config: &Config) -> anyhow::Result { +pub fn get_mailer(config: &Config) -> anyhow::Result> { let creds = Credentials::new( config.smtp.user.to_string(), config.smtp.password.to_string(), ); - let mailer = SmtpTransport::relay(&config.smtp.server)? + let mailer = AsyncSmtpTransport::::relay(&config.smtp.server)? .credentials(creds) .port(config.smtp.port) .build(); @@ -19,11 +19,11 @@ pub fn get_mailer(config: &Config) -> anyhow::Result( +pub async fn send_email( config: &Config, subject: S, body: I, - mailer: &SmtpTransport, + mailer: AsyncSmtpTransport, ) -> anyhow::Result<()> where S: AsRef, @@ -35,7 +35,7 @@ where .subject(subject.as_ref()) .body(body.into())?; - mailer.send(&email)?; + mailer.send(email).await?; Ok(()) } blob - c4a56aaabbfafc555d4dd6fa03b7cd320dd536d1 blob + 9d30382720404549952280e0f4482a6b6ddfe4c2 --- src/main.rs +++ src/main.rs @@ -13,12 +13,14 @@ pub mod models; use crate::mail::{get_mailer, send_email}; use anyhow::Context; use config::Config; +use lettre::{AsyncSmtpTransport, Tokio1Executor}; use std::{ fs::File, io::{BufRead, BufReader}, + sync::Arc, }; -use sqlx::{pool::PoolConnection, sqlite::SqlitePoolOptions, Sqlite}; +use sqlx::{sqlite::SqlitePoolOptions, Sqlite}; use tokio::task::JoinSet; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -28,9 +30,11 @@ const AUTHORS: &str = env!("CARGO_PKG_AUTHORS"); async fn main() -> anyhow::Result<()> { env_logger::init(); + // NOTE: `build_app` ensures all `Option`s in `args` to be `Some`, so unwrapping the values + // should be fine let args = cli::Cli::build_app()?; - let config = Config::new(args.config_path.unwrap())?; + let config = Arc::new(Config::new(args.config_path.unwrap())?); let urls = BufReader::new( File::open(args.urls_path.as_ref().unwrap()).context(format!( @@ -74,32 +78,49 @@ async fn main() -> anyhow::Result<()> { .fetch_all(&pool) .await?; - send_posts(pool.acquire().await?, &config, results, args.dry_run).await?; + let mailer = get_mailer(&config)?; + let mut handles = JoinSet::new(); + for result in results { + let mut conn = pool.acquire().await?; + let mailer = mailer.clone(); + let config = config.clone(); + handles + .spawn(async move { send_post(&mut conn, mailer, config, result, args.dry_run).await }); + } + + while let Some(handle) = handles.join_next().await { + // TODO: retry sending mail instead? user should specify number of retries in config + if let Err(e) = handle? { + log::error!("An error occured while sending an E-Mail: {}", e); + } + } + Ok(()) } -async fn send_posts( - mut conn: PoolConnection, - config: &Config, - items: Vec, +async fn send_post<'a, E>( + conn: E, + mailer: AsyncSmtpTransport, + config: Arc, + post: models::Post, dry_run: bool, -) -> anyhow::Result<()> { - let mailer = get_mailer(config)?; - for post in items { - if !dry_run { - let subject = post.title.unwrap_or_else(|| "No title found".to_string()); - let body = match post.content { - Some(c) => c + "\n\n" + &post.url.unwrap(), - None => post.url.unwrap(), - }; - send_email(config, subject, body, &mailer)?; - } - - sqlx::query!("update posts set sent = true where guid = ?", post.guid) - .execute(&mut conn) - .await?; +) -> anyhow::Result<()> +where + E: sqlx::Executor<'a, Database = Sqlite>, +{ + if !dry_run { + let subject = post.title.unwrap_or_else(|| "No title found".to_string()); + let body = match post.content { + Some(c) => c + "\n\n" + &post.url.unwrap(), + None => post.url.unwrap(), + }; + send_email(&config, subject, body, mailer).await?; } + sqlx::query!("update posts set sent = true where guid = ?", post.guid) + .execute(conn) + .await?; + Ok(()) }