diff --git a/chain/Cargo.toml b/chain/Cargo.toml index ccc3d1613..2608c33b0 100644 --- a/chain/Cargo.toml +++ b/chain/Cargo.toml @@ -33,6 +33,10 @@ diesel.workspace = true orm.workspace = true clap-verbosity-flag.workspace = true futures.workspace = true +futures-util.workspace = true +serde.workspace = true +deadpool-redis = "0.15.1" +redis = {version = "0.25.0", features = ["streams"]} [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/chain/run.sh b/chain/run.sh index 1068af98d..3833c46b9 100755 --- a/chain/run.sh +++ b/chain/run.sh @@ -1,4 +1,5 @@ . ../.env export TENDERMINT_URL export DATABASE_URL +export QUEUE_URL cargo run -- --initial-query-retry-time 5 diff --git a/chain/src/app_state.rs b/chain/src/app_state.rs index 8179fb34a..bc855e13e 100644 --- a/chain/src/app_state.rs +++ b/chain/src/app_state.rs @@ -2,14 +2,16 @@ use std::env; use anyhow::Context; use deadpool_diesel::postgres::{Object, Pool as DbPool}; +use deadpool_redis::{Config, Pool as RedisPool, Runtime}; #[derive(Clone)] pub struct AppState { db: DbPool, + redis: RedisPool, } impl AppState { - pub fn new(db_url: String) -> anyhow::Result { + pub fn new(db_url: String, redis_url: String) -> anyhow::Result { let max_pool_size = env::var("DATABASE_POOL_SIZE") .unwrap_or_else(|_| 8.to_string()) .parse::() @@ -23,7 +25,13 @@ impl AppState { .build() .context("Failed to build Postgres db pool")?; - Ok(Self { db: pool }) + let cfg = Config::from_url(redis_url); + let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + + Ok(Self { + db: pool, + redis: redis_pool, + }) } pub async fn get_db_connection(&self) -> anyhow::Result { @@ -32,4 +40,13 @@ impl AppState { .await .context("Failed to get db connection handle from deadpool") } + + pub async fn get_redis_connection( + &self, + ) -> anyhow::Result { + self.redis + .get() + .await + .context("Failed to get redis connection handle from deadpool") + } } diff --git a/chain/src/config.rs b/chain/src/config.rs index 745dd46f7..9c2729514 100644 --- a/chain/src/config.rs +++ b/chain/src/config.rs @@ -15,7 +15,7 @@ impl Display for CargoEnv { } } -#[derive(clap::Parser)] +#[derive(clap::Parser, Clone)] pub struct AppConfig { #[clap(long, env)] pub tendermint_url: String, @@ -23,6 +23,9 @@ pub struct AppConfig { #[clap(long, env)] pub database_url: String, + #[clap(long, env)] + pub queue_url: String, + #[clap(long, env)] pub initial_query_retry_time: u64, diff --git a/chain/src/main.rs b/chain/src/main.rs index cadadac73..f0f47f72f 100644 --- a/chain/src/main.rs +++ b/chain/src/main.rs @@ -7,6 +7,7 @@ use chain::app_state::AppState; use chain::config::AppConfig; use chain::repository; use chain::services::db::get_pos_crawler_state; +// TODO remove imports use chain::services::namada::{ query_all_balances, query_all_bonds_and_unbonds, query_all_proposals, query_bonds, query_last_block_height, @@ -19,16 +20,24 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; +use deadpool_redis::redis::AsyncCommands; use namada_sdk::time::DateTimeUtc; use orm::migrations::run_migrations; +use redis::aio::MultiplexedConnection; use shared::block::Block; use shared::block_result::BlockResult; use shared::checksums::Checksums; use shared::crawler::crawl; use shared::crawler_state::ChainCrawlerState; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::event_store::{ + publish, subscribe, ChainInitializedEventV1, 
ChainProcessed, PosEvents, +}; use shared::id::Id; use tendermint_rpc::HttpClient; +use tokio::signal; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; use tokio::time::sleep; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -36,10 +45,9 @@ use tracing_subscriber::FmtSubscriber; #[tokio::main] async fn main() -> Result<(), MainError> { let config = AppConfig::parse(); - let client = HttpClient::new(config.tendermint_url.as_str()).unwrap(); - let mut checksums = Checksums::default(); + for code_path in Checksums::code_paths() { let code = namada_service::query_tx_code_hash(&client, &code_path) .await @@ -57,6 +65,7 @@ async fn main() -> Result<(), MainError> { LevelFilter::Debug => Some(Level::DEBUG), LevelFilter::Trace => Some(Level::TRACE), }; + if let Some(log_level) = log_level { let subscriber = FmtSubscriber::builder().with_max_level(log_level).finish(); @@ -64,9 +73,10 @@ async fn main() -> Result<(), MainError> { } let client = Arc::new(client); - - let app_state = AppState::new(config.database_url).into_db_error()?; - let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); + let app_state = Arc::new( + AppState::new(config.database_url, config.queue_url).into_db_error()?, + ); + let conn = app_state.get_db_connection().await.into_db_error()?; // Run migrations run_migrations(&conn) @@ -74,34 +84,130 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - initial_query(&client, &conn, config.initial_query_retry_time).await?; + let (tx, rx) = oneshot::channel(); + let (events_tx, events_rx) = mpsc::channel::(100); - let crawler_state = db_service::get_chain_crawler_state(&conn) - .await - .into_db_error()?; + let mut redis_conn = + app_state.get_redis_connection().await.into_db_error()?; - crawl( - move |block_height| { - crawling_fn( - block_height, - client.clone(), - conn.clone(), - checksums.clone(), - ) - }, - crawler_state.last_processed_block, + let last_processed_id: String = redis_conn + .get("chain_last_processed_id") + .await + .ok() + .unwrap_or("0".to_string()); + + let subscriber = tokio::spawn(subscribe( + redis_conn, + ("chain_last_processed_id".to_string(), last_processed_id), + events_tx, + rx, + )); + + let handler = tokio::spawn(message_processor( + events_rx, + Arc::clone(&client), + Arc::clone(&app_state), + config.initial_query_retry_time, + checksums.clone(), + )); + + let redis_conn = app_state.get_redis_connection().await.into_db_error()?; + + publish( + redis_conn, + PosEvents::ChainInitializedEventV1(ChainInitializedEventV1), ) .await + .into_db_error()?; + + tokio::select! 
{ + _ = must_exit_handle() => { + tracing::info!("Received interrupt signal, shutting down..."); + tx.send(()).unwrap(); + } + _ = handler => { + tracing::info!("Handler finished..."); + } + _ = subscriber => { + tracing::info!("Subscriber finished..."); + } + } + + Ok(()) +} + +async fn message_processor( + mut rx: mpsc::Receiver, + client: Arc, + app_state: Arc, + initial_query_retry_time: u64, + checksums: Checksums, +) -> anyhow::Result<()> { + while let Some(event) = rx.recv().await { + let app_state = Arc::clone(&app_state); + + match event { + PosEvents::PosInitializedEventV1(_data) => { + let client = Arc::clone(&client); + let checksums = checksums.clone(); + let conn = app_state.get_db_connection().await?; + + initial_query( + Arc::clone(&client), + conn, + initial_query_retry_time, + ) + .await + .context("Initial query error")?; + + let conn = app_state.get_db_connection().await?; + let crawler_state = db_service::get_chain_crawler_state(&conn) + .await + .into_db_error()?; + + crawl( + move |block_height| { + crawling_fn( + block_height, + Arc::clone(&client), + checksums.clone(), + app_state.clone(), + ) + }, + crawler_state.last_processed_block, + ) + .await + .context("Crawling error")?; + } + PosEvents::Test(_) => { + tracing::info!("Received test event"); + } + _ => {} + } + } + tracing::info!("Message processor finished..."); + + Ok(()) +} + +fn must_exit_handle() -> JoinHandle<()> { + tokio::spawn(async move { + signal::ctrl_c() + .await + .expect("Error receiving interrupt signal"); + }) } async fn crawling_fn( block_height: u32, client: Arc, - conn: Arc, checksums: Checksums, + app_state: Arc, ) -> Result<(), MainError> { let should_process = can_process(block_height, client.clone()).await?; + let conn = app_state.get_db_connection().await.into_db_error()?; + if !should_process { let timestamp = Utc::now().naive_utc(); update_crawler_timestamp(&conn, timestamp).await?; @@ -157,9 +263,10 @@ async fn crawling_fn( .into_rpc_error()?; let addresses = block.addresses_with_balance_change(native_token); - let balances = namada_service::query_balance(&client, &addresses) - .await - .into_rpc_error()?; + let balances = + namada_service::query_balance(Arc::clone(&client), addresses.clone()) + .await + .into_rpc_error()?; tracing::info!("Updating balance for {} addresses...", addresses.len()); let next_governance_proposal_id = @@ -171,7 +278,7 @@ async fn crawling_fn( tracing::info!("Creating {} governance proposals...", proposals.len()); let proposals_with_tally = - namada_service::query_tallies(&client, proposals) + namada_service::query_tallies(Arc::clone(&client), proposals) .await .into_rpc_error()?; @@ -179,7 +286,9 @@ async fn crawling_fn( tracing::info!("Creating {} governance votes...", proposals_votes.len()); let addresses = block.bond_addresses(); - let bonds = query_bonds(&client, addresses).await.into_rpc_error()?; + let bonds = query_bonds(Arc::clone(&client), addresses) + .await + .into_rpc_error()?; tracing::info!("Updating bonds for {} addresses", bonds.len()); let bonds_updates = bonds @@ -195,7 +304,7 @@ async fn crawling_fn( .collect::>(); let addresses = block.unbond_addresses(); - let unbonds = namada_service::query_unbonds(&client, addresses) + let unbonds = namada_service::query_unbonds(client, addresses) .await .into_rpc_error()?; tracing::info!("Updating unbonds for {} addresses", unbonds.len()); @@ -277,31 +386,43 @@ async fn crawling_fn( }) }) .await - .context_db_interact_error() - .into_db_error()? 
- .context("Commit block db transaction error") - .into_db_error() + .unwrap() + .unwrap(); + + let redis_conn = app_state.get_redis_connection().await.into_db_error()?; + + publish( + redis_conn, + PosEvents::ChainProcessed(ChainProcessed { + block: block_height, + }), + ) + .await + .into_db_error()?; + + Ok(()) } async fn initial_query( - client: &HttpClient, - conn: &Object, + client: Arc, + conn: Object, initial_query_retry_time: u64, ) -> Result<(), MainError> { tracing::info!("Querying initial data..."); let block_height = - query_last_block_height(client).await.into_rpc_error()?; + query_last_block_height(&client).await.into_rpc_error()?; let mut epoch = - namada_service::get_epoch_at_block_height(client, block_height) + namada_service::get_epoch_at_block_height(&client, block_height) + .await + .into_rpc_error()?; + let first_block_in_epoch = + namada_service::get_first_block_in_epoch(&client) .await .into_rpc_error()?; - let first_block_in_epoch = namada_service::get_first_block_in_epoch(client) - .await - .into_rpc_error()?; loop { let pos_crawler_state = - get_pos_crawler_state(conn).await.into_db_error(); + get_pos_crawler_state(&conn).await.into_db_error(); match pos_crawler_state { // >= in case epochs are really short @@ -320,17 +441,17 @@ async fn initial_query( sleep(Duration::from_secs(initial_query_retry_time)).await; } - let balances = query_all_balances(client).await.into_rpc_error()?; + let balances = query_all_balances(&client).await.into_rpc_error()?; tracing::info!("Querying bonds and unbonds..."); - let (bonds, unbonds) = query_all_bonds_and_unbonds(client, None, None) + let (bonds, unbonds) = query_all_bonds_and_unbonds(&client, None, None) .await .into_rpc_error()?; tracing::info!("Querying proposals..."); - let proposals = query_all_proposals(client).await.into_rpc_error()?; + let proposals = query_all_proposals(&client).await.into_rpc_error()?; let proposals_with_tally = - namada_service::query_tallies(client, proposals.clone()) + namada_service::query_tallies(Arc::clone(&client), proposals.clone()) .await .into_rpc_error()?; diff --git a/chain/src/services/namada.rs b/chain/src/services/namada.rs index d4816dbca..24cf74afa 100644 --- a/chain/src/services/namada.rs +++ b/chain/src/services/namada.rs @@ -1,5 +1,6 @@ use std::collections::HashSet; use std::str::FromStr; +use std::sync::Arc; use anyhow::{anyhow, Context}; use futures::StreamExt; @@ -67,34 +68,40 @@ pub async fn get_epoch_at_block_height( } pub async fn query_balance( - client: &HttpClient, - balance_changes: &HashSet, + client: Arc, + balance_changes: HashSet, ) -> anyhow::Result { Ok(futures::stream::iter(balance_changes) - .filter_map(|balance_change| async move { - tracing::info!( - "Fetching balance change for {} ...", - balance_change.address - ); - - let owner = - NamadaSdkAddress::from_str(&balance_change.address.to_string()) - .context("Failed to parse owner address") - .ok()?; - let token = - NamadaSdkAddress::from_str(&balance_change.token.to_string()) - .context("Failed to parse token address") - .ok()?; - - let amount = rpc::get_token_balance(client, &token, &owner) - .await - .unwrap_or_default(); + .filter_map(|balance_change| { + let client = Arc::clone(&client); + async move { + tracing::info!( + "Fetching balance change for {} ...", + balance_change.address + ); + + let owner = NamadaSdkAddress::from_str( + &balance_change.address.to_string(), + ) + .context("Failed to parse owner address") + .ok()?; + let token = NamadaSdkAddress::from_str( + &balance_change.token.to_string(), 
+ ) + .context("Failed to parse token address") + .ok()?; + + let client: &HttpClient = &client; + let amount = rpc::get_token_balance(client, &token, &owner) + .await + .unwrap_or_default(); - Some(Balance { - owner: Id::from(owner), - token: Id::from(token), - amount: Amount::from(amount), - }) + Some(Balance { + owner: Id::from(owner), + token: Id::from(token), + amount: Amount::from(amount), + }) + } }) .map(futures::future::ready) .buffer_unordered(20) @@ -326,7 +333,7 @@ pub async fn query_next_governance_id( .context("Failed to deserialize proposal id") } -pub async fn query_bonds( +pub async fn query_bonds_old( client: &HttpClient, addresses: HashSet, ) -> anyhow::Result)>> { @@ -363,8 +370,51 @@ pub async fn query_bonds( anyhow::Ok(bonds) } +pub async fn query_bonds( + client: Arc, + addresses: HashSet, +) -> anyhow::Result)>> { + let nested_bonds = futures::stream::iter(addresses) + .filter_map(|BondAddresses { source, target }| { + let client = Arc::clone(&client); + async move { + let client: &HttpClient = &client; + // TODO: if this is too slow do not use query_all_bonds_and_unbonds + let (bonds_res, _) = query_all_bonds_and_unbonds( + client, + Some(source.clone()), + Some(target.clone()), + ) + .await + .context("Failed to query all bonds and unbonds") + .ok()?; + + let bonds = if !bonds_res.is_empty() { + bonds_res + .into_iter() + .map(|bond| { + (source.clone(), target.clone(), Some(bond)) + }) + .collect::>() + } else { + vec![(source, target, None)] + }; + + Some(bonds) + } + }) + .map(futures::future::ready) + .buffer_unordered(20) + .collect::>() + .await; + + let bonds = nested_bonds.iter().flatten().cloned().collect(); + + anyhow::Ok(bonds) +} + pub async fn query_unbonds( - client: &HttpClient, + client: Arc, addresses: HashSet, ) -> anyhow::Result { let nested_unbonds = futures::stream::iter(addresses) @@ -374,7 +424,9 @@ pub async fn query_unbonds( let validator = NamadaSdkAddress::from_str(&validator.to_string()) .expect("Failed to parse validator address"); + let client = Arc::clone(&client); async move { + let client: &HttpClient = &client; let unbonds = RPC .vp() .pos() @@ -475,16 +527,21 @@ pub async fn is_steward( } pub async fn query_tallies( - client: &HttpClient, + client: Arc, proposals: Vec, ) -> anyhow::Result> { let proposals = futures::stream::iter(proposals) - .filter_map(|proposal| async move { - let is_steward = is_steward(client, &proposal.author).await.ok()?; + .filter_map(|proposal| { + let client = Arc::clone(&client); + async move { + let proposal = proposal.clone(); + let is_steward = + is_steward(&client, &proposal.author).await.ok()?; - let tally_type = TallyType::from(&proposal.r#type, is_steward); + let tally_type = TallyType::from(&proposal.r#type, is_steward); - Some((proposal, tally_type)) + Some((proposal, tally_type)) + } }) .map(futures::future::ready) .buffer_unordered(20) @@ -495,12 +552,15 @@ pub async fn query_tallies( } pub async fn query_all_votes( - client: &HttpClient, + client: Arc, proposals_ids: Vec, ) -> anyhow::Result> { - let votes: Vec> = - futures::stream::iter(proposals_ids) - .filter_map(|proposal_id| async move { + let votes = futures::stream::iter(proposals_ids) + .filter_map(|proposal_id| { + let client = Arc::clone(&client); + async move { + let client: &HttpClient = &client; + let votes = rpc::query_proposal_votes(client, proposal_id) .await .ok()?; @@ -515,11 +575,12 @@ pub async fn query_all_votes( .collect::>(); Some(votes) - }) - .map(futures::future::ready) - .buffer_unordered(20) - .collect::>() - 
.await; + } + }) + .map(futures::future::ready) + .buffer_unordered(20) + .collect::>() + .await; anyhow::Ok(votes.iter().flatten().cloned().collect()) } diff --git a/docker-compose-db.yml b/docker-compose-db.yml index 27a49e979..f7e42c8ee 100644 --- a/docker-compose-db.yml +++ b/docker-compose-db.yml @@ -17,7 +17,7 @@ services: dragonfly: image: docker.dragonflydb.io/dragonflydb/dragonfly - command: --logtostderr --cache_mode=true --port 6379 -dbnum 1 --maxmemory=2gb + command: --logtostderr --cache_mode=true --port 6379 -dbnum 1 --maxmemory=4gb ulimits: memlock: -1 ports: diff --git a/orm/src/schema.rs b/orm/src/schema.rs index ba632225a..1b64c3386 100644 --- a/orm/src/schema.rs +++ b/orm/src/schema.rs @@ -1,67 +1,35 @@ // @generated automatically by Diesel CLI. pub mod sql_types { - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "crawler_name"))] pub struct CrawlerName; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_kind"))] pub struct GovernanceKind; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_result"))] pub struct GovernanceResult; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "governance_tally_type"))] pub struct GovernanceTallyType; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "transaction_kind"))] pub struct TransactionKind; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "transaction_result"))] pub struct TransactionResult; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "validator_state"))] pub struct ValidatorState; - #[derive( - diesel::query_builder::QueryId, - std::fmt::Debug, - diesel::sql_types::SqlType, - )] + #[derive(diesel::query_builder::QueryId, std::fmt::Debug, diesel::sql_types::SqlType)] #[diesel(postgres_type(name = "vote_kind"))] pub struct VoteKind; } diff --git a/pos/Cargo.toml b/pos/Cargo.toml index cc032cea3..d9b0717f3 100644 --- a/pos/Cargo.toml +++ b/pos/Cargo.toml @@ -29,6 +29,9 @@ diesel.workspace = true diesel_migrations.workspace = true orm.workspace = true clap-verbosity-flag.workspace = true +deadpool-redis = "0.15.1" +redis = {version = "0.25.0", features = ["streams"]} +serde_json.workspace = true [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/pos/run.sh b/pos/run.sh index df0434ec7..75e42d543 100755 --- a/pos/run.sh +++ b/pos/run.sh @@ -1,4 +1,5 @@ . 
../.env export TENDERMINT_URL export DATABASE_URL +export QUEUE_URL cargo run diff --git a/pos/src/app_state.rs b/pos/src/app_state.rs index 8179fb34a..bc855e13e 100644 --- a/pos/src/app_state.rs +++ b/pos/src/app_state.rs @@ -2,14 +2,16 @@ use std::env; use anyhow::Context; use deadpool_diesel::postgres::{Object, Pool as DbPool}; +use deadpool_redis::{Config, Pool as RedisPool, Runtime}; #[derive(Clone)] pub struct AppState { db: DbPool, + redis: RedisPool, } impl AppState { - pub fn new(db_url: String) -> anyhow::Result { + pub fn new(db_url: String, redis_url: String) -> anyhow::Result { let max_pool_size = env::var("DATABASE_POOL_SIZE") .unwrap_or_else(|_| 8.to_string()) .parse::() @@ -23,7 +25,13 @@ impl AppState { .build() .context("Failed to build Postgres db pool")?; - Ok(Self { db: pool }) + let cfg = Config::from_url(redis_url); + let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + + Ok(Self { + db: pool, + redis: redis_pool, + }) } pub async fn get_db_connection(&self) -> anyhow::Result { @@ -32,4 +40,13 @@ impl AppState { .await .context("Failed to get db connection handle from deadpool") } + + pub async fn get_redis_connection( + &self, + ) -> anyhow::Result { + self.redis + .get() + .await + .context("Failed to get redis connection handle from deadpool") + } } diff --git a/pos/src/config.rs b/pos/src/config.rs index 6e14e0018..21d4b9859 100644 --- a/pos/src/config.rs +++ b/pos/src/config.rs @@ -23,6 +23,9 @@ pub struct AppConfig { #[clap(long, env)] pub database_url: String, + #[clap(long, env)] + pub queue_url: String, + #[command(flatten)] pub verbosity: Verbosity, } diff --git a/pos/src/main.rs b/pos/src/main.rs index acae4ad4d..b6e577d64 100644 --- a/pos/src/main.rs +++ b/pos/src/main.rs @@ -5,6 +5,7 @@ use chrono::{NaiveDateTime, Utc}; use clap::Parser; use clap_verbosity_flag::LevelFilter; use deadpool_diesel::postgres::Object; +use deadpool_redis::redis::{self}; use namada_sdk::time::DateTimeUtc; use orm::crawler_state::EpochStateInsertDb; use orm::migrations::run_migrations; @@ -13,10 +14,17 @@ use pos::app_state::AppState; use pos::config::AppConfig; use pos::repository::{self}; use pos::services::namada as namada_service; +use redis::AsyncCommands; use shared::crawler; use shared::crawler_state::{CrawlerName, EpochCrawlerState}; use shared::error::{AsDbError, AsRpcError, ContextDbInteractError, MainError}; +use shared::event_store::{ + publish, subscribe, PosEvents, PosInitializedEventV1, +}; use tendermint_rpc::HttpClient; +use tokio::signal; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; use tracing::Level; use tracing_subscriber::FmtSubscriber; @@ -41,8 +49,11 @@ async fn main() -> Result<(), MainError> { let client = Arc::new(HttpClient::new(config.tendermint_url.as_str()).unwrap()); - let app_state = AppState::new(config.database_url).into_db_error()?; - let conn = Arc::new(app_state.get_db_connection().await.into_db_error()?); + let app_state = Arc::new( + AppState::new(config.database_url, config.queue_url).into_db_error()?, + ); + let conn = app_state.get_db_connection().await.into_db_error()?; + let (tx, rx) = oneshot::channel::<()>(); // Run migrations run_migrations(&conn) @@ -50,16 +61,101 @@ async fn main() -> Result<(), MainError> { .context_db_interact_error() .into_db_error()?; - // We always start from the current epoch - let next_epoch = namada_service::get_current_epoch(&client.clone()) + let (events_tx, events_rx) = mpsc::channel::(100); + + let mut redis_conn = + 
app_state.get_redis_connection().await.into_db_error()?; + + let last_processed_id: String = redis_conn + .get("pos_last_processed_id") .await - .into_rpc_error()?; + .ok() + .unwrap_or("0".to_string()); - crawler::crawl( - move |epoch| crawling_fn(epoch, conn.clone(), client.clone()), - next_epoch, - ) - .await + let subscriber = tokio::spawn(subscribe( + redis_conn, + ("pos_last_processed_id".to_string(), last_processed_id), + events_tx, + rx, + )); + + let handler = tokio::spawn(message_processor( + events_rx, + Arc::clone(&client), + Arc::clone(&app_state), + )); + + tokio::select! { + _ = must_exit_handle() => { + tracing::info!("Received interrupt signal, shutting down..."); + tx.send(()).unwrap(); + } + _ = handler => { + tracing::info!("Handler finished..."); + } + _ = subscriber => { + tracing::info!("Subscriber finished..."); + } + } + + Ok(()) +} + +fn must_exit_handle() -> JoinHandle<()> { + tokio::spawn(async move { + signal::ctrl_c() + .await + .expect("Error receiving interrupt signal"); + }) +} + +async fn message_processor( + mut rx: mpsc::Receiver, + client: Arc, + app_state: Arc, +) -> anyhow::Result<()> { + tracing::info!("Starting message processor..."); + while let Some(event) = rx.recv().await { + tracing::info!("Received message: {:?}", event); + match event { + PosEvents::ChainInitializedEventV1(_) => { + tracing::info!("Chain is ready to process..."); + let client = Arc::clone(&client); + let conn = Arc::new(app_state.get_db_connection().await?); + let redis_conn = app_state.get_redis_connection().await?; + + tracing::info!("Starting crawler..."); + // We always start from the current epoch + let next_epoch = + namada_service::get_current_epoch(&client.clone()) + .await + .into_rpc_error()?; + + { + // TODO: we should wait for first crawl iteration to finish + publish( + redis_conn, + PosEvents::PosInitializedEventV1(PosInitializedEventV1), + ) + .await?; + } + + tokio::spawn(crawler::crawl( + move |epoch| { + crawling_fn(epoch, conn.clone(), client.clone()) + }, + next_epoch, + )); + } + PosEvents::Test(_) => { + tracing::info!("!!!!!Test message received...!!!!!"); + } + _ => {} + } + } + tracing::info!("Message processor finished..."); + + Ok(()) } async fn crawling_fn( diff --git a/pos/src/redis_subscriber.rs b/pos/src/redis_subscriber.rs new file mode 100644 index 000000000..5c9b3b12d --- /dev/null +++ b/pos/src/redis_subscriber.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use clap::Parser; +use deadpool_redis::redis::{self, Commands}; +use tokio::{ + sync::{watch, Mutex}, + task::{self, JoinHandle}, +}; + +use crate::config::AppConfig; + +pub fn subscribe(channel: String) -> anyhow::Result> { + let (_tx, rx) = watch::channel(false); + + let handle = tokio::spawn(async move { + let config = AppConfig::parse(); + let conn = redis::Client::open(config.queue_url) + .expect("failed") + .get_connection() + .expect("failed"); + let conn_arc = Arc::new(Mutex::new(conn)); + + tracing::info!("Subscribed to channel: {}", channel); + + let _rx = rx.clone(); + loop { + let conn = conn_arc.clone(); + let channel = channel.clone(); + + tracing::info!("Waiting for message"); + let result = task::spawn_blocking(|| async move { + tracing::info!("Blocking call started"); + let mut conn = conn.lock().await; + + let mut pubsub = conn.as_pubsub(); + pubsub.subscribe(channel.clone()).unwrap(); + + pubsub + .set_read_timeout(Some(std::time::Duration::from_secs(5))) + .unwrap(); + + let msg = pubsub.get_message().unwrap(); + tracing::info!("Received message: {:?}", msg); + + 
pubsub.unsubscribe(channel).unwrap(); + + msg + }); + + match result.await { + Ok(res) => { + tracing::info!( + "Blocking call result: {:?}", + res.await.get_payload::() + ) + } + Err(e) => println!("Blocking call failed: {}", e), + } + + // Check if we should stop + if *rx.borrow() { + break; + } + } + }); + + Ok(handle) +} + +pub fn publish_message(channel: String, message: String) -> anyhow::Result<()> { + let config = AppConfig::parse(); + let mut conn = redis::Client::open(config.queue_url) + .expect("failed") + .get_connection() + .expect("failed"); + + conn.publish(channel, message.clone())?; + tracing::info!("Published message: {}", message); + + Ok(()) +} diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 96e457161..37456c8c6 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -15,17 +15,20 @@ path = "src/lib.rs" [dependencies] anyhow.workspace = true async-stream.workspace = true -bimap.workspace = true bigdecimal.workspace = true +bimap.workspace = true +deadpool-redis = "0.15.1" +fake.workspace = true futures-core.workspace = true futures-util.workspace = true futures.workspace = true namada_core.workspace = true namada_governance.workspace = true -namada_proof_of_stake.workspace = true namada_ibc.workspace = true +namada_proof_of_stake.workspace = true namada_sdk.workspace = true namada_tx.workspace = true +rand.workspace = true serde.workspace = true serde_json.workspace = true subtle-encoding.workspace = true @@ -35,5 +38,3 @@ thiserror.workspace = true tokio-retry.workspace = true tokio.workspace = true tracing.workspace = true -fake.workspace = true -rand.workspace = true \ No newline at end of file diff --git a/shared/src/event_store.rs b/shared/src/event_store.rs new file mode 100644 index 000000000..7f43554e7 --- /dev/null +++ b/shared/src/event_store.rs @@ -0,0 +1,139 @@ +use deadpool_redis::{ + redis::{ + streams::{StreamReadOptions, StreamReadReply}, + AsyncCommands, Value, + }, + Connection, +}; +use serde::{Deserialize, Serialize}; +use tokio::sync::{mpsc, oneshot}; + +const EVENT_STORE: &str = "event_store"; +const EVENT: &str = "event"; + +pub async fn subscribe( + mut redis_conn: Connection, + (last_processed_key, last_processed_val): (String, String), + tx: mpsc::Sender, + mut exit_rx: oneshot::Receiver<()>, +) -> anyhow::Result<()> +where + T: SupportedEvents, +{ + let opts = StreamReadOptions::default().count(1).block(0); + let mut last_processed_id = last_processed_val.clone(); + + loop { + let result: Option = redis_conn + .xread_options(&[EVENT_STORE], &[&last_processed_id], &opts) + .await?; + + if let Some(reply) = result { + for stream_key in reply.keys { + for stream_id in stream_key.clone().ids { + tracing::info!("Processing event: {:?}", stream_key); + let event = + stream_id.map.get(EVENT).expect("event key not found"); + + if let Value::Data(data) = event { + let event_str = std::str::from_utf8(data) + .expect("event is not valid utf8"); + + let event = T::from_stored(event_str); + + redis_conn + .set(&last_processed_key, stream_id.id.clone()) + .await?; + last_processed_id = stream_id.id; + + tx.send(event).await.expect("send failed"); + } + } + } + } + + if exit_rx.try_recv().is_ok() { + break; + } + } + + Ok(()) +} + +pub async fn publish( + mut redis_conn: Connection, + event: T, +) -> anyhow::Result<()> +where + T: SupportedEvents, +{ + let stored_event = event.to_stored(); + redis_conn.xadd(EVENT_STORE, "*", &[stored_event]).await?; + + Ok(()) +} + +// Define a macro to create event types with optional fields +macro_rules! 
define_event { + // Case with fields + ($name:ident, $($field_name:ident: $field_type:ty),*) => { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct $name { + $(pub $field_name: $field_type),* + } + }; + + + // Case without fields + ($name:ident) => { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct $name; + + impl Event for $name {} + }; +} + +pub trait Event: Clone + Serialize + Send + Sync { + fn name(&self) -> &'static str { + // Get the type name at runtime and strip the module path + let full_name = std::any::type_name::(); + // Find the last occurrence of "::" and return the substring after it + match full_name.rsplit("::").next() { + Some(name) => name, + None => full_name, + } + } + + // Default implementation of payload + fn payload(&self) -> Self { + self.clone() + } +} + +define_event!(PosInitializedEventV1); +define_event!(ChainInitializedEventV1); +define_event!(ChainProcessed, block: u32); +define_event!(Test); + +pub trait SupportedEvents: for<'a> Deserialize<'a> + Serialize { + fn from_stored(value: &str) -> Self { + serde_json::from_str(value).unwrap() + } + + fn to_stored(&self) -> (String, String) { + let value = serde_json::to_string(&self).unwrap(); + + ("event".to_string(), value) + } +} + +// TODO: move this to POS module +#[derive(Clone, Serialize, Deserialize, Debug)] +pub enum PosEvents { + PosInitializedEventV1(PosInitializedEventV1), + ChainInitializedEventV1(ChainInitializedEventV1), + ChainProcessed(ChainProcessed), + Test(Test), +} + +impl SupportedEvents for PosEvents {} diff --git a/shared/src/lib.rs b/shared/src/lib.rs index 512168955..a02eb66bb 100644 --- a/shared/src/lib.rs +++ b/shared/src/lib.rs @@ -6,6 +6,7 @@ pub mod checksums; pub mod crawler; pub mod crawler_state; pub mod error; +pub mod event_store; pub mod gas; pub mod genesis; pub mod header; diff --git a/webserver/Cargo.toml b/webserver/Cargo.toml index e0acd0473..5bc5a76f0 100644 --- a/webserver/Cargo.toml +++ b/webserver/Cargo.toml @@ -44,11 +44,13 @@ tokio-stream.workspace = true namada_core.workspace = true namada_sdk.workspace = true namada_parameters.workspace = true -deadpool-redis = "0.13.0" +deadpool-redis = "0.15.1" +redis = {version = "0.25.0", features = ["streams"]} bigdecimal.workspace = true shared.workspace = true strum.workspace = true strum_macros.workspace = true +async-stream = "0.3.5" [build-dependencies] vergen = { version = "8.0.0", features = ["build", "git", "gitcl"] } diff --git a/webserver/src/app.rs b/webserver/src/app.rs index 378f5b948..caaffbfbd 100644 --- a/webserver/src/app.rs +++ b/webserver/src/app.rs @@ -1,14 +1,19 @@ use std::net::{Ipv4Addr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use axum::error_handling::HandleErrorLayer; use axum::http::{HeaderValue, StatusCode}; use axum::response::IntoResponse; use axum::routing::get; +use axum::Extension; use axum::{BoxError, Json, Router}; use lazy_static::lazy_static; use namada_sdk::tendermint_rpc::HttpClient; use serde_json::json; +use shared::event_store::PosEvents; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; use tower::buffer::BufferLayer; use tower::limit::RateLimitLayer; use tower::ServiceBuilder; @@ -33,16 +38,25 @@ lazy_static! 
{ pub struct ApplicationServer; impl ApplicationServer { - pub async fn serve(config: AppConfig) -> anyhow::Result<()> { + pub async fn serve( + config: AppConfig, + events_rx: Receiver, + ) -> anyhow::Result<()> { let db_url = config.database_url.clone(); let cache_url = config.cache_url.clone(); let app_state = AppState::new(db_url, cache_url); let client = HttpClient::new(config.tendermint_url.as_str()).unwrap(); + // let events_stream = + // tokio_stream::wrappers::ReceiverStream::new(events_rx); let routes = { - let common_state = - CommonState::new(client, config.clone(), app_state.clone()); + let common_state = CommonState::new( + client, + config.clone(), + app_state.clone(), + Arc::new(Mutex::new(events_rx)), + ); Router::new() .route("/pos/validator", get(pos_handlers::get_validators)) diff --git a/webserver/src/handler/chain.rs b/webserver/src/handler/chain.rs index 8373d80fa..440af5d65 100644 --- a/webserver/src/handler/chain.rs +++ b/webserver/src/handler/chain.rs @@ -1,5 +1,6 @@ use std::convert::Infallible; -use std::time::Duration; +use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use axum::extract::State; use axum::http::HeaderMap; @@ -24,34 +25,50 @@ struct ChainStatusEvent { pub async fn chain_status( State(state): State, ) -> Sse>> { - let stream = tokio_stream::wrappers::IntervalStream::new( - tokio::time::interval(Duration::from_secs(3)), - ) - .then(move |_| { - let state = state.clone(); - - async move { - let height = state - .chain_service - .find_last_processed_block() - .await - .expect("Failed to get last processed block"); - - let epoch = state - .chain_service - .find_last_processed_epoch() - .await - .expect("Failed to get last processed epoch"); - + // let stream = state.events_rx.map(|event| { + // let event = + // serde_json::to_string(&event).expect("Failed to serialize event"); + + // Ok(Event::default().data(event)) + // }); + // let stream = tokio_stream::wrappers::IntervalStream::new( + // tokio::time::interval(Duration::from_secs(3)), + // ) + // .then(move |_| { + // let state = state.clone(); + + // async move { + // let height = state + // .chain_service + // .find_last_processed_block() + // .await + // .expect("Failed to get last processed block"); + + // let epoch = state + // .chain_service + // .find_last_processed_epoch() + // .await + // .expect("Failed to get last processed epoch"); + + // let event = + // serde_json::to_string(&ChainStatusEvent { height, epoch }) + // .expect("Failed to serialize event"); + + // Ok(Event::default().data(event)) + // } + // }); + // Convert the channels to a `Stream`. + let rx1 = async_stream::stream! 
{ + let mut events_rx = state.events_rx.lock().await; + + while let Some(event) = events_rx.recv().await { let event = - serde_json::to_string(&ChainStatusEvent { height, epoch }) - .expect("Failed to serialize event"); - - Ok(Event::default().data(event)) + serde_json::to_string(&event).expect("Failed to serialize event"); + yield Ok(Event::default().data(event)); } - }); + }; - Sse::new(stream).keep_alive(KeepAlive::default()) + Sse::new(rx1).keep_alive(KeepAlive::default()) } pub async fn get_parameters( diff --git a/webserver/src/main.rs b/webserver/src/main.rs index 19dad7ffb..11ad9597a 100644 --- a/webserver/src/main.rs +++ b/webserver/src/main.rs @@ -1,5 +1,9 @@ use anyhow::Context; use clap::Parser; +use deadpool_redis::redis::AsyncCommands; +use deadpool_redis::{Config, Runtime}; +use shared::event_store::{subscribe, PosEvents}; +use tokio::sync::{mpsc, oneshot}; use webserver::app::ApplicationServer; use webserver::config::AppConfig; @@ -11,9 +15,34 @@ async fn main() -> anyhow::Result<()> { .with_max_level(tracing::Level::INFO) .init(); - ApplicationServer::serve(config) + let (tx, rx) = oneshot::channel(); + let (events_tx, events_rx) = mpsc::channel::(100); + let cfg = Config::from_url("redis://redis@0.0.0.0:6379"); + let redis_pool = cfg.create_pool(Some(Runtime::Tokio1)).unwrap(); + let mut redis_conn = redis_pool.get().await.unwrap(); + + let last_processed_id: String = redis_conn + .get("webserver_last_processed_id") .await - .context("could not initialize application routes")?; + .ok() + .unwrap_or("0".to_string()); + + let var_name = + ("webserver_last_processed_id".to_string(), last_processed_id); + let subscriber = + tokio::spawn(subscribe(redis_conn, var_name, events_tx, rx)); + + let server = ApplicationServer::serve(config, events_rx); + + tokio::select! { + _ = server => { + tracing::info!("Received interrupt signal, shutting down..."); + tx.send(()).unwrap(); + } + _ = subscriber => { + tracing::info!("Subscriber finished..."); + } + } Ok(()) } diff --git a/webserver/src/state/common.rs b/webserver/src/state/common.rs index f50fafb06..779bfeda5 100644 --- a/webserver/src/state/common.rs +++ b/webserver/src/state/common.rs @@ -1,4 +1,9 @@ +use std::sync::Arc; + use namada_sdk::tendermint_rpc::HttpClient; +use shared::event_store::PosEvents; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; use crate::appstate::AppState; use crate::config::AppConfig; @@ -23,10 +28,16 @@ pub struct CommonState { pub crawler_state_service: CrawlerStateService, pub client: HttpClient, pub config: AppConfig, + pub events_rx: Arc>>, } impl CommonState { - pub fn new(client: HttpClient, config: AppConfig, data: AppState) -> Self { + pub fn new( + client: HttpClient, + config: AppConfig, + data: AppState, + events_rx: Arc>>, + ) -> Self { Self { pos_service: PosService::new(data.clone()), gov_service: GovernanceService::new(data.clone()), @@ -38,6 +49,7 @@ impl CommonState { crawler_state_service: CrawlerStateService::new(data.clone()), client, config, + events_rx, } } }
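A minimal sketch of how the new shared::event_store API is meant to be wired, distilled from the pos and chain main.rs changes above: one task tails the Redis "event_store" stream via subscribe and forwards decoded PosEvents over an mpsc channel, while the caller publishes events from a second pooled connection. The standalone binary, the demo_last_processed_id checkpoint key, and reading QUEUE_URL directly (rather than through AppConfig) are illustrative assumptions; the function signatures, the checkpoint-key convention, and the Test event all come from this patch.

use std::env;

use deadpool_redis::{redis::AsyncCommands, Config, Runtime};
use shared::event_store::{publish, subscribe, PosEvents, Test};
use tokio::sync::{mpsc, oneshot};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Build a Redis pool from QUEUE_URL, as AppState::new does in the crawlers.
    let queue_url = env::var("QUEUE_URL")?;
    let pool = Config::from_url(queue_url).create_pool(Some(Runtime::Tokio1))?;

    // Each consumer keeps its own checkpoint key so it can resume from the
    // last stream id it processed; "0" means start from the beginning.
    let mut sub_conn = pool.get().await?;
    let last_id: String = sub_conn
        .get("demo_last_processed_id")
        .await
        .ok()
        .unwrap_or_else(|| "0".to_string());

    let (exit_tx, exit_rx) = oneshot::channel::<()>();
    let (events_tx, mut events_rx) = mpsc::channel::<PosEvents>(100);

    // Tail the shared "event_store" stream and forward decoded events.
    let subscriber = tokio::spawn(subscribe(
        sub_conn,
        ("demo_last_processed_id".to_string(), last_id),
        events_tx,
        exit_rx,
    ));

    // Publish from a separate connection; every subscriber sees the event.
    let pub_conn = pool.get().await?;
    publish(pub_conn, PosEvents::Test(Test)).await?;

    // React to whatever arrives on the channel.
    if let Some(event) = events_rx.recv().await {
        println!("received event: {event:?}");
    }

    // subscribe() blocks on XREAD (block = 0), so the exit signal is only
    // observed after the next event; this demo just aborts the task, much as
    // the crawlers rely on tokio::select! to tear everything down.
    let _ = exit_tx.send(());
    subscriber.abort();
    Ok(())
}

In the patch itself this same wiring appears twice: pos/src/main.rs waits for ChainInitializedEventV1 before starting its crawler and then publishes PosInitializedEventV1, and chain/src/main.rs does the reverse, which is how the two crawlers sequence their initial queries through Redis instead of polling the database.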