commit e039dc9be309339563524e1606a744336453c5ce from: Hugo Osvaldo Barrera via: witcher date: Thu Nov 17 13:13:26 2022 UTC Implement async fetching This changeset implements fetching of RSS feeds asynchronously. It's still not perfect though: data from feeds is inserted into the database synchronously, and requests are temporarily paused while this happens (since we're doing blocking IO to the database). A potential solution to this is to have one thread that does the networking, and another which inserts into the DB, with something like an mpsc::channel to pass messages between them. This requires further refactor and can be done as a followup. It would also be great to re-use a single reqwest::Client, but that also requires further refactors. Despite imperfections, this patch still provides a noticeable speed bump; networking IO for one feed does not block others. References: https://todo.sr.ht/~witcher/rss-email/3 commit - 08360f954b312a5566c5a39e6d82256257b77644 commit + e039dc9be309339563524e1606a744336453c5ce blob - 5e5ded631b7135c166fb13031e2f6335bdc4e3c8 blob + 06368397a1ad7a4dfe9c33b7086b93885ff67823 --- Cargo.lock +++ Cargo.lock @@ -350,50 +350,41 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bfc52cbddcfd745bf1740338492bb0bd83d76c67b445f91c5fb29fae29ecaa1" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", ] [[package]] name = "futures-core" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2acedae88d38235936c3922476b10fced7b2b68136f5e3c03c2d5be348a1115" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] -name = "futures-io" -version = "0.3.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum =
"93a66fc6d035a26a3ae255a6d2bca35eda63ae4c5512bef54449113f7a1228e5" - -[[package]] name = "futures-sink" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca0bae1fe9752cf7fd9b0064c674ae63f97b37bc714d745cbde0afb7ec4e6765" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "842fc63b931f4056a24d59de13fb1272134ce261816e063e634ad0c15cdc5306" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-util" -version = "0.3.23" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0828a5471e340229c11c77ca80017937ce3c58cb788a17e5f1c2d5c485a9577" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-core", - "futures-io", "futures-task", - "memchr", "pin-project-lite", "pin-utils", - "slab", ] [[package]] @@ -741,9 +732,9 @@ dependencies = [ [[package]] name = "num_cpus" -version = "1.13.1" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" +checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" dependencies = [ "hermit-abi", "libc", @@ -962,6 +953,7 @@ dependencies = [ "reqwest", "rss", "serde", + "tokio", "toml", ] @@ -1150,9 +1142,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522c [[package]] name = "tokio" -version = "1.20.1" +version = "1.21.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581" +checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" dependencies = [ "autocfg", "bytes", @@ -1160,13 +1152,24 @@ dependencies = [ 
"memchr", "mio", "num_cpus", - "once_cell", "pin-project-lite", "socket2", + "tokio-macros", "winapi", ] [[package]] +name = "tokio-macros" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] name = "tokio-rustls" version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" blob - 93f7b32ac4c60607d4a91913294e9d22c1c78654 blob + dacdd829277c0d5384d2a0a86e198c7bd7533a05 --- Cargo.toml +++ Cargo.toml @@ -11,7 +11,7 @@ strip = "symbols" diesel = { version = "1.4", features = ["sqlite"] } rss = "2.0" anyhow = "1.0" -reqwest = {version = "0.11", default-features = false, features = ["blocking", "rustls-tls"]} +reqwest = {version = "0.11", default-features = false, features = ["rustls-tls"]} clap = { version = "3", features = ["derive"] } chrono = "0.4" toml = "0.5.8" @@ -21,3 +21,4 @@ diesel_migrations = "1.4.0" directories = "4.0.1" log = "0.4.17" env_logger = "0.9.0" +tokio = { version = "1.21.2", default-features = false, features = ["rt-multi-thread", "macros"] } blob - 156cc104f6b1d2ca9ecf492f4f9893c292758ee2 blob + b181138712b3a96686ba3774132fc7fa124cfbc2 --- src/main.rs +++ src/main.rs @@ -24,8 +24,10 @@ use std::{ fs::File, io::{BufRead, BufReader}, }; +use tokio::task::JoinSet; -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { diesel_migrations::embed_migrations!("migrations/"); env_logger::init(); @@ -49,10 +51,18 @@ fn main() -> anyhow::Result<()> { let conn = SqliteConnection::establish(args.database_path.unwrap().to_str().unwrap())?; embedded_migrations::run(&conn)?; + let mut set = JoinSet::new(); for u in urls { - debug!("Fetching feed from {u:?}"); - let new = rss::fetch_new(u)?; - for i in new.items() { + set.spawn(async move { rss::fetch_new(u).await }); + } + + while let Some(new) = set.join_next().await { 
+ let new = new??; + let items = new.items(); + + debug!("Found {} new items", items.len()); + + for i in items { let _ = db::insert_item(&conn, i)?; } } blob - 52903660f78d83ec81015281338c89ae1ba63af2 blob + af377d139f71bc562e75be4281aecfad59e9702f --- src/rss.rs +++ src/rss.rs @@ -1,8 +1,10 @@ use reqwest; use rss; -pub fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<rss::Channel> { - let content = reqwest::blocking::get(url.as_ref())?.bytes()?; +pub async fn fetch_new<S: AsRef<str>>(url: S) -> anyhow::Result<rss::Channel> { + debug!("Fetching feed for {}", url.as_ref()); + + let content = reqwest::get(url.as_ref()).await?.bytes().await?; let channel = rss::Channel::read_from(&content[..])?; Ok(channel)