commit - 72fbcb51f0ea090f720cfa8b159f8743bb75a079
commit + 8dfc8f4385acc8f20be89b62b21ddace47f07a1d
blob - 3dcf69f4c611f5b3fab98d9344d38e27a52ca073
blob + ddcfc5149177be9bf422109cfe96d795ec3d0130
--- Cargo.lock
+++ Cargo.lock
version = "0.2.1"
dependencies = [
"anyhow",
+ "atom_syndication",
"chrono",
"clap",
"directories",
blob - 08ff69107e32a1b03daeb02e837709dd37aafcbe
blob + 299cf0f638fb2fefa23bf2310735930e3a319a98
--- Cargo.toml
+++ Cargo.toml
env_logger = "0.9.0"
tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] }
sqlx = { version = "0.6.2", features = ["runtime-tokio-rustls", "migrate", "sqlite", "offline"] }
+atom_syndication = "0.11.0"
blob - d217fc506c8db494b1eae9f74b48c4fc1969dd1e
blob + b88fcb6c9489d9dddd0b323d934be7405fbe7aef
--- src/db.rs
+++ src/db.rs
-use chrono::DateTime;
-use rss::Item;
use sqlx::pool::PoolConnection;
use sqlx::Sqlite;
+use crate::models::Post;
+
// inserts a new post; "insert or ignore" skips (it does NOT update) a row that conflicts with an
// existing one - presumably on guid, TODO confirm against the posts table schema
-pub async fn insert_item(mut conn: PoolConnection<Sqlite>, item: &Item) -> anyhow::Result<()> {
- let time = item.pub_date().map(|date| {
- DateTime::parse_from_rfc2822(date)
- .unwrap_or_else(|_| DateTime::default())
- .timestamp()
- });
+pub async fn insert_item(mut conn: PoolConnection<Sqlite>, post: &Post) -> anyhow::Result<()> {
+ sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", post.guid, post.title, post.author, post.url, post.feedurl, post.pub_date, post.content).execute(&mut conn).await?;
- let guid = item.guid().ok_or_else(|| anyhow!("No guid found"))?.value();
- let title = item.title();
- let author = item.author();
- let url = item.link();
- let feedurl = item.source().map(|s| s.url());
- let pub_date = time;
- let content = item.content().or_else(|| item.description());
-
- sqlx::query!("insert or ignore into posts (guid, title, author, url, feedurl, pub_date, content) values (?, ?, ?, ?, ?, ?, ?)", guid, title, author, url, feedurl, pub_date, content).execute(&mut conn).await?;
-
Ok(())
}
blob - /dev/null
blob + ef181a36c89292c45104d4bf4f07468aadda3466 (mode 644)
--- /dev/null
+++ src/feed.rs
+use atom_syndication;
+use rss;
+
+use crate::anyhow::Context;
+use crate::models::Post;
+
+/// Downloads the feed at `url` and converts its entries into [`Post`]s.
+///
+/// The body is first parsed as RSS; if that fails it is parsed as Atom.
+///
+/// # Errors
+///
+/// Returns an error when the request fails, when the server answers with a
+/// 4xx/5xx status, or when the body is neither a valid RSS nor a valid Atom
+/// document. In the last case the error names the feed URL and carries both
+/// parser failures, so a broken feed can be identified from the log.
+pub async fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<Vec<Post>> {
+    let url = url.as_ref();
+    debug!("Fetching feed for {}", url);
+
+    // Reject HTTP error responses up front: otherwise an HTML error page is
+    // handed to the feed parsers and surfaces as a misleading parse error.
+    let content = reqwest::get(url)
+        .await?
+        .error_for_status()?
+        .bytes()
+        .await?;
+
+    match fetch_new_rss(&content[..]).await {
+        Ok(posts) => Ok(posts),
+        Err(rss_err) => {
+            debug!("{url} did not parse as RSS ({rss_err:#}), trying Atom");
+            // Keep the RSS failure in the message; the Atom failure stays
+            // attached as the source of the returned error.
+            fetch_new_atom(&content[..]).await.with_context(|| {
+                format!("Unable to read feed {url} as RSS ({rss_err:#}) or as Atom")
+            })
+        }
+    }
+}
+
+/// Parses `bytes` as an RSS channel and converts every item into a [`Post`].
+///
+/// Items that cannot be converted are logged and left out of the result; only
+/// a failure to parse the channel itself is returned as an error.
+pub async fn fetch_new_rss(bytes: &[u8]) -> anyhow::Result<Vec<Post>> {
+    let channel = rss::Channel::read_from(bytes).context("Unable to read from RSS feed")?;
+
+    let mut posts = Vec::new();
+    for item in channel.items {
+        match Post::try_from(item) {
+            Ok(post) => posts.push(post),
+            Err(e) => error!("Unable to convert received post, continuing ({e})"),
+        }
+    }
+
+    Ok(posts)
+}
+
+/// Parses `bytes` as an Atom feed and converts every entry into a [`Post`].
+///
+/// Entries that cannot be converted are logged and left out of the result;
+/// only a failure to parse the feed itself is returned as an error.
+pub async fn fetch_new_atom(bytes: &[u8]) -> anyhow::Result<Vec<Post>> {
+    let feed = atom_syndication::Feed::read_from(bytes).context("Unable to read from atom feed")?;
+
+    let mut posts = Vec::new();
+    for entry in feed.entries {
+        match Post::try_from(entry) {
+            Ok(post) => posts.push(post),
+            Err(e) => error!("Unable to convert received post, continuing ({e})"),
+        }
+    }
+
+    Ok(posts)
+}
blob - 68bf335cec8a60919dc6be79f56fe70973af6091
blob + c4a56aaabbfafc555d4dd6fa03b7cd320dd536d1
--- src/main.rs
+++ src/main.rs
pub mod cli;
pub mod config;
pub mod db;
+pub mod feed;
pub mod mail;
pub mod models;
-pub mod rss;
use crate::mail::{get_mailer, send_email};
use anyhow::Context;
let mut set = JoinSet::new();
for u in urls {
- set.spawn(async move { rss::fetch_new(u).await });
+ set.spawn(async move { feed::fetch_new(u).await });
}
while let Some(new) = set.join_next().await {
- let new = new??;
- let items = new.items();
+ let posts = new??;
- debug!("Found {} new items", items.len());
-
- for i in items {
+ for i in posts.into_iter() {
let conn = pool.acquire().await?;
- db::insert_item(conn, i).await.context(format!(
+ db::insert_item(conn, &i).await.context(format!(
"Unable to insert item from {:?} with GUID {:?}",
- i.link(),
- i.guid()
+ i.url, i.guid
))?;
}
}
blob - c3046d7556498281e450586cff8e35cbdc2c9e4f
blob + 04ca37d43fdb48afb55a0e25d1162ab1ebb2c02f
--- src/models.rs
+++ src/models.rs
+use chrono::DateTime;
+
#[derive(Debug)]
pub struct Post {
pub guid: String,
pub content: Option<String>,
pub sent: bool,
}
+
+impl TryFrom<rss::Item> for Post {
+    type Error = anyhow::Error;
+
+    /// Builds a [`Post`] from a parsed RSS item.
+    ///
+    /// The item's `content` is preferred for the post body, with
+    /// `description` as the fallback. New posts always start with
+    /// `sent: false`.
+    ///
+    /// # Errors
+    ///
+    /// Fails with "No guid found" when the item carries no `<guid>`.
+    fn try_from(item: rss::Item) -> anyhow::Result<Self> {
+        // A missing or malformed RFC 2822 date becomes `None`. Falling back
+        // to `DateTime::default()` would store the Unix epoch, which is
+        // indistinguishable from a post genuinely dated 1970-01-01.
+        let pub_date = item
+            .pub_date()
+            .and_then(|date| DateTime::parse_from_rfc2822(date).ok())
+            .map(|date| date.timestamp());
+
+        let guid = item
+            .guid()
+            .ok_or_else(|| anyhow!("No guid found"))?
+            .value()
+            .to_string();
+        let content = item
+            .content()
+            .or_else(|| item.description())
+            .map(String::from);
+
+        Ok(Self {
+            guid,
+            title: item.title().map(String::from),
+            author: item.author().map(String::from),
+            url: item.link().map(String::from),
+            feedurl: item.source().map(|s| String::from(s.url())),
+            pub_date,
+            content,
+            sent: false,
+        })
+    }
+}
+
+impl TryFrom<atom_syndication::Entry> for Post {
+    type Error = anyhow::Error;
+
+    /// Builds a [`Post`] from a parsed Atom entry.
+    ///
+    /// * `guid` is the entry id.
+    /// * `url` is the href of the entry's `alternate` link, else of its first
+    ///   link, else the entry id (the previous behaviour).
+    /// * `pub_date` is `published` when present, otherwise `updated`.
+    /// * `content` is the inline content, falling back to the summary.
+    ///
+    /// The conversion currently never fails; `TryFrom` is implemented so Atom
+    /// entries and RSS items share the same conversion interface.
+    fn try_from(value: atom_syndication::Entry) -> Result<Self, Self::Error> {
+        // The entry id is only an identifier (it may be e.g. a `urn:` or
+        // `tag:` IRI), so prefer a real link for the URL shown to the user.
+        let url = value
+            .links
+            .iter()
+            .find(|link| link.rel() == "alternate")
+            .or_else(|| value.links.first())
+            .map(|link| link.href().to_string())
+            .unwrap_or_else(|| value.id.clone());
+
+        // TODO: could be multiple authors (or none) - needs a database migration
+        let author = value.authors.into_iter().next().map(|a| a.name);
+
+        // `published` is optional in Atom while `updated` is always present.
+        let pub_date = value.published.unwrap_or(value.updated).timestamp();
+
+        // Entries may ship only a summary, or reference out-of-line content.
+        let summary = value.summary.map(|s| s.value);
+        let content = value.content.and_then(|c| c.value).or(summary);
+
+        Ok(Self {
+            guid: value.id,
+            title: Some(value.title.value),
+            author,
+            url: Some(url),
+            // TODO: either remove feedurl from Post or find a way to supply it here
+            feedurl: None,
+            pub_date: Some(pub_date),
+            content,
+            sent: false,
+        })
+    }
+}
blob - 3141cb80595ac1326c51b00c0d95e9814ada7654 (mode 644)
blob + /dev/null
--- src/rss.rs
+++ /dev/null
-use reqwest;
-use rss;
-
-use crate::anyhow::Context;
-
-pub async fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<rss::Channel> {
- debug!("Fetching feed for {}", url.as_ref());
-
- let content = reqwest::get(url.as_ref()).await?.bytes().await?;
- let channel = rss::Channel::read_from(&content[..])
- .context(format!("Unable to read from RSS feed {}", url.as_ref()))?;
-
- Ok(channel)
-}