Commit Diff


commit - 517f0f6252a4398a77ad82c027ecdcc7ba8e3f29
commit + a60f9bd921b2b6be0b45d81d723c112aea5d90ca
blob - /dev/null
blob + 36a0a25ca401b75ff4b19f80ca34f4ced938b92a (mode 644)
--- /dev/null
+++ docs/ARCHITECTURE.md
@@ -0,0 +1,25 @@
+# rss-email architecture
+
+## Overview
+
+FETCHER -> ACCUMULATOR -> SENDER
+
+### FETCHER
+
+The FETCHER just fetches RSS/Atom feeds. It then sends these feeds to the
+ACCUMULATOR.
+
+### ACCUMULATOR
+
+The ACCUMULATOR first reads all unsent feeds from the database and sends
+these to the SENDER.
+It also receives feeds from the FETCHER, checks the database for whether
+these have been sent yet, and, if they haven't, forwards these to the
+SENDER.
+
+In other words, the ACCUMULATOR accumulates both old feeds from the
+database and new feeds from the internet.
+
+### SENDER
+
+The SENDER receives feeds from the ACCUMULATOR and sends them.
blob - 780b448bbd3b5418e9d7e0db9205d275b8801a74
blob + bc83b8caf0b9d65e05175271b9eea967851e46e7
--- sqlx-data.json
+++ sqlx-data.json
@@ -58,6 +58,54 @@
     },
     "query": "select * from posts where sent != true order by pub_date desc"
   },
