commit - 517f0f6252a4398a77ad82c027ecdcc7ba8e3f29
commit + a60f9bd921b2b6be0b45d81d723c112aea5d90ca
blob - /dev/null
blob + 36a0a25ca401b75ff4b19f80ca34f4ced938b92a (mode 644)
--- /dev/null
+++ docs/ARCHITECTURE.md
+# rss-email architecture
+
+## Overview
+
+FETCHER -> ACCUMULATOR -> SENDER
+
+### FETCHER
+
+The FETCHER just fetches RSS/Atom feeds. It then sends these feeds to the
+ACCUMULATOR.
+
+### ACCUMULATOR
+
+The ACCUMULATOR first reads all unsent feeds from the database and sends
+these to the SENDER.
+It also receives feeds from the FETCHER, checks the database for whether
+these have been sent yet, and, if they haven't, forwards these to the
+SENDER.
+
+In other words, the ACCUMULATOR accumulates both old feeds from the
+database and new feeds from the internet.
+
+### SENDER
+
+The SENDER receives feeds from the ACCUMULATOR and sends them.
blob - 780b448bbd3b5418e9d7e0db9205d275b8801a74
blob + bc83b8caf0b9d65e05175271b9eea967851e46e7
--- sqlx-data.json
+++ sqlx-data.json
},
"query": "select * from posts where sent != true order by pub_date desc"
},
+ "a0e1000c1a200a436e2f33b36adf4ca6b7a8edcc950c9b22a161ba2d47d8315b": {
+ "describe": {
+ "columns": [
+ {
+ "name": "guid",
+ "ordinal": 0,
+ "type_info": "Text"
+ },
+ {
+ "name": "title",
+ "ordinal": 1,
+ "type_info": "Text"
+ },
+ {
+ "name": "url",
+ "ordinal": 2,
+ "type_info": "Text"
+ },
+ {
+ "name": "pub_date",
+ "ordinal": 3,
+ "type_info": "Int64"
+ },
+ {
+ "name": "content",
+ "ordinal": 4,
+ "type_info": "Text"
+ },
+ {
+ "name": "sent",
+ "ordinal": 5,
+ "type_info": "Bool"
+ }
+ ],
+ "nullable": [
+ false,
+ true,
+ true,
+ true,
+ true,
+ false
+ ],
+ "parameters": {
+ "Right": 1
+ }
+ },
+ "query": "select *\n from posts\n where guid = ?"
+ },
"e8c193917469a0ee24db731dd5ccef549b01e97eb3d05deb46706bd702b587fc": {
"describe": {
"columns": [],
blob - 897732853817e93e172b5ba94464b93b1e005ce7
blob + 4b2a8c5f8c65a9cbe84a160921f2df1ce08c870b
--- src/config.rs
+++ src/config.rs
Some(smtp_port),
Some(smtp_starttls),
) => {
- trace!("All necessary config values have been given on the command line, no need to check for config file");
+ log::trace!("All necessary config values have been given on the command line, no need to check for config file");
s.mail_from = mail_from;
s.mail_to = mail_to;
s.smtp_user = smtp_user;
blob - 90ed941fb643996268fc37fdb2a957f20aa4bc92
blob + 93c85d4af115faad37aa44862f589a1d761a2ff1
--- src/db.rs
+++ src/db.rs
/// # Errors
///
/// An error occurs if the statement cannot be executed.
-pub async fn insert_item(mut conn: PoolConnection<Sqlite>, post: &Post) -> Result<(), Error> {
+pub async fn insert_item(conn: &mut PoolConnection<Sqlite>, post: &Post) -> Result<(), Error> {
sqlx::query!(
"insert or ignore into posts (guid, title, url, pub_date, content) values (?, ?, ?, ?, ?)",
post.guid,
post.pub_date,
post.content
)
- .execute(&mut conn)
+ .execute(conn)
.await
.context(GeneralDatabaseSnafu)?;
Ok(())
}
+
+pub async fn set_sent(guid: &str, conn: &mut PoolConnection<Sqlite>) -> Result<(), Error> {
+ sqlx::query!("update posts set sent = true where guid = ?", guid)
+ .execute(conn)
+ .await
+ .context(GeneralDatabaseSnafu)?;
+ Ok(())
+}
blob - 29d0bf7e8ec045d8ee2c556bc47934a9a7741e4f
blob + ad8ddc682acdf344a36023c033508bcd6081bdd6
--- src/feed.rs
+++ src/feed.rs
where
S: AsRef<str> + Send,
{
- debug!("Fetching feed for {}", url.as_ref());
+ log::debug!("Fetching feed for {}", url.as_ref());
let url = url.as_ref();
let content = reqwest::get(url)
.await
.filter_map(|i| match i.try_into() {
Ok(p) => Some(p),
Err(e) => {
- error!("Unable to convert received post, continuing ({e})");
+ log::error!("Unable to convert received post, continuing ({e})");
None
}
})
.filter_map(|e| match e.try_into() {
Ok(e) => Some(e),
Err(e) => {
- error!("Unable to convert received post, continuing ({e})");
+ log::error!("Unable to convert received post, continuing ({e})");
None
}
})
blob - 4e71447e5583ff7d6ea7b8bc25a3ad30fbd60f11
blob + d5e403073fc666b349c519b434c4bbcdcc56b4df
--- src/mail.rs
+++ src/mail.rs
to: &'a str,
mailer: &AsyncSmtpTransport<Tokio1Executor>,
) -> Result<(), Error> {
- trace!("Sending to {}: {}", to, &self.subject);
+ log::trace!("Sending to {}: {}", to, &self.subject);
let from = from.parse().map_err(|_| Error::Parse {
value: from.to_string(),
})?;
blob - 74e412950d4074b2b18a24b90b59db88a0a9d388
blob + 985a33d56b044b40925de14f2ad86dc8d675fffe
--- src/main.rs
+++ src/main.rs
)]
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
-#[macro_use]
-extern crate log;
-
pub mod cli;
pub mod config;
pub mod db;
use crate::{
config::AppConfig,
- db::{init_db, insert_item},
+ db::{init_db, insert_item, set_sent},
mail::{get_mailer, Mail},
};
use lettre::{AsyncSmtpTransport, Tokio1Executor};
+use models::Post;
use snafu::{prelude::*, Snafu};
+use sqlx::{pool::PoolConnection, Sqlite};
use std::{
fs::File,
io::{BufRead, BufReader},
- sync::Arc,
};
+use tokio::{
+ sync::mpsc::{self, Receiver, Sender},
+ task::JoinSet,
+};
-use sqlx::Sqlite;
-use tokio::task::JoinSet;
-
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Io Error at \"{path}\": {source}"))]
}
}
-async fn app_main() -> Result<(), Error> {
- let config = Arc::new(AppConfig::new().context(ConfigSnafu)?);
- let urls: Vec<String> =
- BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu {
- path: &config.urls_path,
- })?)
- .lines()
- .map_while(Result::ok)
- .filter(|l| !l.starts_with('#'))
- .collect();
+async fn fetcher(urls: Vec<String>, tx: Sender<Post>) -> Result<(), Error> {
+ let mut set = JoinSet::new();
+ for u in urls {
+ set.spawn(async move { feed::fetch_new(u).await });
+ }
- let pool = init_db(&config.database_path).await?;
+ let mut aborted = false;
+ 'outer: while let Some(new) = set.join_next().await {
+ let posts = match new.context(JoinSetSnafu)? {
+ Ok(p) => p,
+ Err(e) => {
+ log::error!("Error while fetching feed: {e}");
+ continue;
+ }
+ };
- if config.no_fetch {
- info!("Not fetching any feeds as \"--no-fetch\" has been passed");
- } else {
- fetch_feeds(urls, &pool).await?;
+ for post in posts {
+ if tx.send(post).await.is_err() {
+ // It doesn't make sense to fetch any more messages if they're not processed by the
+ // accumulator. Abort here.
+ log::error!("Receiver closed the connection. Aborting fetcher.");
+ aborted = true;
+ break 'outer;
+ }
+ }
}
- let results = sqlx::query_as!(
+ if aborted {
+ set.shutdown().await;
+ }
+
+ Ok(())
+}
+
+async fn accumulator(
+ mut conn: PoolConnection<Sqlite>,
+ mut rx: Receiver<Post>,
+ tx: Sender<Post>,
+) -> Result<(), Error> {
+ // FIXME: race condition: most of the time, this sends 2 mails if the entry is already in the
+ // database but not sent. the entry gets fetched (from the database) and sent, and during that
+ // time the same entry also gets fetched (from the url) and sent.
+ let unsent_posts = sqlx::query_as!(
models::Post,
"select * from posts where sent != true order by pub_date desc"
)
- .fetch_all(&pool)
+ .fetch_all(&mut conn)
.await
.context(GeneralDatabaseSnafu)?;
+ let mut sent_posts = Vec::new();
+ let mut stop_sending = false;
+ for post in unsent_posts {
+ let guid = post.guid.clone();
+ if tx.send(post).await.is_err() {
+ log::error!(
+ "Sender worker closed the receiving side of the channel. Aborting sending messages."
+ );
+ stop_sending = true;
+ break;
+ }
+ sent_posts.push(guid);
+ }
+
+ // XXX: combining "while let" and other conditions is unstable
+ while let Some(post) = rx.recv().await {
+ let db_post = sqlx::query_as!(
+ models::Post,
+ "select *
+ from posts
+ where guid = ?",
+ post.guid,
+ )
+ .fetch_optional(&mut conn)
+ .await
+ .context(GeneralDatabaseSnafu)?;
+
+ if db_post.is_none() {
+ log::debug!("Inserting post with guid '{}'", post.guid);
+ insert_item(&mut conn, &post).await?;
+ }
+
+    // Don't send this post if it is already in the database and has been sent before, or it
+ // was in the database and not sent during startup.
+ // This avoids possibly sending a post twice if it is unsent in the database during startup
+ // due to a race condition that can send the post at startup once and after fetching again
+ // since the sent status is only set after successfully sending the post over email in the
+ // Sender task.
+ let sent = db_post.is_some_and(|p| p.sent) || sent_posts.contains(&post.guid);
+
+ // Ignore the error and don't send any messages from this point on. The reason is
+ // that if the sender stops and closes the stream, the rest of the program can keep
+ // running just fine, fetching and inserting posts into the database.
+ // This is true for both possible cases of the sender crashing and the user not
+ // requesting to send messages in the first place.
+ if !stop_sending && !sent && tx.send(post).await.is_err() {
+ log::debug!("Sender worker closed the receiving side of the channel.");
+ stop_sending = true;
+ }
+ }
+
+ Ok(())
+}
+
+async fn sender(
+ config: &AppConfig,
+ mut conn: PoolConnection<Sqlite>,
+ mut rx: Receiver<Post>,
+) -> Result<(), Error> {
let mailer = get_mailer(
config.smtp_user.clone(),
config.smtp_password.clone(),
)
.context(MailSnafu)?;
- let mut handles = JoinSet::new();
- for result in results {
- let mut conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
- let mailer = mailer.clone();
- let config = config.clone();
-
- handles.spawn(async move {
- send_post(
- &mut conn,
- mailer,
- &config.mail_from,
- &config.mail_to,
- result,
- config.dry_run,
- )
- .await
- });
+ while let Some(post) = rx.recv().await {
+ send_post(
+ &mut conn,
+ mailer.clone(),
+ &config.mail_from,
+ &config.mail_to,
+ post,
+ )
+ .await?;
}
- while let Some(handle) = handles.join_next().await {
- // TODO: retry sending mail instead? user should specify number of retries in config
- if let Err(e) = handle.context(JoinSetSnafu)? {
- log::error!("An error occured while sending an email: {e}");
- }
- }
-
Ok(())
}
-async fn fetch_feeds(urls: Vec<String>, pool: &sqlx::Pool<Sqlite>) -> Result<(), Error> {
- let mut set = JoinSet::new();
- for u in urls {
- set.spawn(async move { feed::fetch_new(u).await });
- }
+async fn app_main() -> Result<(), Error> {
+ let config = AppConfig::new().context(ConfigSnafu)?;
+ let urls: Vec<String> =
+ BufReader::new(File::open(config.urls_path.as_str()).context(IoSnafu {
+ path: &config.urls_path,
+ })?)
+ .lines()
+ .map_while(Result::ok)
+ .filter(|l| !l.starts_with('#'))
+ .collect();
- while let Some(new) = set.join_next().await {
- let posts = match new.context(JoinSetSnafu)? {
- Ok(p) => p,
- Err(e) => {
- log::error!("Error while fetching feed: {e}");
- continue;
- }
- };
+ let pool = init_db(&config.database_path).await?;
- for i in posts {
- let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
- insert_item(conn, &i).await?;
+ let (fetch_tx, acc_rx) = mpsc::channel::<Post>(100);
+ let (acc_tx, send_rx) = mpsc::channel::<Post>(10);
+ let mut join_set = JoinSet::new();
+
+ if config.no_fetch {
+ log::info!("Not fetching any feeds as \"--no-fetch\" has been passed");
+ // close fetch transmitter to signal accumulator that no posts will be sent
+ drop(fetch_tx);
+ } else {
+ join_set.spawn(async move { fetcher(urls, fetch_tx).await });
+ }
+ if config.dry_run {
+ log::info!("Not sending any emails as \"--dry-run\" has been passed");
+ }
+ let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
+ join_set.spawn(async move { accumulator(conn, acc_rx, acc_tx).await });
+ if !config.dry_run {
+ let conn = pool.acquire().await.context(GeneralDatabaseSnafu)?;
+ join_set.spawn(async move { sender(&config, conn, send_rx).await });
+ }
+
+ while let Some(task) = join_set.join_next().await {
+ if let Err(e) = task {
+ log::error!("Task failed: {e}");
}
}
Ok(())
}
-async fn send_post<'a, E>(
- conn: E,
+async fn send_post<'a>(
+ conn: &mut PoolConnection<Sqlite>,
mailer: AsyncSmtpTransport<Tokio1Executor>,
from: &'a str,
to: &'a str,
post: models::Post,
- dry_run: bool,
-) -> Result<(), Error>
-where
- E: sqlx::Executor<'a, Database = Sqlite>,
-{
- if dry_run {
- info!("Not sending any emails as \"--dry-run\" has been passed");
- } else {
- Mail::new(post.title, post.content, post.url)
- .send_email(from, to, &mailer)
- .await
- .context(MailSnafu)?;
- }
-
- sqlx::query!("update posts set sent = true where guid = ?", post.guid)
- .execute(conn)
+) -> Result<(), Error> {
+ log::debug!("Sending post with guid '{}'", post.guid);
+ Mail::new(post.title, post.content, post.url)
+ .send_email(from, to, &mailer)
.await
- .context(GeneralDatabaseSnafu)?;
+ .context(MailSnafu)?;
+ set_sent(&post.guid, conn).await?;
+
Ok(())
}