commit 8dfc8f4385acc8f20be89b62b21ddace47f07a1d from: witcher date: Sat Dec 3 14:37:33 2022 UTC Add Atom support This patch adds Atom support alongside the existing RSS support. The feed is first parsed as RSS; if that fails, it is parsed as Atom. Implements: https://todo.sr.ht/~witcher/rss-email/15 commit - 72fbcb51f0ea090f720cfa8b159f8743bb75a079 commit + 8dfc8f4385acc8f20be89b62b21ddace47f07a1d blob - 3dcf69f4c611f5b3fab98d9344d38e27a52ca073 blob + ddcfc5149177be9bf422109cfe96d795ec3d0130 --- Cargo.lock +++ Cargo.lock @@ -1141,6 +1141,7 @@ name = "rss-email" version = "0.2.1" dependencies = [ "anyhow", + "atom_syndication", "chrono", "clap", "directories", blob - 08ff69107e32a1b03daeb02e837709dd37aafcbe blob + 299cf0f638fb2fefa23bf2310735930e3a319a98 --- Cargo.toml +++ Cargo.toml @@ -21,3 +21,4 @@ log = "0.4.17" env_logger = "0.9.0" tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] } sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "migrate", "sqlite", "offline"] } +atom_syndication = "0.11.0" blob - d217fc506c8db494b1eae9f74b48c4fc1969dd1e blob + b88fcb6c9489d9dddd0b323d934be7405fbe7aef --- src/db.rs +++ src/db.rs @@ -1,25 +1,11 @@ -use chrono::DateTime; -use rss::Item; use sqlx::pool::PoolConnection; use sqlx::Sqlite; +use crate::models::Post; + // inserts a new post or updates an old one with the same guid -pub async fn insert_item(mut conn: PoolConnection, item: &Item) -> anyhow::Result<()> { - let time = item.pub_date().map(|date| { - DateTime::parse_from_rfc2822(date) - .unwrap_or_else(|_| DateTime::default()) - .timestamp() - }); +pub async fn insert_item(mut conn: PoolConnection, post: &Post) -> anyhow::Result<()> { + sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", post.guid, post.title, post.author, post.url, post.feedurl, post.pub_date, post.content).execute(&mut conn).await?; - let guid = 
item.guid().ok_or_else(|| anyhow!("No guid found"))?.value(); - let title = item.title(); - let author = item.author(); - let url = item.link(); - let feedurl = item.source().map(|s| s.url()); - let pub_date = time; - let content = item.content().or_else(|| item.description()); - - sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", guid, title, author, url, feedurl, pub_date, content).execute(&mut conn).await?; - Ok(()) } blob - /dev/null blob + ef181a36c89292c45104d4bf4f07468aadda3466 (mode 644) --- /dev/null +++ src/feed.rs @@ -0,0 +1,46 @@ +use atom_syndication; +use rss; + +use crate::anyhow::Context; +use crate::models::Post; + +pub async fn fetch_new>(url: S) -> anyhow::Result> { + debug!("Fetching feed for {}", url.as_ref()); + let content = reqwest::get(url.as_ref()).await?.bytes().await?; + match fetch_new_rss(&content[..]).await { + Err(_) => fetch_new_atom(&content[..]).await, + p => p, + } +} + +pub async fn fetch_new_rss(bytes: &[u8]) -> anyhow::Result> { + let channel = rss::Channel::read_from(bytes).context("Unable to read from RSS feed")?; + + Ok(channel + .items + .into_iter() + .filter_map(|i| match i.try_into() { + Ok(p) => Some(p), + Err(e) => { + error!("Unable to convert received post, continuing ({e})"); + None + } + }) + .collect::>()) +} + +pub async fn fetch_new_atom(bytes: &[u8]) -> anyhow::Result> { + let feed = atom_syndication::Feed::read_from(bytes).context("Unable to read from atom feed")?; + + Ok(feed + .entries + .into_iter() + .filter_map(|e| match e.try_into() { + Ok(e) => Some(e), + Err(e) => { + error!("Unable to convert received post, continuing ({e})"); + None + } + }) + .collect::>()) +} blob - 68bf335cec8a60919dc6be79f56fe70973af6091 blob + c4a56aaabbfafc555d4dd6fa03b7cd320dd536d1 --- src/main.rs +++ src/main.rs @@ -6,9 +6,9 @@ extern crate anyhow; pub mod cli; pub mod config; pub mod db; +pub mod feed; pub mod mail; pub mod models; -pub mod rss; 
use crate::mail::{get_mailer, send_email}; use anyhow::Context; @@ -52,21 +52,17 @@ async fn main() -> anyhow::Result<()> { let mut set = JoinSet::new(); for u in urls { - set.spawn(async move { rss::fetch_new(u).await }); + set.spawn(async move { feed::fetch_new(u).await }); } while let Some(new) = set.join_next().await { - let new = new??; - let items = new.items(); + let posts = new??; - debug!("Found {} new items", items.len()); - - for i in items { + for i in posts.into_iter() { let conn = pool.acquire().await?; - db::insert_item(conn, i).await.context(format!( + db::insert_item(conn, &i).await.context(format!( "Unable to insert item from {:?} with GUID {:?}", - i.link(), - i.guid() + i.url, i.guid ))?; } } blob - c3046d7556498281e450586cff8e35cbdc2c9e4f blob + 04ca37d43fdb48afb55a0e25d1162ab1ebb2c02f --- src/models.rs +++ src/models.rs @@ -1,3 +1,5 @@ +use chrono::DateTime; + #[derive(Debug)] pub struct Post { pub guid: String, @@ -9,3 +11,76 @@ pub struct Post { pub content: Option, pub sent: bool, } + +impl TryFrom for Post { + type Error = anyhow::Error; + + fn try_from(item: rss::Item) -> anyhow::Result { + let time = item.pub_date().map(|date| { + DateTime::parse_from_rfc2822(date) + .unwrap_or_else(|_| DateTime::default()) + .timestamp() + }); + + let guid = item + .guid() + .ok_or_else(|| anyhow!("No guid found"))? 
+ .value() + .to_string(); + let title = item.title().map(String::from); + let author = item.author().map(String::from); + let url = item.link().map(String::from); + let feedurl = item.source().map(|s| String::from(s.url())); + let pub_date = time; + let content = item + .content() + .or_else(|| item.description()) + .map(String::from); + + Ok(Self { + guid, + title, + author, + url, + feedurl, + pub_date, + content, + sent: false, + }) + } +} + +impl TryFrom for Post { + type Error = anyhow::Error; + + fn try_from(mut value: atom_syndication::Entry) -> Result { + let guid = value.id.clone(); + let title = Some(value.title.value); + // TODO: could be multiple authors (or none) - needs a database migration + let author = if !value.authors.is_empty() { + Some(value.authors.remove(0).name) + } else { + None + }; + let url = Some(value.id); + // TODO: either remove feedurl from Post or find a way to supply it here + let feedurl = None; + let pub_date = value.published.map(|p| p.timestamp()); + let content = if let Some(c) = value.content { + c.value + } else { + None + }; + + Ok(Self { + guid, + title, + author, + url, + feedurl, + pub_date, + content, + sent: false, + }) + } +} blob - 3141cb80595ac1326c51b00c0d95e9814ada7654 (mode 644) blob + /dev/null --- src/rss.rs +++ /dev/null @@ -1,14 +0,0 @@ -use reqwest; -use rss; - -use crate::anyhow::Context; - -pub async fn fetch_new>(url: S) -> anyhow::Result { - debug!("Fetching feed for {}", url.as_ref()); - - let content = reqwest::get(url.as_ref()).await?.bytes().await?; - let channel = rss::Channel::read_from(&content[..]) - .context(format!("Unable to read from RSS feed {}", url.as_ref()))?; - - Ok(channel) -}