+  "a0e1000c1a200a436e2f33b36adf4ca6b7a8edcc950c9b22a161ba2d47d8315b": {
+    "describe": {
+      "columns": [
+        {
+          "name": "guid",
+          "ordinal": 0,
+          "type_info": "Text"
+        },
+        {
+          "name": "title",
+          "ordinal": 1,
+          "type_info": "Text"
+        },
+        {
+          "name": "url",
+          "ordinal": 2,
+          "type_info": "Text"
+        },
+        {
+          "name": "pub_date",
+          "ordinal": 3,
+          "type_info": "Int64"
+        },
+        {
+          "name": "content",
+          "ordinal": 4,
+          "type_info": "Text"
+        },
+        {
+          "name": "sent",
+          "ordinal": 5,
+          "type_info": "Bool"
+        }
+      ],
+      "nullable": [
+        false,
+        true,
+        true,
+        true,
+        true,
+        false
+      ],
+      "parameters": {
+        "Right": 1
+      }
+    },
+    "query": "select *\n                     from posts\n                     where guid = ?"
+  },
   "e8c193917469a0ee24db731dd5ccef549b01e97eb3d05deb46706bd702b587fc": {
     "describe": {
       "columns": [],
blob - 897732853817e93e172b5ba94464b93b1e005ce7
blob + 4b2a8c5f8c65a9cbe84a160921f2df1ce08c870b
--- src/config.rs
+++ src/config.rs
@@ -113,7 +113,7 @@ impl AppConfig {
                 Some(smtp_port),
                 Some(smtp_starttls),
             ) => {
-                trace!("All necessary config values have been given on the command line, no need to check for config file");
+                log::trace!("All necessary config values have been given on the command line, no need to check for config file");
                 s.mail_from = mail_from;
                 s.mail_to = mail_to;
                 s.smtp_user = smtp_user;
blob - 90ed941fb643996268fc37fdb2a957f20aa4bc92
blob + 93c85d4af115faad37aa44862f589a1d761a2ff1
--- src/db.rs
+++ src/db.rs
@@ -32,7 +32,7 @@ pub async fn init_db(db_path: &str) -> Result<sqlx::Po
 /// # Errors
 ///
 /// An error occurs if the statement cannot be executed.
-pub async fn insert_item(mut conn: PoolConnection<Sqlite>, post: &Post) -> Result<(), Error> {
+pub async fn insert_item(conn: &mut PoolConnection<Sqlite>, post: &Post) -> Result<(), Error> {
     sqlx::query!(
         "insert or ignore into posts (guid, title, url, pub_date, content) values (?, ?, ?, ?, ?)",
         post.guid,
@@ -41,9 +41,17 @@ pub async fn insert_item(mut conn: PoolConnection<Sqli
         post.pub_date,
         post.content
     )
-    .execute(&mut conn)
+    .execute(conn)
     .await
     .context(GeneralDatabaseSnafu)?;
 
     Ok(())
 }
+
+pub async fn set_sent(guid: &str, conn: &mut PoolConnection<Sqlite>) -> Result<(), Error> {
+    sqlx::query!("update posts set sent = true where guid = ?", guid)
+        .execute(conn)
+        .await
+        .context(GeneralDatabaseSnafu)?;
+    Ok(())
+}
blob - 29d0bf7e8ec045d8ee2c556bc47934a9a7741e4f
blob + ad8ddc682acdf344a36023c033508bcd6081bdd6
--- src/feed.rs
+++ src/feed.rs
@@ -25,7 +25,7 @@ pub async fn fetch_new<S>(url: S) -> Result<Vec<Post>,
 where
     S: AsRef<str> + Send,
 {
-    debug!("Fetching feed for {}", url.as_ref());
+    log::debug!("Fetching feed for {}", url.as_ref());
     let url = url.as_ref();
     let content = reqwest::get(url)
         .await
@@ -71,7 +71,7 @@ pub fn fetch_new_rss(bytes: &[u8]) -> Result<Vec<Post>
         .filter_map(|i| match i.try_into() {
             Ok(p) => Some(p),
             Err(e) => {
-                error!("Unable to convert received post, continuing ({e})");
+                log::error!("Unable to convert received post, continuing ({e})");
                 None
             }
         })
@@ -92,7 +92,7 @@ pub fn fetch_new_atom(bytes: &[u8]) -> Result<Vec<Post
         .filter_map(|e| match e.try_into() {
             Ok(e) => Some(e),
             Err(e) => {
-                error!("Unable to convert received post, continuing ({e})");
+                log::error!("Unable to convert received post, continuing ({e})");
                 None
             }
         })
blob - 4e71447e5583ff7d6ea7b8bc25a3ad30fbd60f11
blob + d5e403073fc666b349c519b434c4bbcdcc56b4df
--- src/mail.rs
+++ src/mail.rs
@@ -50,7 +50,7 @@ impl Mail {
         to: &'a str,
         mailer: &AsyncSmtpTransport<Tokio1Executor>,
     ) -> Result<(), Error> {
-        trace!("Sending to {}: {}", to, &self.subject);
+        log::trace!("Sending to {}: {}", to, &self.subject);
         let from = from.parse().map_err(|_| Error::Parse {
             value: from.to_string(),
         })?;
blob - 74e412950d4074b2b18a24b90b59db88a0a9d388
blob + 985a33d56b044b40925de14f2ad86dc8d675fffe
--- src/main.rs
+++ src/main.rs
@@ -13,9 +13,6 @@
 )]
 #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
 
-#[macro_use]
-extern crate log;
-
 pub mod cli;
 pub mod config;
 pub mod db;
@@ -25,20 +22,22 @@ pub mod models;
 
 use crate::{
     config::AppConfig,
-    db::{init_db, insert_item},
+    db::{init_db, insert_item, set_sent},
     mail::{get_mailer, Mail},
 };
 use lettre::{AsyncSmtpTransport, Tokio1Executor};
+use models::Post;
 use snafu::{prelude::*, Snafu};
+use sqlx::{pool::PoolConnection, Sqlite};
 use std::{
     fs::File,
     io::{BufRead, BufReader},
-    sync::Arc,
 };
+use tokio::{
+    sync::mpsc::{self, Receiver, Sender},
+    task::JoinSet,
+};
 
-use sqlx::Sqlite;
-use tokio::task::JoinSet;
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display("Io Error at \"{path}\": {source}"))]
@@ -66,33 +65,115 @@ async fn main() {
     }
 }
 
-async fn app_main() -> Result<(), Error> {
-    let config = Arc::new(AppConfig::new().context(ConfigSnafu)?);
-    let urls: Vec<String> =
-        BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu {
-            path: &config.urls_path,
-        })?)
-        .lines()
-        .map_while(Result::ok)
-        .filter(|l| !l.starts_with('#'))
-        .collect();
+async fn fetcher(urls: Vec<String>, tx: Sender<Post>) -> Result<(), Error> {
+    let mut set = JoinSet::new();
+    for u in urls {
+        set.spawn(async move { feed::fetch_new(u).await });
+    }
 
-    let pool = init_db(&config.database_path).await?;
+    let mut aborted = false;
+    'outer: while let Some(new) = set.join_next().await {
+        let posts = match new.context(JoinSetSnafu)? {
+            Ok(p) => p,
+            Err(e) => {
+                log::error!("Error while fetching feed: {e}");
+                continue;
+            }
+        };
 
-    if config.no_fetch {
-        info!("Not fetching any feeds as \"--no-fetch\" has been passed");
-    } else {
-        fetch_feeds(urls, &pool).await?;
+        for post in posts {
+            if tx.send(post).await.is_err() {
+                // It doesn't make sense to fetch any more messages if they're not processed by the
+                // accumulator. Abort here.
+                log::error!("Receiver closed the connection. Aborting fetcher.");
+                aborted = true;
+                break 'outer;
+            }
+        }
     }
 
-    let results = sqlx::query_as!(
+    if aborted {
+        set.shutdown().await;
+    }
+
+    Ok(())
+}
+
+async fn accumulator(
+    mut conn: PoolConnection<Sqlite>,
+    mut rx: Receiver<Post>,
+    tx: Sender<Post>,
+) -> Result<(), Error> {
+    // FIXME: race condition: most of the time, this sends 2 mails if the entry is already in the
+    // database but not sent. the entry gets fetched (from the database) and sent, and during that
+    // time the same entry also gets fetched (from the url) and sent.
+    let unsent_posts = sqlx::query_as!(
         models::Post,
         "select * from posts where sent != true order by pub_date desc"
     )
-    .fetch_all(&pool)
+    .fetch_all(&mut conn)
     .await
     .context(GeneralDatabaseSnafu)?;
 
+    let mut sent_posts = Vec::new();
+    let mut stop_sending = false;
+    for post in unsent_posts {
+        let guid = post.guid.clone();
+        if tx.send(post).await.is_err() {
+            log::error!(
+                "Sender worker closed the receiving side of the channel. Aborting sending messages."
+            );
+            stop_sending = true;
+            break;
+        }
+        sent_posts.push(guid);
+    }
+
+    // XXX: combining "while let" and other conditions is unstable
+    while let Some(post) = rx.recv().await {
+        let db_post = sqlx::query_as!(
+            models::Post,
+            "select *
+                     from posts
+                     where guid = ?",
+            post.guid,
+        )
+        .fetch_optional(&mut conn)
+        .await
+        .context(GeneralDatabaseSnafu)?;
+
+        if db_post.is_none() {
+            log::debug!("Inserting post with guid '{}'", post.guid);
+            insert_item(&mut conn, &post).await?;
+        }
+
+        // Don't send this post if it is already in the database and has been sent before or it
+        // was in the database and not sent during startup.
+        // This avoids possibly sending a post twice if it is unsent in the database during startup
+        // due to a race condition that can send the post at startup once and after fetching again
+        // since the sent status is only set after successfully sending the post over email in the
+        // Sender task.
+        let sent = db_post.is_some_and(|p| p.sent) || sent_posts.contains(&post.guid);
+
+        // Ignore the error and don't send any messages from this point on. The reason is
+        // that if the sender stops and closes the stream, the rest of the program can keep
+        // running just fine, fetching and inserting posts into the database.
+        // This is true for both possible cases of the sender crashing and the user not
+        // requesting to send messages in the first place.
+        if !stop_sending && !sent && tx.send(post).await.is_err() {
+            log::debug!("Sender worker closed the receiving side of the channel.");
+            stop_sending = true;
+        }
+    }
+
+    Ok(())
+}
+
+async fn sender(
+    config: &AppConfig,
+    mut conn: PoolConnection<Sqlite>,
+    mut rx: Receiver<Post>,
+) -> Result<(), Error> {
     let mailer = get_mailer(
         config.smtp_user.clone(),
         config.smtp_password.clone(),
@@ -102,83 +183,77 @@ async fn app_main() -> Result<(), Error> {
     )
     .context(MailSnafu)?;
 
-    let mut handles = JoinSet::new();
-    for result in results {
-        let mut conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
-        let mailer = mailer.clone();
-        let config = config.clone();
-
-        handles.spawn(async move {
-            send_post(
-                &mut conn,
-                mailer,
-                &config.mail_from,
-                &config.mail_to,
-                result,
-                config.dry_run,
-            )
-            .await
-        });
+    while let Some(post) = rx.recv().await {
+        send_post(
+            &mut conn,
+            mailer.clone(),
+            &config.mail_from,
+            &config.mail_to,
+            post,
+        )
+        .await?;
     }
 
-    while let Some(handle) = handles.join_next().await {
-        // TODO: retry sending mail instead? user should specify number of retries in config
-        if let Err(e) = handle.context(JoinSetSnafu)? {
-            log::error!("An error occured while sending an email: {e}");
-        }
-    }
-
     Ok(())
 }
 
-async fn fetch_feeds(urls: Vec<String>, pool: &sqlx::Pool<Sqlite>) -> Result<(), Error> {
-    let mut set = JoinSet::new();
-    for u in urls {
-        set.spawn(async move { feed::fetch_new(u).await });
-    }
+async fn app_main() -> Result<(), Error> {
+    let config = AppConfig::new().context(ConfigSnafu)?;
+    let urls: Vec<String> =
+        BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu {
+            path: &config.urls_path,
+        })?)
+        .lines()
+        .map_while(Result::ok)
+        .filter(|l| !l.starts_with('#'))
+        .collect();
 
-    while let Some(new) = set.join_next().await {
-        let posts = match new.context(JoinSetSnafu)? {
-            Ok(p) => p,
-            Err(e) => {
-                log::error!("Error while fetching feed: {e}");
-                continue;
-            }
-        };
+    let pool = init_db(&config.database_path).await?;
 
-        for i in posts {
-            let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
-            insert_item(conn, &i).await?;
+    let (fetch_tx, acc_rx) = mpsc::channel::<Post>(100);
+    let (acc_tx, send_rx) = mpsc::channel::<Post>(10);
+    let mut join_set = JoinSet::new();
+
+    if config.no_fetch {
+        log::info!("Not fetching any feeds as \"--no-fetch\" has been passed");
+        // close fetch transmitter to signal accumulator that no posts will be sent
+        drop(fetch_tx);
+    } else {
+        join_set.spawn(async move { fetcher(urls, fetch_tx).await });
+    }
+    if config.dry_run {
+        log::info!("Not sending any emails as \"--dry-run\" has been passed");
+    }
+    let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
+    join_set.spawn(async move { accumulator(conn, acc_rx, acc_tx).await });
+    if !config.dry_run {
+        let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
+        join_set.spawn(async move { sender(&config, conn, send_rx).await });
+    }
+
+    while let Some(task) = join_set.join_next().await {
+        if let Err(e) = task {
+            log::error!("Task failed: {e}");
         }
     }
 
     Ok(())
 }
 
-async fn send_post<'a, E>(
-    conn: E,
+async fn send_post<'a>(
+    conn: &mut PoolConnection<Sqlite>,
     mailer: AsyncSmtpTransport<Tokio1Executor>,
     from: &'a str,
     to: &'a str,
     post: models::Post,
-    dry_run: bool,
-) -> Result<(), Error>
-where
-    E: sqlx::Executor<'a, Database = Sqlite>,
-{
-    if dry_run {
-        info!("Not sending any emails as \"--dry-run\" has been passed");
-    } else {
-        Mail::new(post.title, post.content, post.url)
-            .send_email(from, to, &mailer)
-            .await
-            .context(MailSnafu)?;
-    }
-
-    sqlx::query!("update posts set sent = true where guid = ?", post.guid)
-        .execute(conn)
+) -> Result<(), Error> {
+    log::debug!("Sending post with guid '{}'", post.guid);
+    Mail::new(post.title, post.content, post.url)
+        .send_email(from, to, &mailer)
         .await
-        .context(GeneralDatabaseSnafu)?;
+        .context(MailSnafu)?;
 
+    set_sent(&post.guid, conn).await?;
+
     Ok(())
 }