Skip to content

Commit

Permalink
feat: do not reuse connection in github webhook handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jiegec committed Mar 7, 2024
1 parent e695ff5 commit b30f35c
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 32 deletions.
11 changes: 5 additions & 6 deletions server/src/bot.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::{borrow::Cow, sync::Arc};

use crate::{
formatter::{code_repr_string, to_html_new_job_summary},
github::{get_github_token, get_packages_from_pr, login_github},
job::{get_ready_message, send_build_request},
Args, ALL_ARCH, ARGS, WORKERS,
ALL_ARCH, ARGS, WORKERS,
};
use buildit_utils::github::{get_archs, update_abbs, OpenPRError, OpenPRRequest};
use chrono::Local;
use common::{ensure_job_queue, JobSource};
use lapin::{Channel, ConnectionProperties};
use serde_json::Value;
use std::{borrow::Cow, sync::Arc};
use teloxide::{
prelude::*,
types::{ChatAction, ParseMode},
Expand Down Expand Up @@ -115,7 +114,7 @@ fn handle_archs_args(archs: Vec<&str>) -> Vec<&str> {
archs
}

async fn status(args: &Args) -> anyhow::Result<String> {
async fn status() -> anyhow::Result<String> {
let mut res = String::from("__*Queue Status*__\n\n");
let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
Expand All @@ -127,7 +126,7 @@ async fn status(args: &Args) -> anyhow::Result<String> {

// read unacknowledged job count
let mut unacknowledged_str = String::new();
if let Some(api) = &args.rabbitmq_queue_api {
if let Some(api) = &ARGS.rabbitmq_queue_api {
let res = http_rabbitmq_api(api, queue_name).await?;
if let Some(unacknowledged) = res
.as_object()
Expand Down Expand Up @@ -356,7 +355,7 @@ pub async fn answer(
)
.await?;
}
Command::Status => match status(&ARGS).await {
Command::Status => match status().await {
Ok(status) => {
bot.send_message(msg.chat.id, status)
.parse_mode(ParseMode::MarkdownV2)
Expand Down
53 changes: 28 additions & 25 deletions server/src/github_webhooks.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use std::{path::Path, sync::Arc, time::Duration};

use crate::{
formatter::to_html_new_job_summary,
github::get_packages_from_pr,
job::{ack_delivery, send_build_request, update_retry, HandleSuccessResult},
ARGS,
};
use anyhow::{anyhow, bail};
use buildit_utils::github::{get_archs, update_abbs};
use common::JobSource;
use futures::StreamExt;
use lapin::{
options::{BasicConsumeOptions, QueueDeclareOptions},
types::FieldTable,
Channel,
Channel, ConnectionProperties,
};
use log::{error, info};
use octocrab::Octocrab;
use reqwest::StatusCode;
use serde::Deserialize;

use crate::{
formatter::to_html_new_job_summary,
github::get_packages_from_pr,
job::{ack_delivery, send_build_request, update_retry, HandleSuccessResult},
ARGS,
};
use std::{path::Path, time::Duration};

#[derive(Debug, Deserialize)]
struct WebhookComment {
Expand All @@ -38,16 +36,19 @@ struct User {
login: String,
}

pub async fn get_webhooks_message(channel: Arc<Channel>, path: &Path) {
pub async fn get_webhooks_message() {
info!("Starting github webhook worker");
loop {
if let Err(e) = get_webhooks_message_inner(channel.clone(), path).await {
if let Err(e) = get_webhooks_message_inner().await {
error!("Error getting webhooks message: {e}");
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
}

async fn get_webhooks_message_inner(channel: Arc<Channel>, path: &Path) -> anyhow::Result<()> {
async fn get_webhooks_message_inner() -> anyhow::Result<()> {
let conn = lapin::Connection::connect(&ARGS.amqp_addr, ConnectionProperties::default()).await?;
let channel = conn.create_channel().await?;
let _queue = channel
.queue_declare(
"github-webhooks",
Expand Down Expand Up @@ -80,20 +81,22 @@ async fn get_webhooks_message_inner(channel: Arc<Channel>, path: &Path) -> anyho
};

match serde_json::from_slice::<WebhookComment>(&delivery.data) {
Ok(comment) => match handle_webhook_comment(&comment, path, retry, &channel).await {
HandleSuccessResult::Ok | HandleSuccessResult::DoNotRetry => {
ack_delivery(delivery).await
}
HandleSuccessResult::Retry(r) => {
if r == 5 {
ack_delivery(delivery).await;
retry = None;
continue;
Ok(comment) => {
match handle_webhook_comment(&comment, &ARGS.abbs_path, retry, &channel).await {
HandleSuccessResult::Ok | HandleSuccessResult::DoNotRetry => {
ack_delivery(delivery).await
}
HandleSuccessResult::Retry(r) => {
if r == 5 {
ack_delivery(delivery).await;
retry = None;
continue;
}

retry = Some(r);
}

retry = Some(r);
}
},
}
Err(e) => {
error!("{e}");
ack_delivery(delivery).await
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn main() {
.build();

tokio::select! {
v = get_webhooks_message(channel, &ARGS.abbs_path) => v,
v = get_webhooks_message() => v,
v = telegram.dispatch() => v,
};
}

0 comments on commit b30f35c

Please sign in to comment.