Skip to content

Commit

Permalink
feat: impl reqpool
Browse files Browse the repository at this point in the history
  • Loading branch information
keroro520 committed Jan 13, 2025
1 parent ddba6b0 commit 6d27661
Show file tree
Hide file tree
Showing 9 changed files with 768 additions and 0 deletions.
33 changes: 33 additions & 0 deletions reqpool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
[package]
name = "raiko-reqpool"
version = "0.1.0"
authors = ["Taiko Labs"]
edition = "2021"

[dependencies]
raiko-lib = { workspace = true }
raiko-core = { workspace = true }
raiko-redis-derive = { workspace = true }
num_enum = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
thiserror = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { workspace = true }
hex = { workspace = true }
tracing = { workspace = true }
anyhow = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
redis = { workspace = true }
backoff = { workspace = true }
derive-getters = { workspace = true }
proc-macro2 = { workspace = true }
quote = { workspace = true }
syn = { workspace = true }
alloy-primitives = { workspace = true }

[dev-dependencies]
rand = "0.9.0-alpha.1" # This is an alpha version, that has rng.gen_iter::<T>()
rand_chacha = "0.9.0-alpha.1"
tempfile = "3.10.1"
10 changes: 10 additions & 0 deletions reqpool/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
/// The configuration for the redis-backend request pool
pub struct RedisPoolConfig {
/// The URL of the Redis database, e.g. "redis://localhost:6379"
pub redis_url: String,
/// The TTL of the Redis database
pub redis_ttl: u64,
}
17 changes: 17 additions & 0 deletions reqpool/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
mod config;
mod macros;
mod memory_pool;
mod redis_pool;
mod request;
mod traits;
mod utils;

// Re-export
pub use config::RedisPoolConfig;
pub use redis_pool::RedisPool;
pub use request::{
AggregationRequestEntity, AggregationRequestKey, RequestEntity, RequestKey,
SingleProofRequestEntity, SingleProofRequestKey, Status, StatusWithContext,
};
pub use traits::{Pool, PoolResult, PoolWithTrace};
pub use utils::proof_key_to_hack_request_key;
44 changes: 44 additions & 0 deletions reqpool/src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/// This macro implements the Display trait for a type by using serde_json's pretty printing.
/// If the type cannot be serialized to JSON, it falls back to using Debug formatting.
///
/// # Example
///
/// ```rust
/// use serde::{Serialize, Deserialize};
///
/// #[derive(Debug, Serialize, Deserialize)]
/// struct Person {
/// name: String,
/// age: u32
/// }
///
/// impl_display_using_json_pretty!(Person);
///
/// let person = Person {
/// name: "John".to_string(),
/// age: 30
/// };
///
/// // Will print:
/// // {
/// // "name": "John",
/// // "age": 30
/// // }
/// println!("{}", person);
/// ```
///
/// The type must implement serde's Serialize trait for JSON serialization to work.
/// If serialization fails, it will fall back to using the Debug implementation.
#[macro_export]
macro_rules! impl_display_using_json_pretty {
($type:ty) => {
impl std::fmt::Display for $type {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match serde_json::to_string_pretty(self) {
Ok(s) => write!(f, "{}", s),
Err(_) => write!(f, "{:?}", self),
}
}
}
};
}
115 changes: 115 additions & 0 deletions reqpool/src/memory_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// use std::collections::HashMap;

// use chrono::Utc;

// use crate::{
// request::{RequestEntity, RequestKey, Status, StatusWithContext},
// traits::{Pool, PoolWithTrace},
// };

// #[derive(Debug, Clone)]
// pub struct MemoryPool {
// /// The live requests in the pool
// pending: HashMap<RequestKey, (RequestEntity, StatusWithContext)>,
// /// The trace of requests
// trace: Vec<(RequestKey, RequestEntity, StatusWithContext)>,
// }

// impl Pool for MemoryPool {
// type Config = ();

// fn new(_config: Self::Config) -> Self {
// Self {
// lives: HashMap::new(),
// trace: Vec::new(),
// }
// }

// fn add(&mut self, request_key: RequestKey, request_entity: RequestEntity) {
// let status = StatusWithContext::new(Status::Registered, Utc::now());

// let old = self.lives.insert(
// request_key.clone(),
// (request_entity.clone(), status.clone()),
// );

// if let Some((_, old_status)) = old {
// tracing::error!(
// "MemoryPool.add: request key already exists, {request_key:?}, old status: {old_status:?}"
// );
// } else {
// tracing::info!("MemoryPool.add, {request_key:?}, status: {status:?}");
// }

// self.trace.push((request_key, request_entity, status));
// }

// fn remove(&mut self, request_key: &RequestKey) {
// match self.lives.remove(request_key) {
// Some((_, status)) => {
// tracing::info!("MemoryPool.remove, {request_key:?}, status: {status:?}");
// }
// None => {
// tracing::error!("MemoryPool.remove: request key not found, {request_key:?}");
// }
// }
// }

// fn get(&self, request_key: &RequestKey) -> Option<(RequestEntity, StatusWithContext)> {
// self.lives.get(request_key).cloned()
// }

// fn get_status(&self, request_key: &RequestKey) -> Option<StatusWithContext> {
// self.lives
// .get(request_key)
// .map(|(_, status)| status.clone())
// }

// fn update_status(&mut self, request_key: &RequestKey, status: StatusWithContext) {
// match self.lives.remove(request_key) {
// Some((entity, old_status)) => {
// tracing::info!(
// "MemoryPool.update_status, {request_key:?}, old status: {old_status:?}, new status: {status:?}"
// );
// self.lives
// .insert(request_key.clone(), (entity.clone(), status.clone()));
// self.trace.push((request_key.clone(), entity, status));
// }
// None => {
// tracing::error!(
// "MemoryPool.update_status: request key not found, discard it, {request_key:?}"
// );
// }
// }
// }
// }

// impl PoolWithTrace for MemoryPool {
// fn get_all_live(&self) -> Vec<(RequestKey, RequestEntity, StatusWithContext)> {
// self.lives
// .iter()
// .map(|(k, v)| (k.clone(), v.0.clone(), v.1.clone()))
// .collect()
// }

// fn get_all_trace(&self) -> Vec<(RequestKey, RequestEntity, StatusWithContext)> {
// self.trace.clone()
// }

// fn trace(
// &self,
// request_key: &RequestKey,
// ) -> (
// Option<(RequestEntity, StatusWithContext)>,
// Vec<(RequestKey, RequestEntity, StatusWithContext)>,
// ) {
// let live = self.lives.get(request_key).cloned();
// let traces = self
// .trace
// .iter()
// .filter(|(k, _, _)| k == request_key)
// .cloned()
// .collect();
// (live, traces)
// }
// }
Loading

0 comments on commit 6d27661

Please sign in to comment.