commit a60f9bd921b2b6be0b45d81d723c112aea5d90ca from: Thomas Böhler date: Sun Feb 25 09:55:16 2024 UTC refactor: change application architecture The architecture of rss-email is changed to include 3 phases: - Fetching - Accumulating - Sending This makes it possible to parallelise these steps, with messages passed between them. Signed-off-by: Thomas Böhler commit - 517f0f6252a4398a77ad82c027ecdcc7ba8e3f29 commit + a60f9bd921b2b6be0b45d81d723c112aea5d90ca blob - /dev/null blob + 36a0a25ca401b75ff4b19f80ca34f4ced938b92a (mode 644) --- /dev/null +++ docs/ARCHITECTURE.md @@ -0,0 +1,25 @@ +# rss-email architecture + +## Overview + +FETCHER -> ACCUMULATOR -> SENDER + +### FETCHER + +The FETCHER just fetches RSS/Atom feeds. It then sends these feeds to the +ACCUMULATOR. + +### ACCUMULATOR + +The ACCUMULATOR first reads all unsent feeds from the database and sends +these to the SENDER. +It also receives feeds from the FETCHER, checks the database for whether +these have been sent yet, and, if they haven't, forwards these to the +SENDER. + +In other words, the ACCUMULATOR accumulates both old feeds from the +database and new feeds from the internet. + +### SENDER + +The SENDER receives feeds from the ACCUMULATOR and sends them. 
blob - 780b448bbd3b5418e9d7e0db9205d275b8801a74 blob + bc83b8caf0b9d65e05175271b9eea967851e46e7 --- sqlx-data.json +++ sqlx-data.json @@ -58,6 +58,54 @@ }, "query": "select * from posts where sent != true order by pub_date desc" }, + "a0e1000c1a200a436e2f33b36adf4ca6b7a8edcc950c9b22a161ba2d47d8315b": { + "describe": { + "columns": [ + { + "name": "guid", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "title", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "url", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "pub_date", + "ordinal": 3, + "type_info": "Int64" + }, + { + "name": "content", + "ordinal": 4, + "type_info": "Text" + }, + { + "name": "sent", + "ordinal": 5, + "type_info": "Bool" + } + ], + "nullable": [ + false, + true, + true, + true, + true, + false + ], + "parameters": { + "Right": 1 + } + }, + "query": "select *\n from posts\n where guid = ?" + }, "e8c193917469a0ee24db731dd5ccef549b01e97eb3d05deb46706bd702b587fc": { "describe": { "columns": [], blob - 897732853817e93e172b5ba94464b93b1e005ce7 blob + 4b2a8c5f8c65a9cbe84a160921f2df1ce08c870b --- src/config.rs +++ src/config.rs @@ -113,7 +113,7 @@ impl AppConfig { Some(smtp_port), Some(smtp_starttls), ) => { - trace!("All necessary config values have been given on the command line, no need to check for config file"); + log::trace!("All necessary config values have been given on the command line, no need to check for config file"); s.mail_from = mail_from; s.mail_to = mail_to; s.smtp_user = smtp_user; blob - 90ed941fb643996268fc37fdb2a957f20aa4bc92 blob + 93c85d4af115faad37aa44862f589a1d761a2ff1 --- src/db.rs +++ src/db.rs @@ -32,7 +32,7 @@ pub async fn init_db(db_path: &str) -> Result, post: &Post) -> Result<(), Error> { +pub async fn insert_item(conn: &mut PoolConnection, post: &Post) -> Result<(), Error> { sqlx::query!( "insert or ignore into posts (guid, title, url, pub_date, content) values (?, ?, ?, ?, ?)", post.guid, @@ -41,9 +41,17 @@ pub async fn insert_item(mut conn: 
PoolConnection) -> Result<(), Error> { + sqlx::query!("update posts set sent = true where guid = ?", guid) + .execute(conn) + .await + .context(GeneralDatabaseSnafu)?; + Ok(()) +} blob - 29d0bf7e8ec045d8ee2c556bc47934a9a7741e4f blob + ad8ddc682acdf344a36023c033508bcd6081bdd6 --- src/feed.rs +++ src/feed.rs @@ -25,7 +25,7 @@ pub async fn fetch_new(url: S) -> Result, where S: AsRef + Send, { - debug!("Fetching feed for {}", url.as_ref()); + log::debug!("Fetching feed for {}", url.as_ref()); let url = url.as_ref(); let content = reqwest::get(url) .await @@ -71,7 +71,7 @@ pub fn fetch_new_rss(bytes: &[u8]) -> Result .filter_map(|i| match i.try_into() { Ok(p) => Some(p), Err(e) => { - error!("Unable to convert received post, continuing ({e})"); + log::error!("Unable to convert received post, continuing ({e})"); None } }) @@ -92,7 +92,7 @@ pub fn fetch_new_atom(bytes: &[u8]) -> Result Some(e), Err(e) => { - error!("Unable to convert received post, continuing ({e})"); + log::error!("Unable to convert received post, continuing ({e})"); None } }) blob - 4e71447e5583ff7d6ea7b8bc25a3ad30fbd60f11 blob + d5e403073fc666b349c519b434c4bbcdcc56b4df --- src/mail.rs +++ src/mail.rs @@ -50,7 +50,7 @@ impl Mail { to: &'a str, mailer: &AsyncSmtpTransport, ) -> Result<(), Error> { - trace!("Sending to {}: {}", to, &self.subject); + log::trace!("Sending to {}: {}", to, &self.subject); let from = from.parse().map_err(|_| Error::Parse { value: from.to_string(), })?; blob - 74e412950d4074b2b18a24b90b59db88a0a9d388 blob + 985a33d56b044b40925de14f2ad86dc8d675fffe --- src/main.rs +++ src/main.rs @@ -13,9 +13,6 @@ )] #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] -#[macro_use] -extern crate log; - pub mod cli; pub mod config; pub mod db; @@ -25,20 +22,22 @@ pub mod models; use crate::{ config::AppConfig, - db::{init_db, insert_item}, + db::{init_db, insert_item, set_sent}, mail::{get_mailer, Mail}, }; use lettre::{AsyncSmtpTransport, Tokio1Executor}; +use models::Post; 
use snafu::{prelude::*, Snafu}; +use sqlx::{pool::PoolConnection, Sqlite}; use std::{ fs::File, io::{BufRead, BufReader}, - sync::Arc, }; +use tokio::{ + sync::mpsc::{self, Receiver, Sender}, + task::JoinSet, +}; -use sqlx::Sqlite; -use tokio::task::JoinSet; - #[derive(Debug, Snafu)] pub enum Error { #[snafu(display("Io Error at \"{path}\": {source}"))] @@ -66,33 +65,115 @@ async fn main() { } } -async fn app_main() -> Result<(), Error> { - let config = Arc::new(AppConfig::new().context(ConfigSnafu)?); - let urls: Vec = - BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu { - path: &config.urls_path, - })?) - .lines() - .map_while(Result::ok) - .filter(|l| !l.starts_with('#')) - .collect(); +async fn fetcher(urls: Vec, tx: Sender) -> Result<(), Error> { + let mut set = JoinSet::new(); + for u in urls { + set.spawn(async move { feed::fetch_new(u).await }); + } - let pool = init_db(&config.database_path).await?; + let mut aborted = false; + 'outer: while let Some(new) = set.join_next().await { + let posts = match new.context(JoinSetSnafu)? { + Ok(p) => p, + Err(e) => { + log::error!("Error while fetching feed: {e}"); + continue; + } + }; - if config.no_fetch { - info!("Not fetching any feeds as \"--no-fetch\" has been passed"); - } else { - fetch_feeds(urls, &pool).await?; + for post in posts { + if tx.send(post).await.is_err() { + // It doesn't make sense to fetch any more messages if they're not processed by the + // accumulator. Abort here. + log::error!("Receiver closed the connection. Aborting fetcher."); + aborted = true; + break 'outer; + } + } } - let results = sqlx::query_as!( + if aborted { + set.shutdown().await; + } + + Ok(()) +} + +async fn accumulator( + mut conn: PoolConnection, + mut rx: Receiver, + tx: Sender, +) -> Result<(), Error> { + // FIXME: race condition: most of the time, this sends 2 mails if the entry is already in the + // database but not sent. 
the entry gets fetched (from the database) and sent, and during that + time the same entry also gets fetched (from the url) and sent. + let unsent_posts = sqlx::query_as!( models::Post, "select * from posts where sent != true order by pub_date desc" ) - .fetch_all(&pool) + .fetch_all(&mut conn) .await .context(GeneralDatabaseSnafu)?; + let mut sent_posts = Vec::new(); + let mut stop_sending = false; + for post in unsent_posts { + let guid = post.guid.clone(); + if tx.send(post).await.is_err() { + log::error!( + "Sender worker closed the receiving side of the channel. Aborting sending messages." + ); + stop_sending = true; + break; + } + sent_posts.push(guid); + } + + // XXX: combining "while let" and other conditions is unstable + while let Some(post) = rx.recv().await { + let db_post = sqlx::query_as!( + models::Post, + "select * + from posts + where guid = ?", + post.guid, + ) + .fetch_optional(&mut conn) + .await + .context(GeneralDatabaseSnafu)?; + + if db_post.is_none() { + log::debug!("Inserting post with guid '{}'", post.guid); + insert_item(&mut conn, &post).await?; + } + + // Don't send this post if it is already in the database and has been sent before or it + // was in the database and not sent during startup. + // This avoids possibly sending a post twice if it is unsent in the database during startup + // due to a race condition that can send the post at startup once and after fetching again + // since the sent status is only set after successfully sending the post over email in the + // Sender task. + let sent = db_post.is_some_and(|p| p.sent) || sent_posts.contains(&post.guid); + + // Ignore the error and don't send any messages from this point on. The reason is + // that if the sender stops and closes the stream, the rest of the program can keep + // running just fine, fetching and inserting posts into the database. 
+ // This is true for both possible cases of the sender crashing and the user not + // requesting to send messages in the first place. + if !stop_sending && !sent && tx.send(post).await.is_err() { + log::debug!("Sender worker closed the receiving side of the channel."); + stop_sending = true; + } + } + + Ok(()) +} + +async fn sender( + config: &AppConfig, + mut conn: PoolConnection, + mut rx: Receiver, +) -> Result<(), Error> { let mailer = get_mailer( config.smtp_user.clone(), config.smtp_password.clone(), @@ -102,83 +183,77 @@ async fn app_main() -> Result<(), Error> { ) .context(MailSnafu)?; - let mut handles = JoinSet::new(); - for result in results { - let mut conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; - let mailer = mailer.clone(); - let config = config.clone(); - - handles.spawn(async move { - send_post( - &mut conn, - mailer, - &config.mail_from, - &config.mail_to, - result, - config.dry_run, - ) - .await - }); + while let Some(post) = rx.recv().await { + send_post( + &mut conn, + mailer.clone(), + &config.mail_from, + &config.mail_to, + post, + ) + .await?; } - while let Some(handle) = handles.join_next().await { - // TODO: retry sending mail instead? user should specify number of retries in config - if let Err(e) = handle.context(JoinSetSnafu)? { - log::error!("An error occured while sending an email: {e}"); - } - } - Ok(()) } -async fn fetch_feeds(urls: Vec, pool: &sqlx::Pool) -> Result<(), Error> { - let mut set = JoinSet::new(); - for u in urls { - set.spawn(async move { feed::fetch_new(u).await }); - } +async fn app_main() -> Result<(), Error> { + let config = AppConfig::new().context(ConfigSnafu)?; + let urls: Vec = + BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu { + path: &config.urls_path, + })?) + .lines() + .map_while(Result::ok) + .filter(|l| !l.starts_with('#')) + .collect(); - while let Some(new) = set.join_next().await { - let posts = match new.context(JoinSetSnafu)? 
{ - Ok(p) => p, - Err(e) => { - log::error!("Error while fetching feed: {e}"); - continue; - } - }; + let pool = init_db(&config.database_path).await?; - for i in posts { - let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; - insert_item(conn, &i).await?; + let (fetch_tx, acc_rx) = mpsc::channel::(100); + let (acc_tx, send_rx) = mpsc::channel::(10); + let mut join_set = JoinSet::new(); + + if config.no_fetch { + log::info!("Not fetching any feeds as \"--no-fetch\" has been passed"); + // close fetch transmitter to signal accumulator that no posts will be sent + drop(fetch_tx); + } else { + join_set.spawn(async move { fetcher(urls, fetch_tx).await }); + } + if config.dry_run { + log::info!("Not sending any emails as \"--dry-run\" has been passed"); + } + let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; + join_set.spawn(async move { accumulator(conn, acc_rx, acc_tx).await }); + if !config.dry_run { + let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?; + join_set.spawn(async move { sender(&config, conn, send_rx).await }); + } + + while let Some(task) = join_set.join_next().await { + if let Err(e) = task { + log::error!("Task failed: {e}"); } } Ok(()) } -async fn send_post<'a, E>( - conn: E, +async fn send_post<'a>( + conn: &mut PoolConnection, mailer: AsyncSmtpTransport, from: &'a str, to: &'a str, post: models::Post, - dry_run: bool, -) -> Result<(), Error> -where - E: sqlx::Executor<'a, Database = Sqlite>, -{ - if dry_run { - info!("Not sending any emails as \"--dry-run\" has been passed"); - } else { - Mail::new(post.title, post.content, post.url) - .send_email(from, to, &mailer) - .await - .context(MailSnafu)?; - } - - sqlx::query!("update posts set sent = true where guid = ?", post.guid) - .execute(conn) +) -> Result<(), Error> { + log::debug!("Sending post with guid '{}'", post.guid); + Mail::new(post.title, post.content, post.url) + .send_email(from, to, &mailer) .await - .context(GeneralDatabaseSnafu)?; + 
.context(MailSnafu)?; + set_sent(&post.guid, conn).await?; + Ok(()) }