From bea12db8aa458bbe90949408933c896189f77503 Mon Sep 17 00:00:00 2001 From: Jiajie Chen Date: Thu, 7 Mar 2024 14:33:11 +0800 Subject: [PATCH] feat: switch to connection pool --- Cargo.lock | 46 ++++++++++++++++++++++++++++++- server/Cargo.toml | 1 + server/src/bot.rs | 52 +++++++++++++++++++++++++++-------- server/src/github_webhooks.rs | 10 +++---- server/src/heartbeat.rs | 10 +++---- server/src/job.rs | 18 ++++++------ server/src/main.rs | 29 +++++++++---------- 7 files changed, 118 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 626d61e..0d9ff33 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -841,6 +841,38 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "deadpool" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb84100978c1c7b37f09ed3ce3e5f843af02c2a2c431bae5b19230dad2c1b490" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-lapin" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce11c0dc86703e59a8921bb9afee10b13c242e47624347bd3a3b545c41db556e" +dependencies = [ + "deadpool", + "lapin", + "tokio-executor-trait", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63dfa964fe2a66f3fde91fc70b267fe193d822c7e603e2a675a49a7f46ad3f49" +dependencies = [ + "tokio", +] + [[package]] name = "debversion" version = "0.2.2" @@ -2027,7 +2059,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.6", "tokio", "tower-service", "tracing", @@ -3374,6 +3406,7 @@ dependencies = [ "clap", "common", "console", + "deadpool-lapin", "dickens", "dotenv", "env_logger", @@ -3819,6 +3852,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-executor-trait" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "802ccf58e108fe16561f35348fabe15ff38218968f033d587e399a84937533cc" +dependencies = [ + "async-trait", + "executor-trait", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.2.0" diff --git a/server/Cargo.toml b/server/Cargo.toml index 1eca0f6..19a34c8 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -28,3 +28,4 @@ buildit-utils = { path = "../buildit-utils" } jsonwebtoken = "9.2.0" size = "0.4.1" dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" } +deadpool-lapin = "0.11.0" diff --git a/server/src/bot.rs b/server/src/bot.rs index 8fcfd3b..f8492ed 100644 --- a/server/src/bot.rs +++ b/server/src/bot.rs @@ -7,9 +7,9 @@ use crate::{ use buildit_utils::github::{get_archs, update_abbs, OpenPRError, OpenPRRequest}; use chrono::Local; use common::{ensure_job_queue, JobSource}; -use lapin::{Channel, ConnectionProperties}; + use serde_json::Value; -use std::{borrow::Cow, sync::Arc}; +use std::borrow::Cow; use teloxide::{ prelude::*, types::{ChatAction, ParseMode}, @@ -61,7 +61,7 @@ async fn telegram_send_build_request( bot: &Bot, build_request: BuildRequest<'_>, msg: &Message, - channel: &Channel, + pool: deadpool_lapin::Pool, ) -> ResponseResult<()> { let BuildRequest { branch, @@ -73,6 +73,29 @@ async fn telegram_send_build_request( let archs = handle_archs_args(archs.to_vec()); + let conn = match pool.get().await { + Ok(conn) => conn, + Err(err) => { + bot.send_message( + msg.chat.id, + format!("Failed to connect to message queue: {}", err), + ) + .await?; + return Ok(()); + } + }; + let channel = match conn.create_channel().await { + Ok(conn) => conn, + Err(err) => { + bot.send_message( + msg.chat.id, + format!("Failed to connect to create channel: {}", err), + ) + .await?; + return Ok(()); + } + }; + match send_build_request( branch, packages, @@ -80,7 +103,7 @@ async fn telegram_send_build_request( github_pr, JobSource::Telegram(msg.chat.id.0), sha, - channel, + &channel, ) .await { @@ -114,9 +137,9 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> { archs } -async fn status() -> anyhow::Result { +async fn status(pool: deadpool_lapin::Pool) -> anyhow::Result { let mut res = String::from("__*Queue Status*__\n\n"); - let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?; + let conn = pool.get().await?; let channel = conn.create_channel().await?; for arch in ALL_ARCH { @@ -183,7 +206,7 @@ pub async fn answer( bot: Bot, msg: Message, cmd: Command, - channel: Arc, + pool: deadpool_lapin::Pool, ) -> ResponseResult<()> { bot.send_chat_action(msg.chat.id, ChatAction::Typing) .await?; @@ -293,8 +316,13 @@ pub async fn answer( sha, }; - telegram_send_build_request(&bot, build_request, &msg, &channel) - .await?; + telegram_send_build_request( + &bot, + build_request, + &msg, + pool.clone(), + ) + .await?; } else { bot.send_message(msg.chat.id, "Please list packages to build in pr info starting with '#buildit'.".to_string()) .await?; @@ -341,7 +369,7 @@ pub async fn answer( github_pr: None, sha: &sha, }; - telegram_send_build_request(&bot, build_request, &msg, &channel).await?; + telegram_send_build_request(&bot, build_request, &msg, pool.clone()).await?; } return Ok(()); } @@ -355,7 +383,7 @@ pub async fn answer( ) .await?; } - Command::Status => match status().await { + Command::Status => match status(pool).await { Ok(status) => { bot.send_message(msg.chat.id, status) .parse_mode(ParseMode::MarkdownV2) @@ -579,7 +607,7 @@ pub async fn answer( archs.extend(ALL_ARCH); } - match get_ready_message(&ARGS.amqp_addr, &archs).await { + match get_ready_message(pool, &archs).await { Ok(map) => { let mut res = String::new(); for (k, v) in map { diff --git a/server/src/github_webhooks.rs b/server/src/github_webhooks.rs index a8ef3de..9162b56 100644 --- a/server/src/github_webhooks.rs +++ b/server/src/github_webhooks.rs @@ -11,7 +11,7 @@ use futures::StreamExt; use lapin::{ options::{BasicConsumeOptions, QueueDeclareOptions}, types::FieldTable, - Channel, ConnectionProperties, + Channel, }; use log::{error, info}; use octocrab::Octocrab; @@ -36,18 +36,18 @@ struct User { login: String, } -pub async fn get_webhooks_message() { +pub async fn get_webhooks_message(pool: deadpool_lapin::Pool) { info!("Starting github webhook worker"); loop { - if let Err(e) = get_webhooks_message_inner().await { + if let Err(e) = get_webhooks_message_inner(pool.clone()).await { error!("Error getting webhooks message: {e}"); } tokio::time::sleep(Duration::from_secs(5)).await; } } -async fn get_webhooks_message_inner() -> anyhow::Result<()> { - let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?; +async fn get_webhooks_message_inner(pool: deadpool_lapin::Pool) -> anyhow::Result<()> { + let conn = pool.get().await?; let channel = conn.create_channel().await?; let _queue = channel .queue_declare( diff --git a/server/src/heartbeat.rs b/server/src/heartbeat.rs index 12a3bff..43f3c75 100644 --- a/server/src/heartbeat.rs +++ b/server/src/heartbeat.rs @@ -5,14 +5,12 @@ use futures::StreamExt; use lapin::{ options::{BasicAckOptions, BasicConsumeOptions}, types::FieldTable, - ConnectionProperties, }; use log::{error, info, warn}; use std::time::Duration; -pub async fn heartbeat_worker_inner(amqp_addr: String) -> anyhow::Result<()> { - let conn = lapin::Connection::connect(&amqp_addr, ConnectionProperties::default()).await?; - +pub async fn heartbeat_worker_inner(pool: deadpool_lapin::Pool) -> anyhow::Result<()> { + let conn = pool.get().await?; let channel = conn.create_channel().await?; let queue_name = "worker-heartbeat"; ensure_job_queue(queue_name, &channel).await?; @@ -78,10 +76,10 @@ pub async fn heartbeat_worker_inner(amqp_addr: String) -> anyhow::Result<()> { Ok(()) } -pub async fn heartbeat_worker(amqp_addr: String) -> anyhow::Result<()> { +pub async fn heartbeat_worker(pool: deadpool_lapin::Pool) -> anyhow::Result<()> { loop { info!("Starting heartbeat worker ..."); - if let Err(err) = heartbeat_worker_inner(amqp_addr.clone()).await { + if let Err(err) = heartbeat_worker_inner(pool.clone()).await { error!("Got error while starting heartbeat worker: {}", err); } tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/server/src/job.rs b/server/src/job.rs index 48ed0e6..6a98bf3 100644 --- a/server/src/job.rs +++ b/server/src/job.rs @@ -12,7 +12,7 @@ use lapin::{ message::Delivery, options::{BasicAckOptions, BasicConsumeOptions, BasicPublishOptions, QueueDeclareOptions}, types::FieldTable, - BasicProperties, Channel, ConnectionProperties, + BasicProperties, Channel, }; use log::{error, info, warn}; use octocrab::params::checks::CheckRunConclusion; @@ -26,9 +26,11 @@ use std::time::Duration; use teloxide::{prelude::*, types::ParseMode}; /// Observe job completion messages -pub async fn job_completion_worker_inner(bot: Bot, amqp_addr: &str) -> anyhow::Result<()> { - let conn = lapin::Connection::connect(amqp_addr, ConnectionProperties::default()).await?; - +pub async fn job_completion_worker_inner( + bot: Bot, + pool: deadpool_lapin::Pool, +) -> anyhow::Result<()> { + let conn = pool.get().await?; let channel = conn.create_channel().await?; let _queue = channel .queue_declare( @@ -361,11 +363,11 @@ async fn handle_success_message( } pub async fn get_ready_message( - amqp_addr: &str, + pool: deadpool_lapin::Pool, archs: &[&str], ) -> anyhow::Result> { let mut res = vec![]; - let conn = lapin::Connection::connect(amqp_addr, ConnectionProperties::default()).await?; + let conn = pool.get().await?; let channel = conn.create_channel().await?; for i in archs { @@ -425,10 +427,10 @@ pub fn update_retry(retry: Option) -> HandleSuccessResult { } } -pub async fn job_completion_worker(bot: Bot, amqp_addr: String) -> anyhow::Result<()> { +pub async fn job_completion_worker(bot: Bot, pool: deadpool_lapin::Pool) -> anyhow::Result<()> { loop { info!("Starting job completion worker ..."); - if let Err(err) = job_completion_worker_inner(bot.clone(), &amqp_addr).await { + if let Err(err) = job_completion_worker_inner(bot.clone(), pool.clone()).await { error!("Got error while starting job completion worker: {}", err); } tokio::time::sleep(Duration::from_secs(5)).await; diff --git a/server/src/main.rs b/server/src/main.rs index 215a088..533d05a 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,6 +1,3 @@ -use std::sync::Arc; - -use lapin::{Channel, ConnectionProperties}; use log::info; use server::bot::Command; use server::github_webhooks::get_webhooks_message; @@ -8,7 +5,7 @@ use server::{bot::answer, heartbeat::heartbeat_worker, job::job_completion_worke use teloxide::prelude::*; #[tokio::main] -async fn main() { +async fn main() -> anyhow::Result<()> { dotenv::dotenv().ok(); env_logger::init(); @@ -16,31 +13,31 @@ async fn main() { let bot = Bot::from_env(); - tokio::spawn(heartbeat_worker(ARGS.amqp_addr.clone())); - tokio::spawn(job_completion_worker(bot.clone(), ARGS.amqp_addr.clone())); - - let send_request_conn = - lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()) - .await - .unwrap(); + // setup lapin connection pool + let mut cfg = deadpool_lapin::Config::default(); + cfg.url = Some(ARGS.amqp_addr.clone()); + let pool = cfg.create_pool(Some(deadpool_lapin::Runtime::Tokio1))?; - let channel = Arc::new(send_request_conn.create_channel().await.unwrap()); + tokio::spawn(heartbeat_worker(pool.clone())); + tokio::spawn(job_completion_worker(bot.clone(), pool.clone())); let handler = Update::filter_message().branch(dptree::entry().filter_command::().endpoint( - |bot: Bot, channel: Arc, msg: Message, cmd: Command| async move { - answer(bot, msg, cmd, channel).await + |bot: Bot, pool: deadpool_lapin::Pool, msg: Message, cmd: Command| async move { + answer(bot, msg, cmd, pool).await }, )); let mut telegram = Dispatcher::builder(bot, handler) // Pass the shared state to the handler as a dependency. - .dependencies(dptree::deps![channel.clone()]) + .dependencies(dptree::deps![pool.clone()]) .enable_ctrlc_handler() .build(); tokio::select! { - v = get_webhooks_message() => v, + v = get_webhooks_message(pool.clone()) => v, v = telegram.dispatch() => v, }; + + Ok(()) }