diff --git a/Cargo.lock b/Cargo.lock index 6b6a6efdb8..a540f4c743 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,18 @@ dependencies = [ "syn", ] +[[package]] +name = "async-tls" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95fd83426b89b034bf4e9ceb9c533c2f2386b813fd3dcae0a425ec6f1837d78a" +dependencies = [ + "futures 0.3.5", + "rustls 0.17.0", + "webpki", + "webpki-roots", +] + [[package]] name = "async-trait" version = "0.1.36" @@ -118,6 +130,21 @@ dependencies = [ "syn", ] +[[package]] +name = "async-tungstenite" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2760f968801f873112e651c5a9c3b60c11ae6d66e7ca3f2d51aafbfcef9495fa" +dependencies = [ + "async-tls", + "futures-io", + "futures-util", + "log 0.4.8", + "pin-project", + "tokio", + "tungstenite 0.11.0", +] + [[package]] name = "atty" version = "0.2.14" @@ -141,6 +168,19 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" +[[package]] +name = "backoff" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "721c249ab59cbc483ad4294c9ee2671835c1e43e9ffc277e6b4ecfef733cfdc5" +dependencies = [ + "futures-core", + "instant", + "pin-project", + "rand 0.7.3", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.49" @@ -624,6 +664,20 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue 0.2.3", + "crossbeam-utils 0.7.2", +] + [[package]] name = "crossbeam-channel" version = "0.4.2" @@ -885,6 +939,44 @@ dependencies = [ "version_check 0.9.2", ] +[[package]] +name = "explorer" +version = "0.1.0" +dependencies = [ + "async-tungstenite", + "backoff", + "cardano-legacy-address", + "chain-addr", + "chain-core", + "chain-crypto", + "chain-impl-mockchain", + "chain-ser", + "chain-time", + "futures 0.3.5", + "futures-channel", + "futures-util", + "imhamt", + "jormungandr-lib", + "juniper", + "reqwest", + "serde", + "serde_json", + "serde_yaml", + "slog", + "slog-async", + "slog-gelf", + "slog-journald", + "slog-json", + "slog-term", + "sloggers", + "structopt", + "thiserror", + "tokio", + "tungstenite 0.11.0", + "url", + "warp", +] + [[package]] name = "fake-simd" version = "0.1.2" @@ -1757,6 +1849,24 @@ version = "0.2.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9457b06509d27052635f90d6466700c65095fdf75409b3fbdd903e988b886f49" +[[package]] +name = "libflate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9bac9023e1db29c084f9f8cd9d3852e5e8fddf98fb47c4964a0ea4663d95949" +dependencies = [ + "adler32", + "crc32fast", + "libflate_lz77", + "rle-decode-fast", +] + +[[package]] +name = "libflate_lz77" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3286f09f7d4926fc486334f28d8d2e6ebe4f7f9994494b6dab27ddfad2c9b11b" + [[package]] name = "libsqlite3-sys" version = "0.18.0" @@ -2912,6 +3022,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "rle-decode-fast" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" + [[package]] name = "rusqlite" version = "0.23.1" @@ -3252,6 +3368,39 @@ dependencies = [ "slog", ] +[[package]] +name = "slog-kvfilter" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae939ed7d169eed9699f4f5cd440f046f5dc5dfc27c19e3cd311619594c175e0" +dependencies = [ + "regex", + "slog", +] + +[[package]] +name = "slog-scope" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c44c89dd8b0ae4537d1ae318353eaf7840b4869c536e31c41e963d1ea523ee6" +dependencies = [ + "arc-swap", + "lazy_static", + "slog", +] + +[[package]] +name = "slog-stdlog" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d87903baf655da2d82bc3ac3f7ef43868c58bf712b3a661fda72009304c23" +dependencies = [ + "crossbeam", + "log 0.4.8", + "slog", + "slog-scope", +] + [[package]] name = "slog-syslog" version = "0.12.0" @@ -3276,6 +3425,25 @@ dependencies = [ "thread_local", ] +[[package]] +name = "sloggers" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f01d37507aa6f37490cfa08d71e2639b16906e84c285ae4b9f7ec7ca35756d69" +dependencies = [ + "chrono", + "libflate", + "regex", + "serde", + "slog", + "slog-async", + "slog-kvfilter", + "slog-scope", + "slog-stdlog", + "slog-term", + "trackable", +] + [[package]] name = "smallvec" version = "0.6.13" @@ -4018,6 +4186,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "trackable" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30fb6e13d129dd92c501458f64d56c708e3685e3fd307e878ec5f934c5c5bdb0" +dependencies = [ + "trackable_derive", +] + +[[package]] +name = "trackable_derive" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebeb235c5847e2f82cfe0f07eb971d1e5f6804b18dac2ae16349cc604380f82f" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "treeline" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 85d727e438..25173e4adb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "jormungandr-lib", "jormungandr", "jcli", + "explorer", "testing/jormungandr-testing-utils", "testing/jormungandr-integration-tests", "testing/jormungandr-scenario-tests", diff --git a/explorer/Cargo.toml b/explorer/Cargo.toml new file mode 100644 index 0000000000..270743d30c --- /dev/null +++ b/explorer/Cargo.toml @@ -0,0 +1,52 @@ +[package] +authors = ["dev@iohk.io"] +description = "explorer service for jormungandr" +documentation = "https://github.com/input-output-hk/jormungandr#USAGE.md" +edition = "2018" +homepage = "https://github.com/input-output-hk/jormungandr#README.md" +license = "MIT OR Apache-2.0" +name = "explorer" +repository = "https://github.com/input-output-hk/jormungandr" +version = "0.9.1" + +[dependencies] +async-tungstenite = {version = "0.7.1", features = ["tokio-runtime", "real-async-tls"]} +backoff = {version = "0.2.1", features = ["tokio"]} +futures = "0.3.5" +futures-channel = "0.3.5" +futures-util = "0.3.5" +juniper = "0.14.2" +serde = {version = "1.0.114", features = ["derive"]} +serde_json = "1.0.56" +serde_yaml = "0.8.13" +slog = {version = "^2.5.1", features = ["max_level_trace", "release_max_level_trace"]} +slog-async = "2.5.0" +slog-gelf = {version = "0.1.2", optional = true} +slog-journald = {version = "2.1.1", optional = true} +slog-json = "2.3.0" +slog-term = "2.6.0" +sloggers = 
"1.0.1" +structopt = "0.3.15" +thiserror = "1.0.20" +tokio = {version = "0.2.21", features = ["macros", "io-std", "sync", "stream", "signal"]} +url = "2.1.1" +warp = {version = "0.2.3", features = ["tls"]} + +cardano-legacy-address = {path = "../chain-deps/cardano-legacy-address"} +chain-addr = {path = "../chain-deps/chain-addr"} +chain-core = {path = "../chain-deps/chain-core"} +chain-crypto = {path = "../chain-deps/chain-crypto"} +chain-impl-mockchain = {path = "../chain-deps/chain-impl-mockchain"} +chain-ser = {path = "../chain-deps/chain-ser"} +chain-time = {path = "../chain-deps/chain-time"} +imhamt = {path = "../chain-deps/imhamt"} +jormungandr-lib = {path = "../jormungandr-lib"} + +[dependencies.tungstenite] +default-features = false +version = "0.11.0" + +[dependencies.reqwest] +default-features = false +features = ["rustls-tls"] +version = "0.10.6" diff --git a/explorer/src/api/graphql/certificates.rs b/explorer/src/api/graphql/certificates.rs new file mode 100644 index 0000000000..9cc4120c6e --- /dev/null +++ b/explorer/src/api/graphql/certificates.rs @@ -0,0 +1,333 @@ +use super::error::Error; +use super::scalars::{PayloadType, PoolId, PublicKey, TimeOffsetSeconds, VotePlanId}; +use super::{Address, BlockDate, Context, ExplorerAddress, Pool, Proposal, TaxType}; +use chain_impl_mockchain::certificate; +use juniper::graphql_union; +use juniper::FieldResult; +use std::convert::TryFrom; + +// interface for grouping certificates as a graphl union +pub enum Certificate { + StakeDelegation(StakeDelegation), + OwnerStakeDelegation(OwnerStakeDelegation), + PoolRegistration(Box), + PoolRetirement(PoolRetirement), + PoolUpdate(Box), + VotePlan(VotePlan), + VoteCast(VoteCast), + VoteTally(VoteTally), +} + +pub struct StakeDelegation(certificate::StakeDelegation); + +pub struct PoolRegistration(certificate::PoolRegistration); + +pub struct OwnerStakeDelegation(certificate::OwnerStakeDelegation); + +/// Retirement info for a pool +pub struct PoolRetirement(certificate::PoolRetirement); + +pub struct PoolUpdate(certificate::PoolUpdate); + +pub struct VotePlan(certificate::VotePlan); + +pub struct VoteCast(certificate::VoteCast); + +pub struct VoteTally(certificate::VoteTally); + +graphql_union!(Certificate: Context |&self| { + // the left hand side of the `instance_resolvers` match-like pub structure is the one + // that's used to match in the graphql query with the `__typename` field + instance_resolvers: |_| { + &StakeDelegation => match *self { Certificate::StakeDelegation(ref c) => Some(c), _ => None }, + &OwnerStakeDelegation => match *self { Certificate::OwnerStakeDelegation(ref c) => Some(c), _ => None }, + &PoolRegistration => match *self { Certificate::PoolRegistration(ref c) => Some(&**c), _ => None }, + &PoolUpdate => match *self { Certificate::PoolUpdate(ref c) => Some(&**c), _ => None}, + &PoolRetirement => match *self { Certificate::PoolRetirement(ref c) => Some(c), _ => None}, + &VotePlan => match *self { Certificate::VotePlan(ref c) => Some(c), _ => None}, + &VoteCast => match *self { Certificate::VoteCast(ref c) => Some(c), _ => None}, + &VoteTally => match *self { Certificate::VoteTally(ref c) => Some(c), _ => None}, + } +}); + +#[juniper::object( + Context = Context, +)] +impl StakeDelegation { + // FIXME: Maybe a new Account type would be better? + pub fn account(&self, context: &Context) -> FieldResult
{
+        let discrimination = context.db.blockchain_config.discrimination;
+        self.0
+            .account_id
+            .to_single_account()
+            .ok_or_else(||
+                // TODO: Multisig address?
+                Error::Unimplemented("account stake delegation".to_owned()))
+            .map(|single| {
+                chain_addr::Address(discrimination, chain_addr::Kind::Account(single.into()))
+            })
+            .map(|addr| Address::from(&ExplorerAddress::New(addr)))
+            .map_err(|e| e.into())
+    }
+
+    pub fn pools(&self, context: &Context) -> Vec<Pool> {
+        use chain_impl_mockchain::account::DelegationType;
+        use std::iter::FromIterator as _;
+
+        match self.0.get_delegation_type() {
+            DelegationType::NonDelegated => vec![],
+            DelegationType::Full(id) => vec![Pool::from_valid_id(id.clone())],
+            DelegationType::Ratio(delegation_ratio) => Vec::from_iter(
+                delegation_ratio
+                    .pools()
+                    .iter()
+                    .cloned()
+                    .map(|(p, _)| Pool::from_valid_id(p)),
+            ),
+        }
+    }
+}
+
+#[juniper::object(
+    Context = Context,
+)]
+impl PoolRegistration {
+    pub fn pool(&self, context: &Context) -> Pool {
+        Pool::from_valid_id(self.0.to_id())
+    }
+
+    /// Beginning of validity for this pool; this is used
+    /// to keep track of the period of the expected key and the expiry
+    pub fn start_validity(&self) -> TimeOffsetSeconds {
+        self.0.start_validity.into()
+    }
+
+    /// Management threshold for owners; this needs to be <= #owners and > 0
+    pub fn management_threshold(&self) -> i32 {
+        // XXX: u8 fits in i32, but maybe some kind of custom scalar is better?
+        self.0.management_threshold().into()
+    }
+
+    /// Owners of this pool
+    pub fn owners(&self) -> Vec<PublicKey> {
+        self.0.owners.iter().map(PublicKey::from).collect()
+    }
+
+    pub fn operators(&self) -> Vec<PublicKey> {
+        self.0.operators.iter().map(PublicKey::from).collect()
+    }
+
+    pub fn rewards(&self) -> TaxType {
+        TaxType(self.0.rewards)
+    }
+
+    /// Reward account
+    pub fn reward_account(&self, context: &Context) -> Option<Address>
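// Added note (not part of this changeset): the body below maps the optional
// reward account to an explorer Address. A Single account id becomes
// Kind::Account, while a Multi(sig) id has its first 32 bytes copied into
// Kind::Multisig; both use the chain's discrimination.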
{ + use chain_impl_mockchain::transaction::AccountIdentifier; + let discrimination = context.db.blockchain_config.discrimination; + + // FIXME: Move this transformation to a point earlier + + self.0 + .reward_account + .clone() + .map(|acc_id| match acc_id { + AccountIdentifier::Single(d) => ExplorerAddress::New(chain_addr::Address( + discrimination, + chain_addr::Kind::Account(d.into()), + )), + AccountIdentifier::Multi(d) => { + let mut bytes = [0u8; 32]; + bytes.copy_from_slice(&d.as_ref()[0..32]); + ExplorerAddress::New(chain_addr::Address( + discrimination, + chain_addr::Kind::Multisig(bytes), + )) + } + }) + .map(|explorer_address| Address { + id: explorer_address, + }) + } + + // Genesis Praos keys + // pub keys: GenesisPraosLeader, +} + +#[juniper::object( + Context = Context, +)] +impl OwnerStakeDelegation { + fn pools(&self) -> Vec { + use chain_impl_mockchain::account::DelegationType; + use std::iter::FromIterator as _; + + match self.0.get_delegation_type() { + DelegationType::NonDelegated => vec![], + DelegationType::Full(id) => vec![Pool::from_valid_id(id.clone())], + DelegationType::Ratio(delegation_ratio) => Vec::from_iter( + delegation_ratio + .pools() + .iter() + .cloned() + .map(|(p, _)| Pool::from_valid_id(p)), + ), + } + } +} + +#[juniper::object( + Context = Context, +)] +impl PoolRetirement { + pub fn pool_id(&self) -> PoolId { + PoolId(format!("{}", self.0.pool_id)) + } + + pub fn retirement_time(&self) -> TimeOffsetSeconds { + self.0.retirement_time.into() + } +} + +#[juniper::object( + Context = Context, +)] +impl PoolUpdate { + pub fn pool_id(&self) -> PoolId { + PoolId(format!("{}", self.0.pool_id)) + } + + pub fn start_validity(&self) -> TimeOffsetSeconds { + self.0.new_pool_reg.start_validity.into() + } + + // TODO: Previous keys? + // TODO: Updated keys? +} + +#[juniper::object( + Context = Context, +)] +impl VotePlan { + /// the vote start validity + pub fn vote_start(&self) -> BlockDate { + self.0.vote_start().into() + } + + /// the duration within which it is possible to vote for one of the proposals + /// of this voting plan. 
+ pub fn vote_end(&self) -> BlockDate { + self.0.vote_end().into() + } + + /// the committee duration is the time allocated to the committee to open + /// the ballots and publish the results on chain + pub fn committee_end(&self) -> BlockDate { + self.0.committee_end().into() + } + + pub fn payload_type(&self) -> PayloadType { + self.0.payload_type().into() + } + + /// the proposals to vote for + pub fn proposals(&self) -> Vec { + self.0.proposals().iter().cloned().map(Proposal).collect() + } +} + +#[juniper::object( + Context = Context, +)] +impl VoteCast { + pub fn vote_plan(&self) -> VotePlanId { + self.0.vote_plan().clone().into() + } + + pub fn proposal_index(&self) -> i32 { + self.0.proposal_index() as i32 + } +} + +#[juniper::object( + Context = Context, +)] +impl VoteTally { + pub fn vote_plan(&self) -> VotePlanId { + self.0.id().clone().into() + } +} + +/*------------------------------*/ +/*------- Conversions ---------*/ +/*----------------------------*/ + +impl TryFrom for Certificate { + type Error = super::error::Error; + fn try_from( + original: chain_impl_mockchain::certificate::Certificate, + ) -> Result { + match original { + certificate::Certificate::StakeDelegation(c) => { + Ok(Certificate::StakeDelegation(StakeDelegation(c))) + } + certificate::Certificate::OwnerStakeDelegation(c) => { + Ok(Certificate::OwnerStakeDelegation(OwnerStakeDelegation(c))) + } + certificate::Certificate::PoolRegistration(c) => { + Ok(Certificate::PoolRegistration(Box::new(PoolRegistration(c)))) + } + certificate::Certificate::PoolRetirement(c) => { + Ok(Certificate::PoolRetirement(PoolRetirement(c))) + } + certificate::Certificate::PoolUpdate(c) => { + Ok(Certificate::PoolUpdate(Box::new(PoolUpdate(c)))) + } + certificate::Certificate::VotePlan(c) => Ok(Certificate::VotePlan(VotePlan(c))), + certificate::Certificate::VoteCast(c) => Ok(Certificate::VoteCast(VoteCast(c))), + certificate::Certificate::VoteTally(c) => Ok(Certificate::VoteTally(VoteTally(c))), + } + } +} + +impl From for StakeDelegation { + fn from(delegation: certificate::StakeDelegation) -> StakeDelegation { + StakeDelegation(delegation) + } +} + +impl From for OwnerStakeDelegation { + fn from(owner_stake_delegation: certificate::OwnerStakeDelegation) -> OwnerStakeDelegation { + OwnerStakeDelegation(owner_stake_delegation) + } +} + +impl From for PoolRegistration { + fn from(registration: certificate::PoolRegistration) -> PoolRegistration { + PoolRegistration(registration) + } +} + +impl From for PoolRetirement { + fn from(pool_retirement: certificate::PoolRetirement) -> PoolRetirement { + PoolRetirement(pool_retirement) + } +} + +impl From for PoolUpdate { + fn from(pool_update: certificate::PoolUpdate) -> PoolUpdate { + PoolUpdate(pool_update) + } +} + +impl From for VotePlan { + fn from(vote_plan: certificate::VotePlan) -> VotePlan { + VotePlan(vote_plan) + } +} + +impl From for VoteCast { + fn from(vote_cast: certificate::VoteCast) -> VoteCast { + VoteCast(vote_cast) + } +} diff --git a/explorer/src/api/graphql/connections.rs b/explorer/src/api/graphql/connections.rs new file mode 100644 index 0000000000..d11cf40257 --- /dev/null +++ b/explorer/src/api/graphql/connections.rs @@ -0,0 +1,452 @@ +use super::error::Error; +use super::scalars::{BlockCount, IndexCursor, PoolCount, TransactionCount}; +use super::{Block, Context, Pool, Transaction}; +use crate::db::indexing::ExplorerTransaction; +use chain_impl_mockchain::header::HeaderId as HeaderHash; +use juniper::FieldResult; +use std::convert::TryFrom; + +#[juniper::object( + 
Context = Context +)] +impl PageInfo { + pub fn has_next_page(&self) -> bool { + self.has_next_page + } + + pub fn has_previous_page(&self) -> bool { + self.has_previous_page + } + + pub fn start_cursor(&self) -> &Option { + &self.start_cursor + } + + pub fn end_cursor(&self) -> &Option { + &self.end_cursor + } +} + +#[juniper::object( + Context = Context +)] +impl BlockEdge { + pub fn node(&self) -> &Block { + &self.node + } + + /// A cursor for use in pagination + pub fn cursor(&self) -> &IndexCursor { + &self.cursor + } +} + +#[juniper::object( + Context = Context +)] +impl TransactionEdge { + pub fn node(&self) -> &Transaction { + &self.node + } + + /// A cursor for use in pagination + pub fn cursor(&self) -> &IndexCursor { + &self.cursor + } +} + +#[juniper::object( + Context = Context +)] +impl PoolEdge { + pub fn node(&self) -> &Pool { + &self.node + } + + /// A cursor for use in pagination + pub fn cursor(&self) -> &IndexCursor { + &self.cursor + } +} + +#[juniper::object( + Context = Context, + name = "BlockConnection" +)] +impl BlockConnection { + pub fn page_info(&self) -> &PageInfo { + &self.page_info + } + + pub fn edges(&self) -> &Vec { + &self.edges + } + + /// A count of the total number of objects in this connection, ignoring pagination. + pub fn total_count(&self) -> &BlockCount { + &self.total_count + } +} + +#[juniper::object( + Context = Context, + name = "TransactionConnection" +)] +impl TransactionConnection { + pub fn page_info(&self) -> &PageInfo { + &self.page_info + } + + pub fn edges(&self) -> &Vec { + &self.edges + } + + /// A count of the total number of objects in this connection, ignoring pagination. + pub fn total_count(&self) -> &TransactionCount { + &self.total_count + } +} + +#[juniper::object( + Context = Context, + name = "PoolConnection" +)] +impl PoolConnection { + pub fn page_info(&self) -> &PageInfo { + &self.page_info + } + + pub fn edges(&self) -> &Vec { + &self.edges + } + + /// A count of the total number of objects in this connection, ignoring pagination. 
+ pub fn total_count(&self) -> &PoolCount { + &self.total_count + } +} + +pub struct PageInfo { + pub has_next_page: bool, + pub has_previous_page: bool, + pub start_cursor: Option, + pub end_cursor: Option, +} + +pub struct Connection { + page_info: PageInfo, + edges: Vec, + total_count: C, +} + +pub struct TransactionEdge { + node: Transaction, + cursor: IndexCursor, +} + +pub struct BlockEdge { + pub node: Block, + pub cursor: IndexCursor, +} + +pub struct PoolEdge { + node: Pool, + pub cursor: IndexCursor, +} + +pub trait Edge { + type Node; + fn new(node: Self::Node, cursor: IndexCursor) -> Self; + + fn cursor(&self) -> &IndexCursor; +} + +pub struct ValidatedPaginationArguments { + first: Option, + last: Option, + before: Option, + after: Option, +} + +pub struct PaginationArguments { + pub first: Option, + pub last: Option, + pub before: Option, + pub after: Option, +} + +impl Connection +where + E: Edge, + C: From, + E::Node: Clone, +{ + pub fn new( + bounds: PaginationInterval, + pagination_arguments: ValidatedPaginationArguments, + get_node_range: impl Fn(PaginationInterval) -> Vec<(E::Node, I)>, + ) -> FieldResult> + where + I: TryFrom, + u64: From, + I: Clone, + IndexCursor: From, + { + let pagination_arguments = pagination_arguments.cursors_into::(); + let bounds = bounds.bounds_into::(); + + let (page_interval, has_next_page, has_previous_page, total_count) = match bounds { + PaginationInterval::Empty => (PaginationInterval::Empty, false, false, 0.into()), + PaginationInterval::Inclusive(total_elements) => { + let InclusivePaginationInterval { + upper_bound, + lower_bound, + } = total_elements; + + let page = compute_range_boundaries(total_elements, pagination_arguments)?; + + let has_next_page = page.upper_bound < upper_bound; + let has_previous_page = page.lower_bound > lower_bound; + + let total_count = upper_bound + .checked_add(1) + .unwrap() + .checked_sub(lower_bound) + .expect("upper_bound should be >= than lower_bound") + .into(); + ( + PaginationInterval::Inclusive(page), + has_next_page, + has_previous_page, + total_count, + ) + } + }; + + let page_interval = page_interval + .bounds_try_into::() + .map_err(|_| "computed page interval is outside pagination boundaries") + .unwrap(); + + let edges: Vec<_> = get_node_range(page_interval) + .iter() + .map(|(hash, node_pagination_identifier)| { + E::new((*hash).clone(), node_pagination_identifier.clone().into()) + }) + .collect(); + + let start_cursor = edges.first().map(|e| e.cursor().clone()); + let end_cursor = edges + .last() + .map(|e| e.cursor().clone()) + .or_else(|| start_cursor.clone()); + + Ok(Connection { + edges, + page_info: PageInfo { + has_next_page, + has_previous_page, + start_cursor, + end_cursor, + }, + total_count, + }) + } +} + +pub type BlockConnection = Connection; +pub type TransactionConnection = Connection; +pub type PoolConnection = Connection; + +#[derive(Clone)] +#[allow(clippy::large_enum_variant)] +pub enum TransactionNodeFetchInfo { + Id(HeaderHash), + Contents(ExplorerTransaction), +} + +impl Edge for TransactionEdge { + type Node = TransactionNodeFetchInfo; + fn new(node: Self::Node, cursor: IndexCursor) -> TransactionEdge { + let tx = match node { + TransactionNodeFetchInfo::Id(hash) => Transaction::from_valid_id(hash), + TransactionNodeFetchInfo::Contents(tx) => Transaction::from_contents(tx), + }; + + TransactionEdge { node: tx, cursor } + } + + fn cursor(&self) -> &IndexCursor { + &self.cursor + } +} + +impl Edge for BlockEdge { + type Node = HeaderHash; + fn new(node: Self::Node, 
cursor: IndexCursor) -> Self {
+        BlockEdge {
+            node: Block::from_valid_hash(node),
+            cursor,
+        }
+    }
+
+    fn cursor(&self) -> &IndexCursor {
+        &self.cursor
+    }
+}
+
+impl Edge for PoolEdge {
+    type Node = Pool;
+    fn new(node: Self::Node, cursor: IndexCursor) -> Self {
+        PoolEdge { node, cursor }
+    }
+
+    fn cursor(&self) -> &IndexCursor {
+        &self.cursor
+    }
+}
+
+fn compute_range_boundaries(
+    total_elements: InclusivePaginationInterval<u64>,
+    pagination_arguments: ValidatedPaginationArguments<u64>,
+) -> FieldResult<InclusivePaginationInterval<u64>>
+where
+{
+    use std::cmp::{max, min};
+
+    let InclusivePaginationInterval {
+        upper_bound,
+        lower_bound,
+    } = total_elements;
+
+    // Compute the required range of blocks in two variables: [from, to]
+    // Both ends are inclusive
+    let mut from: u64 = match pagination_arguments.after {
+        Some(cursor) => max(cursor + 1, lower_bound),
+        // If `after` is not set, start from the beginning
+        None => lower_bound,
+    };
+
+    let mut to: u64 = match pagination_arguments.before {
+        Some(cursor) => min(cursor - 1, upper_bound),
+        // If `before` is not set, go all the way to the end
+        None => upper_bound,
+    };
+
+    // Move `to` enough values to make the result have `first` blocks
+    if let Some(first) = pagination_arguments.first {
+        to = min(
+            from.checked_add(u64::from(first))
+                .and_then(|n| n.checked_sub(1))
+                .unwrap_or(to),
+            to,
+        );
+    }
+
+    // Move `from` enough values to make the result have `last` blocks
+    if let Some(last) = pagination_arguments.last {
+        from = max(
+            to.checked_sub(u64::from(last))
+                .and_then(|n| n.checked_add(1))
+                .unwrap_or(from),
+            from,
+        );
+    }
+
+    Ok(InclusivePaginationInterval {
+        lower_bound: from,
+        upper_bound: to,
+    })
+}
+
+impl<I> PaginationArguments<I> {
+    pub fn validate(self) -> FieldResult<ValidatedPaginationArguments<I>> {
+        let first = self
+            .first
+            .map(|signed| -> FieldResult<u32> {
+                if signed < 0 {
+                    Err(Error::ArgumentError("first argument should be positive".to_owned()).into())
+                } else {
+                    Ok(u32::try_from(signed).unwrap())
+                }
+            })
+            .transpose()?;
+
+        let last = self
+            .last
+            .map(|signed| -> FieldResult<u32> {
+                if signed < 0 {
+                    Err(Error::ArgumentError("last argument should be positive".to_owned()).into())
+                } else {
+                    Ok(u32::try_from(signed).unwrap())
+                }
+            })
+            .transpose()?;
+
+        let before = self.before;
+        let after = self.after;
+
+        Ok(ValidatedPaginationArguments {
+            first,
+            after,
+            last,
+            before,
+        })
+    }
+}
+
+impl<I> ValidatedPaginationArguments<I> {
+    fn cursors_into<T>(self) -> ValidatedPaginationArguments<T>
+    where
+        T: From<I>,
+    {
+        ValidatedPaginationArguments {
+            after: self.after.map(T::from),
+            before: self.before.map(T::from),
+            first: self.first,
+            last: self.last,
+        }
+    }
+}
+
+pub enum PaginationInterval<I> {
+    Empty,
+    Inclusive(InclusivePaginationInterval<I>),
+}
+
+pub struct InclusivePaginationInterval<I> {
+    pub lower_bound: I,
+    pub upper_bound: I,
+}
+
+impl<I> PaginationInterval<I> {
+    fn bounds_into<T>(self) -> PaginationInterval<T>
+    where
+        T: From<I>,
+    {
+        match self {
+            Self::Empty => PaginationInterval::<T>::Empty,
+            Self::Inclusive(interval) => {
+                PaginationInterval::<T>::Inclusive(InclusivePaginationInterval::<T> {
+                    lower_bound: T::from(interval.lower_bound),
+                    upper_bound: T::from(interval.upper_bound),
+                })
+            }
+        }
+    }
+
+    fn bounds_try_into<T>(self) -> Result<PaginationInterval<T>, <T as TryFrom<I>>::Error>
+    where
+        T: TryFrom<I>,
+    {
+        match self {
+            Self::Empty => Ok(PaginationInterval::<T>::Empty),
+            Self::Inclusive(interval) => Ok(PaginationInterval::<T>::Inclusive(
+                InclusivePaginationInterval::<T> {
+                    lower_bound: T::try_from(interval.lower_bound)?,
+                    upper_bound: T::try_from(interval.upper_bound)?,
+                },
+            )),
+        }
+    }
+}
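// Illustrative sketch (not part of the original changeset): a quick check of
// the paging arithmetic in `compute_range_boundaries`. With bounds [0, 99],
// `first: 10` and `after: 49` select the inclusive page [50, 59], which
// leaves elements on both sides of the page.
#[cfg(test)]
mod pagination_arithmetic_sketch {
    use super::*;

    #[test]
    fn first_and_after_select_an_inclusive_window() {
        // 100 elements, identified by cursors 0..=99
        let bounds = InclusivePaginationInterval {
            lower_bound: 0u64,
            upper_bound: 99,
        };
        // already-validated arguments: hypothetical values for the example
        let args = ValidatedPaginationArguments {
            first: Some(10),
            last: None,
            before: None,
            after: Some(49u64),
        };
        let page = compute_range_boundaries(bounds, args).unwrap();
        assert_eq!((page.lower_bound, page.upper_bound), (50, 59));
    }
}
diff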
--git a/explorer/src/api/graphql/error.rs b/explorer/src/api/graphql/error.rs new file mode 100644 index 0000000000..df64d2a6c7 --- /dev/null +++ b/explorer/src/api/graphql/error.rs @@ -0,0 +1,17 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("Internal Error: `{0}`")] + InternalError(String), + #[error("Not Found: `{0}`")] + NotFound(String), + #[error("Unimplemented `{0}`")] + Unimplemented(String), + #[error("Argument Error `{0}`")] + ArgumentError(String), + #[error("Invalid Pagination cursor `{0}`")] + InvalidCursor(String), + #[error("Invalid address `{0}`")] + InvalidAddress(String), +} diff --git a/explorer/src/api/graphql/mod.rs b/explorer/src/api/graphql/mod.rs new file mode 100644 index 0000000000..ef66f641dc --- /dev/null +++ b/explorer/src/api/graphql/mod.rs @@ -0,0 +1,1042 @@ +mod certificates; +mod connections; +mod error; +mod scalars; + +use self::connections::{ + BlockConnection, InclusivePaginationInterval, PaginationArguments, PaginationInterval, + PoolConnection, TransactionConnection, TransactionNodeFetchInfo, +}; +use self::error::Error; +use crate::db::indexing::{ + BlockProducer, EpochData, ExplorerAddress, ExplorerBlock, ExplorerTransaction, StakePoolData, +}; +use crate::db::persistent_sequence::PersistentSequence; +use cardano_legacy_address::Addr as OldAddress; +use certificates::*; +use chain_impl_mockchain::certificate; + +use chain_impl_mockchain::fragment::FragmentId; +use chain_impl_mockchain::header; +use chain_impl_mockchain::header::HeaderId as HeaderHash; +use chain_impl_mockchain::key::BftLeaderId; +use futures::executor::block_on; +pub use juniper::http::GraphQLRequest; +use juniper::{graphql_union, EmptyMutation, FieldResult, RootNode}; +use std::convert::{TryFrom, TryInto}; +use std::str::FromStr; + +use self::scalars::{ + BlockCount, ChainLength, EpochNumber, ExternalProposalId, IndexCursor, NonZero, PoolId, + PublicKey, Slot, Value, VoteOptionRange, +}; + +use crate::db::DB; + +#[derive(Clone)] +pub struct GraphQLSettings { + /// This is the prefix that's used for the Address bech32 string representation in the + /// responses (in the queries any prefix can be used). 
base32 serialization could + /// also be used, but the `Address` struct doesn't have a deserialization method right + /// now + pub address_bech32_prefix: String, +} + +#[derive(Clone)] +pub struct Block { + hash: HeaderHash, +} + +impl Block { + fn from_string_hash(hash: String, db: &DB) -> FieldResult { + let hash = HeaderHash::from_str(&hash)?; + let block = Block { hash }; + + block.get_explorer_block(db).map(|_| block) + } + + fn from_valid_hash(hash: HeaderHash) -> Block { + Block { hash } + } + + fn get_explorer_block(&self, db: &DB) -> FieldResult { + block_on(db.get_block(&self.hash)).ok_or_else(|| { + Error::InternalError("Couldn't find block's contents in explorer".to_owned()).into() + }) + } +} + +/// A Block +#[juniper::object( + Context = Context +)] +impl Block { + /// The Block unique identifier + pub fn id(&self) -> String { + format!("{}", self.hash) + } + + /// Date the Block was included in the blockchain + pub fn date(&self, context: &Context) -> FieldResult { + self.get_explorer_block(&context.db) + .map(|b| b.date().into()) + } + + /// The transactions contained in the block + pub fn transactions( + &self, + first: Option, + last: Option, + before: Option, + after: Option, + context: &Context, + ) -> FieldResult { + let explorer_block = self.get_explorer_block(&context.db)?; + let mut transactions: Vec<&ExplorerTransaction> = + explorer_block.transactions.values().collect(); + + // TODO: This may be expensive at some point, but I can't rely in + // the HashMap's order (also, I'm assuming the order in the block matters) + transactions + .as_mut_slice() + .sort_unstable_by_key(|tx| tx.offset_in_block); + + let pagination_arguments = PaginationArguments { + first, + last, + before: before.map(u32::try_from).transpose()?, + after: after.map(u32::try_from).transpose()?, + } + .validate()?; + + let boundaries = if !transactions.is_empty() { + PaginationInterval::Inclusive(InclusivePaginationInterval { + lower_bound: 0u32, + upper_bound: transactions + .len() + .checked_sub(1) + .unwrap() + .try_into() + .expect("tried to paginate more than 2^32 elements"), + }) + } else { + PaginationInterval::Empty + }; + + TransactionConnection::new( + boundaries, + pagination_arguments, + |range: PaginationInterval| match range { + PaginationInterval::Empty => vec![], + PaginationInterval::Inclusive(range) => { + let from = usize::try_from(range.lower_bound).unwrap(); + let to = usize::try_from(range.upper_bound).unwrap(); + + (from..=to) + .map(|i| { + ( + TransactionNodeFetchInfo::Contents(transactions[i].clone()), + i.try_into().unwrap(), + ) + }) + .collect::>() + } + }, + ) + } + + pub fn previous_block(&self, context: &Context) -> FieldResult { + self.get_explorer_block(&context.db) + .map(|b| Block::from_valid_hash(b.parent_hash)) + } + + pub fn chain_length(&self, context: &Context) -> FieldResult { + self.get_explorer_block(&context.db) + .map(|block| block.chain_length().into()) + } + + pub fn leader(&self, context: &Context) -> FieldResult> { + self.get_explorer_block(&context.db) + .map(|block| match block.producer() { + BlockProducer::StakePool(pool) => { + Some(Leader::StakePool(Pool::from_valid_id(pool.clone()))) + } + BlockProducer::BftLeader(id) => { + Some(Leader::BftLeader(BftLeader { id: id.clone() })) + } + BlockProducer::None => None, + }) + } + + pub fn total_input(&self, context: &Context) -> FieldResult { + self.get_explorer_block(&context.db) + .map(|block| Value(format!("{}", block.total_input))) + } + + pub fn total_output(&self, context: &Context) -> 
FieldResult { + self.get_explorer_block(&context.db) + .map(|block| Value(format!("{}", block.total_output))) + } +} + +struct BftLeader { + id: BftLeaderId, +} + +#[juniper::object( + Context = Context, +)] +impl BftLeader { + fn id(&self) -> PublicKey { + self.id.as_public_key().into() + } +} + +enum Leader { + StakePool(Pool), + BftLeader(BftLeader), +} + +graphql_union!(Leader: Context |&self| { + instance_resolvers: |_| { + &Pool => match *self { Leader::StakePool(ref c) => Some(c), _ => None }, + &BftLeader => match *self { Leader::BftLeader(ref c) => Some(c), _ => None }, + } +}); + +impl From<&ExplorerBlock> for Block { + fn from(block: &ExplorerBlock) -> Block { + Block::from_valid_hash(block.id()) + } +} + +#[derive(Clone)] +struct BlockDate { + epoch: Epoch, + slot: Slot, +} + +/// Block's date, composed of an Epoch and a Slot +#[juniper::object( + Context = Context +)] +impl BlockDate { + pub fn epoch(&self) -> &Epoch { + &self.epoch + } + + pub fn slot(&self) -> &Slot { + &self.slot + } +} + +impl From for BlockDate { + fn from(date: header::BlockDate) -> BlockDate { + BlockDate { + epoch: Epoch { id: date.epoch }, + slot: Slot(format!("{}", date.slot_id)), + } + } +} + +#[derive(Clone)] +struct Transaction { + id: FragmentId, + block_hash: Option, + contents: Option, +} + +impl Transaction { + fn from_id(id: FragmentId, context: &Context) -> FieldResult { + let block_hash = block_on(context.db.find_block_hash_by_transaction(&id)) + .ok_or_else(|| Error::NotFound(format!("transaction not found: {}", &id,)))?; + + Ok(Transaction { + id, + block_hash: Some(block_hash), + contents: None, + }) + } + + fn from_valid_id(id: FragmentId) -> Transaction { + Transaction { + id, + block_hash: None, + contents: None, + } + } + + fn from_contents(contents: ExplorerTransaction) -> Transaction { + Transaction { + id: contents.id, + block_hash: None, + contents: Some(contents), + } + } + + fn get_block(&self, context: &Context) -> FieldResult { + let block_id = + match self.block_hash { + Some(block_id) => block_id, + None => block_on(context.db.find_block_hash_by_transaction(&self.id)).ok_or_else( + || Error::InternalError("Transaction's block was not found".to_owned()), + )?, + }; + + block_on(context.db.get_block(&block_id)).ok_or_else(|| { + Error::InternalError( + "transaction is in explorer but couldn't find its block".to_owned(), + ) + .into() + }) + } + + fn get_contents(&self, context: &Context) -> FieldResult { + if let Some(c) = &self.contents { + Ok(c.clone()) + } else { + let block = self.get_block(context)?; + Ok(block + .transactions + .get(&self.id) + .ok_or_else(|| { + Error::InternalError("transaction was not found in respective block".to_owned()) + })? 
+ .clone()) + } + } +} + +/// A transaction in the blockchain +#[juniper::object( + Context = Context +)] +impl Transaction { + /// The hash that identifies the transaction + pub fn id(&self) -> String { + format!("{}", self.id) + } + + /// The block this transaction is in + pub fn block(&self, context: &Context) -> FieldResult { + let block = self.get_block(context)?; + Ok(Block::from(&block)) + } + + pub fn inputs(&self, context: &Context) -> FieldResult> { + let transaction = self.get_contents(context)?; + Ok(transaction + .inputs() + .iter() + .map(|input| TransactionInput { + address: Address::from(&input.address), + amount: Value::from(&input.value), + }) + .collect()) + } + + pub fn outputs(&self, context: &Context) -> FieldResult> { + let transaction = self.get_contents(context)?; + Ok(transaction + .outputs() + .iter() + .map(|input| TransactionOutput { + address: Address::from(&input.address), + amount: Value::from(&input.value), + }) + .collect()) + } + + pub fn certificate(&self, context: &Context) -> FieldResult> { + let transaction = self.get_contents(context)?; + match transaction.certificate { + Some(c) => Ok(Certificate::try_from(c).map(Some)?), + None => Ok(None), + } + } +} + +struct TransactionInput { + amount: Value, + address: Address, +} + +#[juniper::object( + Context = Context +)] +impl TransactionInput { + fn amount(&self) -> &Value { + &self.amount + } + + fn address(&self) -> &Address { + &self.address + } +} + +struct TransactionOutput { + amount: Value, + address: Address, +} + +#[juniper::object( + Context = Context +)] +impl TransactionOutput { + fn amount(&self) -> &Value { + &self.amount + } + + fn address(&self) -> &Address { + &self.address + } +} + +#[derive(Clone)] +struct Address { + id: ExplorerAddress, +} + +impl Address { + fn from_bech32(bech32: &str) -> FieldResult
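// Added note (not part of this changeset): the parser below tries the bech32
// form first and falls back to legacy (Byron) base58 addresses, so both kinds
// of string are accepted here; anything else maps to Error::InvalidAddress.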
{
+        let addr = chain_addr::AddressReadable::from_string_anyprefix(bech32)
+            .map(|adr| ExplorerAddress::New(adr.to_address()))
+            .or_else(|_| OldAddress::from_str(bech32).map(ExplorerAddress::Old))
+            .map_err(|_| Error::InvalidAddress(bech32.to_string()))?;
+
+        Ok(Address { id: addr })
+    }
+}
+
+impl From<&ExplorerAddress> for Address {
+    fn from(addr: &ExplorerAddress) -> Address {
+        Address { id: addr.clone() }
+    }
+}
+
+#[juniper::object(
+    Context = Context
+)]
+impl Address {
+    /// The bech32 representation of an address
+    fn id(&self, context: &Context) -> String {
+        match &self.id {
+            ExplorerAddress::New(addr) => chain_addr::AddressReadable::from_address(
+                &context.settings.address_bech32_prefix,
+                addr,
+            )
+            .to_string(),
+            ExplorerAddress::Old(addr) => format!("{}", addr),
+        }
+    }
+
+    fn delegation() -> FieldResult {
+        Err(Error::Unimplemented("address delegation".to_owned()).into())
+    }
+
+    fn transactions(
+        &self,
+        first: Option<i32>,
+        last: Option<i32>,
+        before: Option<IndexCursor>,
+        after: Option<IndexCursor>,
+        context: &Context,
+    ) -> FieldResult<TransactionConnection> {
+        let transactions = block_on(context.db.get_transactions_by_address(&self.id))
+            .unwrap_or_else(PersistentSequence::<FragmentId>::new);
+
+        let boundaries = if transactions.len() > 0 {
+            PaginationInterval::Inclusive(InclusivePaginationInterval {
+                lower_bound: 0u64,
+                upper_bound: transactions.len(),
+            })
+        } else {
+            PaginationInterval::Empty
+        };
+
+        let pagination_arguments = PaginationArguments {
+            first,
+            last,
+            before: before.map(u64::from),
+            after: after.map(u64::from),
+        }
+        .validate()?;
+
+        TransactionConnection::new(
+            boundaries,
+            pagination_arguments,
+            |range: PaginationInterval<u64>| match range {
+                PaginationInterval::Empty => vec![],
+                PaginationInterval::Inclusive(range) => (range.lower_bound..=range.upper_bound)
+                    .filter_map(|i| {
+                        transactions
+                            .get(i)
+                            .map(|h| (TransactionNodeFetchInfo::Id(*h.as_ref()), i))
+                    })
+                    .collect(),
+            },
+        )
+    }
+}
+
+struct TaxType(chain_impl_mockchain::rewards::TaxType);
+
+#[juniper::object(
+    Context = Context,
+)]
+impl TaxType {
+    /// what gets subtracted as a fixed value
+    pub fn fixed(&self) -> Value {
+        Value(format!("{}", self.0.fixed))
+    }
+    /// Ratio of tax after the fixed amount is subtracted
+    pub fn ratio(&self) -> Ratio {
+        Ratio(self.0.ratio)
+    }
+
+    /// Max limit of tax
+    pub fn max_limit(&self) -> Option<NonZero> {
+        self.0.max_limit.map(|n| NonZero(format!("{}", n)))
+    }
+}
+
+struct Ratio(chain_impl_mockchain::rewards::Ratio);
+
+#[juniper::object(
+    Context = Context,
+)]
+impl Ratio {
+    pub fn numerator(&self) -> Value {
+        Value(format!("{}", self.0.numerator))
+    }
+
+    pub fn denominator(&self) -> NonZero {
+        NonZero(format!("{}", self.0.denominator))
+    }
+}
+
+pub struct Proposal(certificate::Proposal);
+
+#[juniper::object(
+    Context = Context,
+)]
+impl Proposal {
+    pub fn external_id(&self) -> ExternalProposalId {
+        ExternalProposalId(self.0.external_id().to_string())
+    }
+
+    /// get the vote options range
+    ///
+    /// this is the available range of choices to make for the given
+    /// proposal. All cast votes for this proposal ought to be
+    /// within the given range
+    pub fn options(&self) -> VoteOptionRange {
+        self.0.options().clone().into()
+    }
+}
+
+#[derive(Clone)]
+pub struct Pool {
+    id: certificate::PoolId,
+    data: Option<StakePoolData>,
+    blocks: Option<PersistentSequence<HeaderHash>>,
+}
+
+impl Pool {
+    fn from_string_id(id: &str, db: &DB) -> FieldResult<Pool> {
+        let id = certificate::PoolId::from_str(&id)?;
+        let blocks = block_on(db.get_stake_pool_blocks(&id))
+            .ok_or_else(|| Error::NotFound("Stake pool not found".to_owned()))?;
+
+        let data = block_on(db.get_stake_pool_data(&id))
+            .ok_or_else(|| Error::NotFound("Stake pool not found".to_owned()))?;
+
+        Ok(Pool {
+            id,
+            data: Some(data),
+            blocks: Some(blocks),
+        })
+    }
+
+    fn from_valid_id(id: certificate::PoolId) -> Pool {
+        Pool {
+            id,
+            blocks: None,
+            data: None,
+        }
+    }
+
+    fn new_with_data(id: certificate::PoolId, data: StakePoolData) -> Self {
+        Pool {
+            id,
+            blocks: None,
+            data: Some(data),
+        }
+    }
+}
+
+#[juniper::object(
+    Context = Context
+)]
+impl Pool {
+    pub fn id(&self) -> PoolId {
+        PoolId(format!("{}", &self.id))
+    }
+
+    pub fn blocks(
+        &self,
+        first: Option<i32>,
+        last: Option<i32>,
+        before: Option<IndexCursor>,
+        after: Option<IndexCursor>,
+        context: &Context,
+    ) -> FieldResult<BlockConnection> {
+        let blocks = match &self.blocks {
+            Some(b) => b.clone(),
+            None => block_on(context.db.get_stake_pool_blocks(&self.id)).ok_or_else(|| {
+                Error::InternalError("Stake pool in block is not indexed".to_owned())
+            })?,
+        };
+
+        let bounds = if blocks.len() > 0 {
+            PaginationInterval::Inclusive(InclusivePaginationInterval {
+                lower_bound: 0u32,
+                upper_bound: blocks
+                    .len()
+                    .checked_sub(1)
+                    .unwrap()
+                    .try_into()
+                    .expect("Tried to paginate more than 2^32 blocks"),
+            })
+        } else {
+            PaginationInterval::Empty
+        };
+
+        let pagination_arguments = PaginationArguments {
+            first,
+            last,
+            before: before.map(u32::try_from).transpose()?,
+            after: after.map(u32::try_from).transpose()?,
+        }
+        .validate()?;
+
+        BlockConnection::new(bounds, pagination_arguments, |range| match range {
+            PaginationInterval::Empty => vec![],
+            PaginationInterval::Inclusive(range) => (range.lower_bound..=range.upper_bound)
+                .filter_map(|i| blocks.get(i).map(|h| (*h.as_ref(), i)))
+                .collect(),
+        })
+    }
+
+    pub fn registration(&self, context: &Context) -> FieldResult<PoolRegistration> {
+        match &self.data {
+            Some(data) => Ok(data.registration.clone().into()),
+            None => block_on(context.db.get_stake_pool_data(&self.id))
+                .map(|data| PoolRegistration::from(data.registration))
+                .ok_or_else(|| Error::NotFound("Stake pool not found".to_owned()).into()),
+        }
+    }
+
+    pub fn retirement(&self, context: &Context) -> FieldResult<Option<PoolRetirement>> {
+        match &self.data {
+            Some(data) => Ok(data.retirement.clone().map(PoolRetirement::from)),
+            None => Ok(block_on(async {
+                context
+                    .db
+                    .get_stake_pool_data(&self.id)
+                    .await
+                    .map(|data| data.retirement)
+                    .and_then(|retirement| retirement.map(PoolRetirement::from))
+            })),
+        }
+    }
+}
+
+struct Status {}
+
+#[juniper::object(
+    Context = Context
+)]
+impl Status {
+    pub fn latest_block(&self, context: &Context) -> FieldResult<Block> {
+        latest_block(context).map(|b| Block::from(&b))
+    }
+
+    pub fn fee_settings(&self, context: &Context) -> FeeSettings {
+        let chain_impl_mockchain::fee::LinearFee {
+            constant,
+            coefficient,
+            certificate,
+            per_certificate_fees,
+            per_vote_certificate_fees,
+        } = context.db.blockchain_config.fees;
+
+        FeeSettings {
+            constant: Value(format!("{}", constant)),
+            coefficient: Value(format!("{}", coefficient)),
+            certificate: Value(format!("{}", certificate)),
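// Added note (not part of this changeset): each per-certificate fee below is
// optional in LinearFee; when unset, it falls back to the base `certificate`
// fee, which is why every field ends in `.unwrap_or(certificate)`.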
certificate_pool_registration: Value(format!( + "{}", + per_certificate_fees + .certificate_pool_registration + .map(|v| v.get()) + .unwrap_or(certificate) + )), + certificate_stake_delegation: Value(format!( + "{}", + per_certificate_fees + .certificate_stake_delegation + .map(|v| v.get()) + .unwrap_or(certificate) + )), + certificate_owner_stake_delegation: Value(format!( + "{}", + per_certificate_fees + .certificate_owner_stake_delegation + .map(|v| v.get()) + .unwrap_or(certificate) + )), + certificate_vote_plan: Value(format!( + "{}", + per_vote_certificate_fees + .certificate_vote_plan + .map(|v| v.get()) + .unwrap_or(certificate) + )), + certificate_vote_cast: Value(format!( + "{}", + per_vote_certificate_fees + .certificate_vote_cast + .map(|v| v.get()) + .unwrap_or(certificate) + )), + } + } +} + +struct Treasury { + rewards: Value, + treasury: Value, + treasury_tax: TaxType, +} + +#[juniper::object( + Context = Context +)] +impl Treasury { + fn rewards(&self) -> &Value { + &self.rewards + } + + fn treasury(&self) -> &Value { + &self.treasury + } + + fn treasury_tax(&self) -> &TaxType { + &self.treasury_tax + } +} + +#[derive(juniper::GraphQLObject)] +struct FeeSettings { + constant: Value, + coefficient: Value, + certificate: Value, + certificate_pool_registration: Value, + certificate_stake_delegation: Value, + certificate_owner_stake_delegation: Value, + certificate_vote_plan: Value, + certificate_vote_cast: Value, +} + +#[derive(Clone)] +struct Epoch { + id: header::Epoch, +} + +impl Epoch { + fn from_epoch_number(id: EpochNumber) -> FieldResult { + Ok(Epoch { id: id.try_into()? }) + } + + fn get_epoch_data(&self, db: &DB) -> Option { + block_on(db.get_epoch(self.id)) + } +} + +#[juniper::object( + Context = Context +)] +impl Epoch { + pub fn id(&self) -> EpochNumber { + self.id.into() + } + + /// Get a paginated view of all the blocks in this epoch + pub fn blocks( + &self, + first: Option, + last: Option, + before: Option, + after: Option, + context: &Context, + ) -> FieldResult> { + let epoch_data = match self.get_epoch_data(&context.db) { + Some(epoch_data) => epoch_data, + None => return Ok(None), + }; + + let epoch_lower_bound = block_on(async { + context + .db + .get_block(&epoch_data.first_block) + .await + .map(|block| u32::from(block.chain_length)) + }) + .expect("Epoch lower bound"); + + let epoch_upper_bound = block_on(async { + context + .db + .get_block(&epoch_data.last_block) + .await + .map(|block| u32::from(block.chain_length)) + }) + .expect("Epoch upper bound"); + + let boundaries = PaginationInterval::Inclusive(InclusivePaginationInterval { + lower_bound: 0, + upper_bound: epoch_upper_bound + .checked_sub(epoch_lower_bound) + .expect("pagination upper_bound to be greater or equal than lower_bound"), + }); + + let pagination_arguments = PaginationArguments { + first, + last, + before: before.map(u32::try_from).transpose()?, + after: after.map(u32::try_from).transpose()?, + } + .validate()?; + + BlockConnection::new(boundaries, pagination_arguments, |range| match range { + PaginationInterval::Empty => unreachable!("No blocks found (not even genesis)"), + PaginationInterval::Inclusive(range) => block_on(context.db.get_block_hash_range( + (range.lower_bound + epoch_lower_bound).into(), + (range.upper_bound + epoch_lower_bound + 1).into(), + )) + .iter() + .map(|(hash, index)| (*hash, u32::from(*index) - epoch_lower_bound)) + .collect(), + }) + .map(Some) + } + + pub fn first_block(&self, context: &Context) -> Option { + self.get_epoch_data(&context.db) + 
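// Added note (not part of this changeset), about `blocks` above: pagination
// cursors are epoch-relative. For a hypothetical epoch spanning chain lengths
// 500..=599, the exposed cursor range is 0..=99, and the closure adds
// `epoch_lower_bound` (500) back before fetching the block hashes.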
.map(|data| Block::from_valid_hash(data.first_block)) + } + + pub fn last_block(&self, context: &Context) -> Option { + self.get_epoch_data(&context.db) + .map(|data| Block::from_valid_hash(data.last_block)) + } + + pub fn total_blocks(&self, context: &Context) -> BlockCount { + self.get_epoch_data(&context.db) + .map_or(0u32.into(), |data| data.total_blocks.into()) + } +} + +struct StakeDistribution { + pools: Vec, +} + +#[juniper::object( + Context = Context, +)] +impl StakeDistribution { + pub fn pools(&self) -> &Vec { + &self.pools + } +} + +struct PoolStakeDistribution { + pool: Pool, + delegated_stake: Value, +} + +#[juniper::object( + Context = Context, +)] +impl PoolStakeDistribution { + pub fn pool(&self) -> &Pool { + &self.pool + } + + pub fn delegated_stake(&self) -> &Value { + &self.delegated_stake + } +} + +pub struct Query; + +#[juniper::object( + Context = Context, +)] +impl Query { + fn block(id: String, context: &Context) -> FieldResult { + Block::from_string_hash(id, &context.db) + } + + fn block_by_chain_length(length: ChainLength, context: &Context) -> FieldResult> { + Ok( + block_on(context.db.find_block_by_chain_length(length.try_into()?)) + .map(Block::from_valid_hash), + ) + } + + /// query all the blocks in a paginated view + fn all_blocks( + &self, + first: Option, + last: Option, + before: Option, + after: Option, + context: &Context, + ) -> FieldResult { + let longest_chain = latest_block(context)?.chain_length; + + let block0 = 0u32; + + let boundaries = PaginationInterval::Inclusive(InclusivePaginationInterval { + lower_bound: block0, + upper_bound: u32::from(longest_chain), + }); + + let pagination_arguments = PaginationArguments { + first, + last, + before: before.map(u32::try_from).transpose()?, + after: after.map(u32::try_from).transpose()?, + } + .validate()?; + + BlockConnection::new(boundaries, pagination_arguments, |range| match range { + PaginationInterval::Empty => vec![], + PaginationInterval::Inclusive(range) => { + let a = range.lower_bound.into(); + let b = range.upper_bound.checked_add(1).unwrap().into(); + block_on(context.db.get_block_hash_range(a, b)) + .iter_mut() + .map(|(hash, chain_length)| (*hash, u32::from(*chain_length))) + .collect() + } + }) + } + + fn transaction(id: String, context: &Context) -> FieldResult { + let id = FragmentId::from_str(&id)?; + + Transaction::from_id(id, context) + } + + fn epoch(id: EpochNumber, context: &Context) -> FieldResult { + Epoch::from_epoch_number(id) + } + + fn address(bech32: String, context: &Context) -> FieldResult
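// Illustrative aside (not part of this changeset): hypothetical queries
// against this root object, using the camelCase names juniper derives:
//
//   { blockByChainLength(length: "42") { id } }
//   { address(bech32: "addr1...") { transactions(first: 5) { totalCount } } }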
{
+        Address::from_bech32(&bech32)
+    }
+
+    pub fn stake_pool(id: PoolId, context: &Context) -> FieldResult<Pool> {
+        Pool::from_string_id(&id.0, &context.db)
+    }
+
+    pub fn all_stake_pools(
+        &self,
+        first: Option<i32>,
+        last: Option<i32>,
+        before: Option<IndexCursor>,
+        after: Option<IndexCursor>,
+        context: &Context,
+    ) -> FieldResult<PoolConnection> {
+        let mut stake_pools = block_on(context.db.get_stake_pools());
+
+        // Although it's probably not a big performance concern, there are a
+        // few alternatives that would avoid sorting here:
+        // - a separate data structure could be used to track InsertionOrder -> PoolId
+        //   (or any other order)
+        // - some way of relying on the Hamt iterator order (but that is probably not a good idea)
+        stake_pools.sort_unstable_by_key(|(id, data)| id.clone());
+
+        let boundaries = if !stake_pools.is_empty() {
+            PaginationInterval::Inclusive(InclusivePaginationInterval {
+                lower_bound: 0u32,
+                upper_bound: stake_pools
+                    .len()
+                    .checked_sub(1)
+                    .unwrap()
+                    .try_into()
+                    .expect("tried to paginate more than 2^32 elements"),
+            })
+        } else {
+            PaginationInterval::Empty
+        };
+
+        let pagination_arguments = PaginationArguments {
+            first,
+            last,
+            before: before.map(u32::try_from).transpose()?,
+            after: after.map(u32::try_from).transpose()?,
+        }
+        .validate()?;
+
+        PoolConnection::new(boundaries, pagination_arguments, |range| match range {
+            PaginationInterval::Empty => vec![],
+            PaginationInterval::Inclusive(range) => {
+                let from = range.lower_bound;
+                let to = range.upper_bound;
+
+                (from..=to)
+                    .map(|i: u32| {
+                        let (pool_id, stake_pool_data) = &stake_pools[usize::try_from(i).unwrap()];
+                        (
+                            Pool::new_with_data(
+                                certificate::PoolId::clone(pool_id),
+                                StakePoolData::clone(stake_pool_data),
+                            ),
+                            i,
+                        )
+                    })
+                    .collect::<Vec<_>>()
+            }
+        })
+    }
+
+    pub fn status() -> FieldResult<Status> {
+        Ok(Status {})
+    }
+}
+
+pub struct Context {
+    pub db: DB,
+    pub settings: GraphQLSettings,
+}
+
+impl juniper::Context for Context {}
+
+pub type Schema = RootNode<'static, Query, EmptyMutation<Context>>;
+
+pub fn create_schema() -> Schema {
+    Schema::new(Query {}, EmptyMutation::new())
+}
+
+fn latest_block(context: &Context) -> FieldResult<ExplorerBlock> {
+    block_on(async {
+        let hash = context.db.get_latest_block_hash().await;
+        context.db.get_block(&hash).await
+    })
+    .ok_or_else(|| Error::InternalError("tip is not in explorer".to_owned()))
+    .map_err(Into::into)
+}
diff --git a/explorer/src/api/graphql/scalars.rs b/explorer/src/api/graphql/scalars.rs
new file mode 100644
index 0000000000..926604ddac
--- /dev/null
+++ b/explorer/src/api/graphql/scalars.rs
@@ -0,0 +1,267 @@
+use super::error::Error;
+use chain_crypto::bech32::Bech32;
+use chain_impl_mockchain::{header, value, vote};
+use juniper::{ParseScalarResult, ParseScalarValue};
+use std::convert::{TryFrom, TryInto};
+
+#[derive(Clone, juniper::GraphQLScalarValue)]
+pub struct Slot(pub String);
+
+#[derive(juniper::GraphQLScalarValue)]
+/// Custom scalar type that represents a block's position in the blockchain.
+/// It's either 0 (the genesis block) or a positive number in string representation.
+pub struct ChainLength(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct PoolId(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct Value(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct EpochNumber(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct BlockCount(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct TransactionCount(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct PoolCount(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct PublicKey(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct TimeOffsetSeconds(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct NonZero(pub String); + +#[derive(Clone, juniper::GraphQLScalarValue)] +pub struct VotePlanId(pub String); + +#[derive(Clone, juniper::GraphQLScalarValue)] +pub struct ExternalProposalId(pub String); + +#[derive(Clone, juniper::GraphQLEnum)] +pub enum PayloadType { + Public, +} + +#[derive(Clone, juniper::GraphQLScalarValue)] +pub struct Weight(pub String); + +#[derive(juniper::GraphQLScalarValue)] +pub struct VotePlanCount(pub String); + +/// Vote option range +/// +/// provide a range of available choices for a given proposal. Usual value would +/// be `[0, 3[` (or `0..3` in rust's range syntax), meaning there are 3 options +/// available: `0`, `1` and `2` +#[derive(Clone, juniper::GraphQLObject)] +pub struct VoteOptionRange { + /// the start of the vote option range, starting from 0 usually + start: i32, + /// the exclusive upper bound of the option range. minimal value is 1 + end: i32, +} + +// u32 should be enough to count blocks and transactions (the only two cases for now) +#[derive(Clone)] +pub struct IndexCursor(pub u64); + +juniper::graphql_scalar!(IndexCursor where Scalar = { + description: "Non-opaque cursor that can be used for offset-based pagination" + + resolve(&self) -> Value { + juniper::Value::scalar(self.0.to_string()) + } + + from_input_value(v: &InputValue) -> Option { + v.as_scalar_value::() + .and_then(|s| s.parse::().ok()) + .map(IndexCursor) + } + + from_str<'a>(value: ScalarToken<'a>) -> ParseScalarResult<'a, S> { + >::from_str(value) + } +}); + +/*------------------------------*/ +/*------- Conversions ---------*/ +/*----------------------------*/ + +impl From for ChainLength { + fn from(length: header::ChainLength) -> ChainLength { + ChainLength(u32::from(length).to_string()) + } +} + +impl TryFrom for header::ChainLength { + type Error = std::num::ParseIntError; + fn try_from( + length: ChainLength, + ) -> Result { + length.0.parse::().map(header::ChainLength::from) + } +} + +impl From<&value::Value> for Value { + fn from(v: &value::Value) -> Value { + Value(format!("{}", v)) + } +} + +impl From for Value { + fn from(v: value::Value) -> Value { + (&v).into() + } +} + +impl From for EpochNumber { + fn from(e: header::Epoch) -> EpochNumber { + EpochNumber(format!("{}", e)) + } +} + +impl TryFrom for header::Epoch { + type Error = std::num::ParseIntError; + fn try_from(e: EpochNumber) -> Result { + e.0.parse::() + } +} + +impl From for BlockCount { + fn from(number: u64) -> BlockCount { + BlockCount(format!("{}", number)) + } +} + +impl From for BlockCount { + fn from(number: u32) -> BlockCount { + BlockCount(format!("{}", number)) + } +} + +impl From<&chain_crypto::PublicKey> for PublicKey { + fn from(pk: &chain_crypto::PublicKey) -> PublicKey { + PublicKey(pk.to_bech32_str()) + } +} + +impl From for TimeOffsetSeconds { + fn from(time: 
chain_time::TimeOffsetSeconds) -> TimeOffsetSeconds { + TimeOffsetSeconds(format!("{}", u64::from(time))) + } +} + +impl From for TransactionCount { + fn from(n: u64) -> TransactionCount { + TransactionCount(format!("{}", n)) + } +} + +impl From for PoolCount { + fn from(n: u64) -> PoolCount { + PoolCount(format!("{}", n)) + } +} + +impl From for IndexCursor { + fn from(number: u32) -> IndexCursor { + IndexCursor(number.into()) + } +} + +impl From for VotePlanId { + fn from(id: chain_impl_mockchain::certificate::VotePlanId) -> VotePlanId { + VotePlanId(id.to_string()) + } +} + +impl From for PayloadType { + fn from(payload_type: vote::PayloadType) -> Self { + match payload_type { + vote::PayloadType::Public => Self::Public, + } + } +} + +impl From for VoteOptionRange { + fn from(options: vote::Options) -> Self { + let range = options.choice_range(); + Self { + start: range.start as i32, + end: range.end as i32, + } + } +} + +impl TryFrom for u32 { + type Error = Error; + fn try_from(c: IndexCursor) -> Result { + c.0.try_into().map_err(|_| { + Error::InvalidCursor( + "block's pagination cursor is greater than maximum 2^32".to_owned(), + ) + }) + } +} + +impl From for u64 { + fn from(number: IndexCursor) -> u64 { + number.0 + } +} + +impl From for IndexCursor { + fn from(number: u64) -> IndexCursor { + IndexCursor(number) + } +} + +impl From for IndexCursor { + fn from(length: header::ChainLength) -> IndexCursor { + IndexCursor(u32::from(length).into()) + } +} + +impl TryFrom for header::ChainLength { + type Error = Error; + fn try_from(c: IndexCursor) -> Result { + let inner: u32 = c.0.try_into().map_err(|_| { + Error::InvalidCursor( + "block's pagination cursor is greater than maximum ChainLength".to_owned(), + ) + })?; + Ok(header::ChainLength::from(inner)) + } +} + +impl From for ExternalProposalId { + fn from(id: chain_impl_mockchain::certificate::ExternalProposalId) -> Self { + ExternalProposalId(id.to_string()) + } +} + +impl From for Weight { + fn from(w: vote::Weight) -> Self { + Self(format!("{}", w)) + } +} + +impl From for VotePlanCount { + fn from(number: u64) -> VotePlanCount { + VotePlanCount(format!("{}", number)) + } +} + +impl From for VotePlanCount { + fn from(number: u32) -> VotePlanCount { + VotePlanCount(format!("{}", number)) + } +} diff --git a/explorer/src/api/handlers.rs b/explorer/src/api/handlers.rs new file mode 100644 index 0000000000..0350b64206 --- /dev/null +++ b/explorer/src/api/handlers.rs @@ -0,0 +1,35 @@ +use juniper::http::GraphQLRequest; +use thiserror::Error; +use tokio::task::{spawn_blocking, JoinError}; +use warp::{reject::Reject, Rejection, Reply}; + +#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + BlockingError(#[from] JoinError), +} + +impl Reject for Error {} + +pub async fn graphiql() -> Result { + let html = juniper::http::graphiql::graphiql_source("/graphql"); + Ok(warp::reply::html(html)) +} + +pub async fn graphql( + data: GraphQLRequest, + db: crate::db::DB, + schema: super::graphql::Schema, + settings: super::graphql::GraphQLSettings, +) -> Result { + let context = super::graphql::Context { db, settings }; + + // Run the query in a threadpool, as Juniper is synchronous + spawn_blocking(move || { + let response = data.execute(&schema, &context); + Ok(warp::reply::json(&response)) + }) + .await + .map_err(Error::BlockingError) + .map_err(warp::reject::custom)? 
diff --git a/explorer/src/api/mod.rs b/explorer/src/api/mod.rs
new file mode 100644
index 0000000000..89ecf9576b
--- /dev/null
+++ b/explorer/src/api/mod.rs
@@ -0,0 +1,116 @@
+pub mod graphql;
+mod handlers;
+
+use futures::Future;
+use jormungandr_lib::interfaces::{Cors, Tls};
+use std::{net::SocketAddr, time::Duration};
+use warp::{http::StatusCode, Filter, Rejection, Reply};
+
+pub fn filter(
+    db: crate::db::DB,
+    settings: graphql::GraphQLSettings,
+) -> impl Filter<Extract = impl Reply, Error = Rejection> + Clone {
+    let with_db = warp::any().map(move || db.clone());
+    let with_schema = warp::any().map(graphql::create_schema);
+    let with_settings = warp::any().map(move || settings.clone());
+
+    let graphql = warp::path!("graphql")
+        .and(warp::post())
+        .and(warp::body::json())
+        .and(with_db)
+        .and(with_schema)
+        .and(with_settings)
+        .and_then(handlers::graphql)
+        .boxed();
+
+    let graphiql = warp::path!("graphiql")
+        .and(warp::get())
+        .and_then(handlers::graphiql)
+        .boxed();
+
+    graphql.or(graphiql).recover(handle_rejection).boxed()
+}
+
+/// Convert rejections to actual HTTP errors
+async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
+    if let Some(err) = err.find::<handlers::Error>() {
+        let (body, code) = (
+            display_internal_server_error(err),
+            StatusCode::INTERNAL_SERVER_ERROR,
+        );
+
+        return Ok(warp::reply::with_status(body, code));
+    }
+
+    Err(err)
+}
+
+fn display_internal_server_error(err: &impl std::error::Error) -> String {
+    use std::fmt::{self, Write};
+
+    fn error_to_body(err: &impl std::error::Error) -> Result<String, fmt::Error> {
+        let mut reply_body = String::new();
+        writeln!(reply_body, "Internal server error: {}", err)?;
+        let mut source = err.source();
+        while let Some(err) = source {
+            writeln!(reply_body, "-> {}", err)?;
+            source = err.source();
+        }
+        Ok(reply_body)
+    }
+
+    error_to_body(err).unwrap_or_else(|err| format!("failed to process internal error: {}", err))
+}
+
+pub async fn setup_cors<API>(
+    api: API,
+    listen_addr: SocketAddr,
+    tls_config: Option<Tls>,
+    cors_config: Option<Cors>,
+    shutdown_signal: impl Future<Output = ()> + Send + 'static,
+) where
+    API: Filter<Error = Rejection> + Clone + Send + Sync + 'static,
+    API::Extract: warp::Reply,
+{
+    match cors_config {
+        Some(config) => {
+            let allowed_origins: Vec<&str> =
+                config.allowed_origins.iter().map(AsRef::as_ref).collect();
+
+            let mut cors = warp::cors().allow_origins(allowed_origins);
+
+            if let Some(max_age) = config.max_age_secs {
+                cors = cors.max_age(Duration::from_secs(max_age));
+            }
+
+            serve(api.with(cors), listen_addr, tls_config, shutdown_signal).await;
+        }
+        None => serve(api, listen_addr, tls_config, shutdown_signal).await,
+    }
+}
+
+async fn serve<API>(
+    api: API,
+    listen_addr: SocketAddr,
+    tls_config: Option<Tls>,
+    shutdown_signal: impl Future<Output = ()> + Send + 'static,
+) where
+    API: Filter<Error = Rejection> + Clone + Send + Sync + 'static,
+    API::Extract: warp::Reply,
+{
+    let server = warp::serve(api);
+    match tls_config {
+        Some(tls_config) => {
+            let (_, server_fut) = server
+                .tls()
+                .cert_path(tls_config.cert_file)
+                .key_path(tls_config.priv_key_file)
+                .bind_with_graceful_shutdown(listen_addr, shutdown_signal);
+            server_fut.await;
+        }
+        None => {
+            let (_, server_fut) = server.bind_with_graceful_shutdown(listen_addr, shutdown_signal);
+            server_fut.await;
+        }
+    }
+}
diff --git a/explorer/src/db/error.rs b/explorer/src/db/error.rs
new file mode 100644
index 0000000000..089557af17
--- /dev/null
+++ b/explorer/src/db/error.rs
@@ -0,0 +1,17 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("block `{0}` cannot be found in the explorer")]
+    BlockNotFound(String),
+    #[error("ancestor of block `{0}` cannot be found in the explorer")]
+    AncestorNotFound(String),
+    #[error("transaction `{0}` is already indexed")]
+    TransactionAlreadyExists(String),
+    #[error("block `{0}` is already indexed")]
+    BlockAlreadyExists(String),
+    #[error("chain length: `{0}` is already indexed")]
+    ChainLengthBlockAlreadyExists(u32),
+    #[error("bootstrap error: `{0}`")]
+    BootstrapError(String),
+}
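A hedged smoke test for the routes defined in `api/mod.rs` above, using warp's built-in test helpers (the `db` and `settings` values are assumed to come from an already bootstrapped explorer; this is a sketch, not part of the patch):

```rust
// Hypothetical test helper: exercises the composed filter with warp::test.
async fn smoke_test(db: crate::db::DB, settings: graphql::GraphQLSettings) {
    let api = filter(db, settings);

    // GraphiQL is served on GET /graphiql ...
    let res = warp::test::request()
        .method("GET")
        .path("/graphiql")
        .reply(&api)
        .await;
    assert_eq!(res.status(), 200);

    // ... and unmatched paths fall through as rejections (404).
    let res = warp::test::request().path("/nope").reply(&api).await;
    assert_eq!(res.status(), 404);
}
```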
diff --git a/explorer/src/db/indexing.rs b/explorer/src/db/indexing.rs
new file mode 100644
index 0000000000..25edb5d5dd
--- /dev/null
+++ b/explorer/src/db/indexing.rs
@@ -0,0 +1,406 @@
+use super::persistent_sequence::PersistentSequence;
+use cardano_legacy_address::Addr as OldAddress;
+use chain_addr::{Address, Discrimination};
+use chain_core::property::{Block as _, Fragment as _};
+use chain_impl_mockchain::{
+    block::{Block, Proof},
+    certificate::{Certificate, PoolId, PoolRegistration, PoolRetirement},
+    fragment::{Fragment, FragmentId},
+    header::BlockDate,
+    header::ChainLength,
+    header::Epoch,
+    header::HeaderId as HeaderHash,
+    key::BftLeaderId,
+    transaction::{InputEnum, TransactionSlice, Witness},
+    value::Value,
+};
+use std::{
+    collections::{hash_map::DefaultHasher, HashMap},
+    convert::TryInto,
+    sync::Arc,
+};
+
+pub type Hamt<K, V> = imhamt::Hamt<DefaultHasher, K, Arc<V>>;
+
+pub type Transactions = Hamt<FragmentId, HeaderHash>;
+pub type Blocks = Hamt<HeaderHash, ExplorerBlock>;
+pub type ChainLengths = Hamt<ChainLength, HeaderHash>;
+
+pub type Addresses = Hamt<ExplorerAddress, PersistentSequence<FragmentId>>;
+pub type Epochs = Hamt<Epoch, EpochData>;
+
+pub type StakePoolBlocks = Hamt<PoolId, PersistentSequence<HeaderHash>>;
+pub type StakePool = Hamt<PoolId, StakePoolData>;
+
+#[derive(Clone)]
+pub struct StakePoolData {
+    pub registration: PoolRegistration,
+    pub retirement: Option<PoolRetirement>,
+    // TODO: Track updates here too?
+}
+
+/// Block with unified inputs and the metadata needed by the queries
+#[derive(Clone)]
+pub struct ExplorerBlock {
+    /// The HashMap allows for easy search when querying transactions by id
+    pub transactions: HashMap<FragmentId, ExplorerTransaction>,
+    pub id: HeaderHash,
+    pub date: BlockDate,
+    pub chain_length: ChainLength,
+    pub parent_hash: HeaderHash,
+    pub producer: BlockProducer,
+    pub total_input: Value,
+    pub total_output: Value,
+}
+
+#[derive(Clone)]
+pub enum BlockProducer {
+    None,
+    StakePool(PoolId),
+    BftLeader(BftLeaderId),
+}
+
+#[derive(Clone)]
+pub struct ExplorerTransaction {
+    pub id: FragmentId,
+    pub inputs: Vec<ExplorerInput>,
+    pub outputs: Vec<ExplorerOutput>,
+    pub certificate: Option<Certificate>,
+    pub offset_in_block: u32,
+}
+
+/// Unified input representation for utxo and account inputs as used in the graphql API
+#[derive(Clone)]
+pub struct ExplorerInput {
+    pub address: ExplorerAddress,
+    pub value: Value,
+}
+
+#[derive(Clone)]
+pub struct ExplorerOutput {
+    pub address: ExplorerAddress,
+    pub value: Value,
+}
+
+#[derive(Clone)]
+pub struct EpochData {
+    pub first_block: HeaderHash,
+    pub last_block: HeaderHash,
+    pub total_blocks: u32,
+}
+
+#[derive(Eq, PartialEq, Clone, Hash)]
+pub enum ExplorerAddress {
+    New(Address),
+    Old(OldAddress),
+}
+
+pub struct ExplorerBlockBuildingContext<'a> {
+    pub discrimination: Discrimination,
+    pub prev_transactions: &'a Transactions,
+    pub prev_blocks: &'a Blocks,
+}
+
+impl ExplorerBlock {
+    /// Map the given `Block` to an `ExplorerBlock`, transforming all the transactions
+    /// using the previous state to resolve the utxo inputs to the form (Address, Amount)
+    /// and mapping the account inputs to addresses with the given discrimination.
+    /// This function relies on the given block having been validated previously, and will
+    /// panic otherwise.
+    pub fn resolve_from<'a>(
+        block: &Block,
+        context: ExplorerBlockBuildingContext<'a>,
+    ) -> ExplorerBlock {
+        let fragments = block.contents.iter();
+        let id = block.id();
+        let chain_length = block.chain_length();
+
+        let transactions: HashMap<FragmentId, ExplorerTransaction> = fragments.enumerate().fold(
+            HashMap::<FragmentId, ExplorerTransaction>::new(),
+            |mut current_block_txs, (offset, fragment)| {
+                let fragment_id = fragment.id();
+                let offset: u32 = offset.try_into().unwrap();
+                let metx = match fragment {
+                    Fragment::Transaction(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            None,
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::OwnerStakeDelegation(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::OwnerStakeDelegation(
+                                tx.payload().into_payload(),
+                            )),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::StakeDelegation(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::StakeDelegation(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::PoolRegistration(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::PoolRegistration(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::PoolRetirement(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::PoolRetirement(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::PoolUpdate(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::PoolUpdate(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::VotePlan(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::VotePlan(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::VoteCast(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::VoteCast(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::VoteTally(tx) => {
+                        let tx = tx.as_slice();
+                        Some(ExplorerTransaction::from(
+                            &context,
+                            &fragment_id,
+                            &tx,
+                            Some(Certificate::VoteTally(tx.payload().into_payload())),
+                            offset,
+                            &current_block_txs,
+                        ))
+                    }
+                    Fragment::OldUtxoDeclaration(decl) => {
+                        let outputs = decl
+                            .addrs
+                            .iter()
+                            .map(|(old_address, value)| ExplorerOutput {
+                                address: ExplorerAddress::Old(old_address.clone()),
+                                value: *value,
+                            })
+                            .collect();
+                        Some(ExplorerTransaction {
+                            id: fragment_id,
+                            inputs: vec![],
+                            outputs,
+                            certificate: None,
+                            offset_in_block: offset,
+                        })
+                    }
+                    _ => None,
+                };
+
+                if let Some(etx) = metx {
+                    current_block_txs.insert(fragment_id, etx);
+                }
+                current_block_txs
+            },
+        );
+
+        let producer = match block.header.proof() {
+            Proof::GenesisPraos(_proof) => {
+                // Unwrap is safe in this pattern match
+                BlockProducer::StakePool(block.header.get_stakepool_id().unwrap())
+            }
+            Proof::Bft(_proof) => {
+                BlockProducer::BftLeader(block.header.get_bft_leader_id().unwrap())
+            }
+            Proof::None => BlockProducer::None,
+        };
+
+        let total_input = Value::sum(
+            transactions
+                .values()
+                .flat_map(|tx| tx.inputs.iter().map(|i| i.value)),
+        )
+        .expect("Couldn't compute block's total input");
+
+        let total_output = Value::sum(
+            transactions
+                .values()
+                .flat_map(|tx| tx.outputs.iter().map(|o| o.value)),
+        )
+        .expect("Couldn't compute block's total output");
+
+        ExplorerBlock {
+            id,
+            transactions,
+            chain_length,
+            date: block.header.block_date(),
+            parent_hash: block.parent_id(),
+            producer,
+            total_input,
+            total_output,
+        }
+    }
+
+    pub fn id(&self) -> HeaderHash {
+        self.id
+    }
+
+    pub fn date(&self) -> BlockDate {
+        self.date
+    }
+
+    pub fn chain_length(&self) -> ChainLength {
+        self.chain_length
+    }
+
+    pub fn producer(&self) -> &BlockProducer {
+        &self.producer
+    }
+}
+
+impl ExplorerTransaction {
+    /// Map the given AuthenticatedTransaction to the ExplorerTransaction API representation
+    /// type.
+    /// The fragment id is the one associated with the given AuthenticatedTransaction before 'unwrapping'.
+    /// The discrimination is needed to get addresses from account inputs.
+    /// The transactions and blocks are used to resolve utxo inputs.
+
+    // TODO: The signature of this got too long, using a builder may be a good idea
+    // It's called only from one place, though, so it is not that bothersome
+    pub fn from<'transaction, 'context, T>(
+        context: &'context ExplorerBlockBuildingContext<'context>,
+        id: &FragmentId,
+        tx: &TransactionSlice<'transaction, T>,
+        certificate: Option<Certificate>,
+        offset_in_block: u32,
+        transactions_in_current_block: &HashMap<FragmentId, ExplorerTransaction>,
+    ) -> ExplorerTransaction {
+        let outputs = tx.outputs().iter();
+        let inputs = tx.inputs().iter();
+        let witnesses = tx.witnesses().iter();
+
+        let new_outputs = outputs
+            .map(|output| ExplorerOutput {
+                address: ExplorerAddress::New(output.address.clone()),
+                value: output.value,
+            })
+            .collect();
+
+        let new_inputs = inputs
+            .map(|i| i.to_enum())
+            .zip(witnesses)
+            .filter_map(|input_with_witness| match input_with_witness {
+                (InputEnum::AccountInput(id, value), Witness::Account(_)) => {
+                    let kind = chain_addr::Kind::Account(
+                        id.to_single_account()
+                            .expect("the input to be validated")
+                            .into(),
+                    );
+                    let address = ExplorerAddress::New(Address(context.discrimination, kind));
+                    Some(ExplorerInput { address, value })
+                }
+                (InputEnum::AccountInput(id, value), Witness::Multisig(_)) => {
+                    let kind = chain_addr::Kind::Multisig(
+                        id.to_multi_account()
+                            .as_ref()
+                            .try_into()
+                            .expect("multisig identifier size doesn't match address kind"),
+                    );
+                    let address = ExplorerAddress::New(Address(context.discrimination, kind));
+                    Some(ExplorerInput { address, value })
+                }
+                (InputEnum::UtxoInput(utxo_pointer), _witness) => {
+                    let tx = utxo_pointer.transaction_id;
+                    let index = utxo_pointer.output_index;
+
+                    let output = context
+                        .prev_transactions
+                        .lookup(&tx)
+                        .and_then(|block_id| {
+                            context
+                                .prev_blocks
+                                .lookup(&block_id)
+                                .map(|block| &block.transactions[&tx].outputs[index as usize])
+                        })
+                        .or_else(|| {
+                            transactions_in_current_block
+                                .get(&tx)
+                                .map(|fragment| &fragment.outputs[index as usize])
+                        })
+                        .expect("transaction not found for utxo input");
+
+                    Some(ExplorerInput {
+                        address: output.address.clone(),
+                        value: output.value,
+                    })
+                }
+                _ => None,
+            })
+            .collect();
+
+        ExplorerTransaction {
+            id: *id,
+            inputs: new_inputs,
+            outputs: new_outputs,
+            certificate,
+            offset_in_block,
+        }
+    }
+
+    pub fn id(&self) -> FragmentId {
+        self.id
+    }
+
+    pub fn inputs(&self) -> &Vec<ExplorerInput> {
+        &self.inputs
+    }
+
+    pub fn outputs(&self) -> &Vec<ExplorerOutput> {
+        &self.outputs
+    }
+}
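The `Hamt` aliases in this file lean on `imhamt`'s copy-on-write contract: `insert` returns a new map and leaves the receiver untouched, so every block can get its own cheap snapshot with structural sharing. A minimal sketch of that contract, using the same `Hamt` shape as above:

```rust
use std::collections::hash_map::DefaultHasher;
use std::sync::Arc;

fn sharing_sketch() {
    let v0: imhamt::Hamt<DefaultHasher, u32, Arc<&str>> = imhamt::Hamt::new();
    let v1 = v0.insert(1, Arc::new("block 1")).unwrap();

    // The old version is still intact; only the new one sees the insert.
    assert!(v0.lookup(&1).is_none());
    assert!(v1.lookup(&1).is_some());

    // Inserting a duplicate key is an error rather than an overwrite, which is
    // why the apply_block_to_* functions below can detect double-indexing.
    assert!(v1.insert(1, Arc::new("dup")).is_err());
}
```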
diff --git a/explorer/src/db/mod.rs b/explorer/src/db/mod.rs
new file mode 100644
index 0000000000..728063979d
--- /dev/null
+++ b/explorer/src/db/mod.rs
@@ -0,0 +1,541 @@
+pub mod error;
+pub mod indexing;
+pub mod persistent_sequence;
+
+use self::error::Error;
+use self::indexing::{
+    Addresses, Blocks, ChainLengths, EpochData, Epochs, ExplorerAddress, ExplorerBlock, StakePool,
+    StakePoolBlocks, StakePoolData, Transactions,
+};
+use self::persistent_sequence::PersistentSequence;
+use chain_addr::Discrimination;
+use chain_core::property::Block as _;
+use chain_impl_mockchain::{
+    block::{Block, ChainLength},
+    certificate::{Certificate, PoolId},
+    chaintypes::ConsensusVersion,
+    config::ConfigParam,
+    fee::LinearFee,
+    fragment::{ConfigParams, Fragment, FragmentId},
+    header::{Epoch, HeaderId as HeaderHash},
+    multiverse,
+};
+use std::convert::Infallible;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+
+#[derive(Clone)]
+pub struct DB {
+    /// Structure that keeps all the known states to allow easy branch management.
+    /// Each new block is indexed by getting its previous `State` from the multiverse
+    /// and inserting a new updated one.
+    multiverse: Multiverse<State>,
+    pub blockchain_config: BlockchainConfig,
+    pub tip: HeaderHash,
+}
+
+#[derive(Clone)]
+pub struct BlockchainConfig {
+    /// Used to construct `Address` from `AccountIdentifier` when processing transaction
+    /// inputs
+    pub discrimination: Discrimination,
+    pub consensus_version: ConsensusVersion,
+    pub fees: LinearFee,
+}
+
+/// Immutable data structure used to represent the explorer's state at a given Block.
+/// A new state can be obtained from a Block and its previous state, yielding two
+/// independent states that share memory to minimize resource utilization.
+#[derive(Clone)]
+pub struct State {
+    parent_ref: Option<multiverse::Ref<State>>,
+    transactions: Transactions,
+    blocks: Blocks,
+    addresses: Addresses,
+    epochs: Epochs,
+    chain_lengths: ChainLengths,
+    stake_pool_data: StakePool,
+    stake_pool_blocks: StakePoolBlocks,
+}
+
+impl DB {
+    /// Apply all the blocks in the [block0, MAIN_BRANCH_TAG] range, and extract the
+    /// static blockchain settings (e.g. the Discrimination) from the Block0.
+    /// This function is only called once, during the node's bootstrap phase.
+    pub async fn bootstrap(block0: Block) -> Result<DB, Error> {
+        let blockchain_config = BlockchainConfig::from_config_params(
+            block0
+                .contents
+                .iter()
+                .filter_map(|fragment| match fragment {
+                    Fragment::Initial(config_params) => Some(config_params),
+                    _ => None,
+                })
+                .next()
+                .expect("the Initial fragment to be present in the genesis block"),
+        );
+
+        let block = ExplorerBlock::resolve_from(
+            &block0,
+            indexing::ExplorerBlockBuildingContext {
+                discrimination: blockchain_config.discrimination,
+                prev_transactions: &Transactions::new(),
+                prev_blocks: &Blocks::new(),
+            },
+        );
+
+        let blocks = apply_block_to_blocks(Blocks::new(), &block)?;
+        let epochs = apply_block_to_epochs(Epochs::new(), &block);
+        let chain_lengths = apply_block_to_chain_lengths(ChainLengths::new(), &block)?;
+        let transactions = apply_block_to_transactions(Transactions::new(), &block)?;
+        let addresses = apply_block_to_addresses(Addresses::new(), &block)?;
+        let (stake_pool_data, stake_pool_blocks) =
+            apply_block_to_stake_pools(StakePool::new(), StakePoolBlocks::new(), &block);
+
+        let initial_state = State {
+            blocks,
+            epochs,
+            chain_lengths,
+            transactions,
+            addresses,
+            stake_pool_data,
+            stake_pool_blocks,
+            parent_ref: None,
+        };
+
+        let multiverse = Multiverse::<State>::new();
+        let block0_id = block0.id();
+        let _initial_state_ref = multiverse
+            .insert(block0.chain_length(), block0_id, initial_state)
+            .await;
+
+        let bootstrapped_db = DB {
+            multiverse,
+            blockchain_config,
+            tip: block0_id,
+        };
+
+        Ok(bootstrapped_db)
+    }
+
+    /// Try to add a new block to the indexes. This can fail if the parent of the
+    /// block has not been processed. Also, update the longest seen chain with this
+    /// block as tip if its chain length is greater than the current one.
+    /// This doesn't perform any validation on the given block and the previous state;
+    /// the Block is assumed to be valid.
+    pub async fn apply_block(&mut self, block: Block) -> Result<multiverse::Ref<State>, Error> {
+        let previous_block = block.header.block_parent_hash();
+        let chain_length = block.header.chain_length();
+        let block_id = block.header.hash();
+        let multiverse = self.multiverse.clone();
+        let discrimination = self.blockchain_config.discrimination;
+
+        let previous_state: multiverse::Ref<State> = multiverse
+            .get_ref(previous_block)
+            .await
+            .ok_or_else(|| Error::AncestorNotFound(format!("{}", block.id())))?;
+
+        let State {
+            parent_ref: _,
+            transactions,
+            blocks,
+            addresses,
+            epochs,
+            chain_lengths,
+            stake_pool_data,
+            stake_pool_blocks,
+        } = previous_state.state().clone();
+
+        let explorer_block = ExplorerBlock::resolve_from(
+            &block,
+            indexing::ExplorerBlockBuildingContext {
+                discrimination,
+                prev_transactions: &transactions,
+                prev_blocks: &blocks,
+            },
+        );
+        let (stake_pool_data, stake_pool_blocks) =
+            apply_block_to_stake_pools(stake_pool_data, stake_pool_blocks, &explorer_block);
+
+        let state_ref = multiverse
+            .insert(
+                chain_length,
+                block_id,
+                State {
+                    parent_ref: Some(previous_state),
+                    transactions: apply_block_to_transactions(transactions, &explorer_block)?,
+                    blocks: apply_block_to_blocks(blocks, &explorer_block)?,
+                    addresses: apply_block_to_addresses(addresses, &explorer_block)?,
+                    epochs: apply_block_to_epochs(epochs, &explorer_block),
+                    chain_lengths: apply_block_to_chain_lengths(chain_lengths, &explorer_block)?,
+                    stake_pool_data,
+                    stake_pool_blocks,
+                },
+            )
+            .await;
+
+        Ok(state_ref)
+    }
+
+    pub async fn get_latest_block_hash(&self) -> HeaderHash {
+        self.tip
+    }
+
+    pub async fn set_tip(&mut self, tip: HeaderHash) {
+        self.tip = tip;
+    }
+
+    pub async fn get_block(&self, block_id: &HeaderHash) -> Option<ExplorerBlock> {
+        let block_id = *block_id;
+        self.with_state(block_id, move |state| {
+            state.and_then(|state| state.blocks.lookup(&block_id).map(|b| b.as_ref().clone()))
+        })
+        .await
+    }
+
+    pub async fn get_epoch(&self, epoch: Epoch) -> Option<EpochData> {
+        self.with_latest_state(move |state| state.epochs.lookup(&epoch).map(|e| e.as_ref().clone()))
+            .await
+    }
+
+    pub async fn find_block_by_chain_length(
+        &self,
+        chain_length: ChainLength,
+    ) -> Option<HeaderHash> {
+        self.with_latest_state(move |state| {
+            state
+                .chain_lengths
+                .lookup(&chain_length)
+                .map(|b| *b.as_ref())
+        })
+        .await
+    }
+
+    pub async fn find_block_hash_by_transaction(
+        &self,
+        transaction_id: &FragmentId,
+    ) -> Option<HeaderHash> {
+        self.with_latest_state(move |state| {
+            state
+                .transactions
+                .lookup(&transaction_id)
+                .map(|id| *id.as_ref())
+        })
+        .await
+    }
+
+    pub async fn get_transactions_by_address(
+        &self,
+        address: &ExplorerAddress,
+    ) -> Option<PersistentSequence<FragmentId>> {
+        let address = address.clone();
+        self.with_latest_state(move |state| {
+            state
+                .addresses
+                .lookup(&address)
+                .map(|set| set.as_ref().clone())
+        })
+        .await
+    }
+
+    // Get the hashes of all blocks in the range [from, to).
+    // The ChainLength is returned too, for ease of use in the case where
+    // `to` is greater than the max.
+    pub async fn get_block_hash_range(
+        &self,
+        from: ChainLength,
+        to: ChainLength,
+    ) -> Vec<(HeaderHash, ChainLength)> {
+        let from = u32::from(from);
+        let to = u32::from(to);
+
+        self.with_latest_state(move |state| {
+            (from..to)
+                .filter_map(|i| {
+                    state
+                        .chain_lengths
+                        .lookup(&i.into())
+                        .map(|b| (*b.as_ref(), i.into()))
+                })
+                .collect()
+        })
+        .await
+    }
+
+    pub async fn get_stake_pool_blocks(
+        &self,
+        pool: &PoolId,
+    ) -> Option<PersistentSequence<HeaderHash>> {
+        let pool = pool.clone();
+        self.with_latest_state(move |state| {
+            state
+                .stake_pool_blocks
+                .lookup(&pool)
+                .map(|i| i.as_ref().clone())
+        })
+        .await
+    }
+
+    pub async fn get_stake_pool_data(&self, pool: &PoolId) -> Option<StakePoolData> {
+        let pool = pool.clone();
+        self.with_latest_state(move |state| {
+            state
+                .stake_pool_data
+                .lookup(&pool)
+                .map(|i| i.as_ref().clone())
+        })
+        .await
+    }
+
+    pub async fn get_stake_pools(&self) -> Vec<(PoolId, Arc<StakePoolData>)> {
+        self.with_latest_state(move |state| {
+            state
+                .stake_pool_data
+                .iter()
+                .map(|(k, v)| (k.clone(), v.clone()))
+                .collect()
+        })
+        .await
+    }
+
+    /// run the given function with the longest branch's state
+    async fn with_latest_state<T>(&self, f: impl Fn(State) -> T) -> T {
+        let multiverse = self.multiverse.clone();
+        let branch_id = self.get_latest_block_hash().await;
+        let maybe_state = multiverse.get(branch_id).await;
+        let state = maybe_state.expect("the longest chain to be indexed");
+        f(state)
+    }
+
+    async fn with_state<T>(&self, branch: HeaderHash, f: impl Fn(Option<State>) -> T) -> T {
+        let multiverse = self.multiverse.clone();
+        let maybe_state = multiverse.get(branch).await;
+        f(maybe_state)
+    }
+}
+
+fn apply_block_to_transactions(
+    mut transactions: Transactions,
+    block: &ExplorerBlock,
+) -> Result<Transactions, Error> {
+    let block_id = block.id();
+    let ids = block.transactions.values().map(|tx| tx.id());
+
+    for id in ids {
+        transactions = transactions
+            .insert(id, Arc::new(block_id))
+            .map_err(|_| Error::TransactionAlreadyExists(format!("{}", id)))?;
+    }
+
+    Ok(transactions)
+}
+
+fn apply_block_to_blocks(blocks: Blocks, block: &ExplorerBlock) -> Result<Blocks, Error> {
+    let block_id = block.id();
+    blocks
+        .insert(block_id, Arc::new(block.clone()))
+        .map_err(|_| Error::BlockAlreadyExists(format!("{}", block_id)))
+}
+
+fn apply_block_to_addresses(
+    mut addresses: Addresses,
+    block: &ExplorerBlock,
+) -> Result<Addresses, Error> {
+    let transactions = block.transactions.values();
+
+    for tx in transactions {
+        let id = tx.id();
+
+        // A HashSet is used for preventing duplicates when the address is both an
+        // input and an output in the given transaction
+
+        let included_addresses: std::collections::HashSet<ExplorerAddress> = tx
+            .outputs()
+            .iter()
+            .map(|output| output.address.clone())
+            .chain(tx.inputs().iter().map(|input| input.address.clone()))
+            .collect();
+
+        for address in included_addresses {
+            addresses = addresses.insert_or_update_simple(
+                address,
+                Arc::new(PersistentSequence::new().append(id)),
+                |set| {
+                    let new_set = set.append(id);
+                    Some(Arc::new(new_set))
+                },
+            )
+        }
+    }
+    Ok(addresses)
+}
+
+fn apply_block_to_epochs(epochs: Epochs, block: &ExplorerBlock) -> Epochs {
+    let epoch_id = block.date().epoch;
+    let block_id = block.id();
+
+    epochs.insert_or_update_simple(
+        epoch_id,
+        Arc::new(EpochData {
+            first_block: block_id,
+            last_block: block_id,
+            total_blocks: 0,
+        }),
+        |data| {
+            Some(Arc::new(EpochData {
+                first_block: data.first_block,
+                last_block: block_id,
+                total_blocks: data.total_blocks + 1,
+            }))
+        },
+    )
+}
+
+fn apply_block_to_chain_lengths(
+    chain_lengths: ChainLengths,
+    block: &ExplorerBlock,
+) -> Result<ChainLengths, Error> {
+    let new_block_chain_length = block.chain_length();
+    let new_block_hash = block.id();
+    chain_lengths
+        .insert(new_block_chain_length, Arc::new(new_block_hash))
+        .map_err(|_| {
+            // I think this shouldn't happen
+            Error::ChainLengthBlockAlreadyExists(u32::from(new_block_chain_length))
+        })
+}
+
+fn apply_block_to_stake_pools(
+    data: StakePool,
+    blocks: StakePoolBlocks,
+    block: &ExplorerBlock,
+) -> (StakePool, StakePoolBlocks) {
+    let mut blocks = match &block.producer() {
+        indexing::BlockProducer::StakePool(id) => blocks
+            .update(
+                &id,
+                |array: &Arc<PersistentSequence<HeaderHash>>| -> std::result::Result<_, Infallible> {
+                    Ok(Some(Arc::new(array.append(block.id()))))
+                },
+            )
+            .expect("block to be created by a registered stake pool"),
+        indexing::BlockProducer::BftLeader(_) => blocks,
+        indexing::BlockProducer::None => blocks,
+    };
+
+    let mut data = data;
+
+    for tx in block.transactions.values() {
+        if let Some(cert) = &tx.certificate {
+            blocks = match cert {
+                Certificate::PoolRegistration(registration) => blocks
+                    .insert(registration.to_id(), Arc::new(PersistentSequence::new()))
+                    .expect("pool was registered more than once"),
+                _ => blocks,
+            };
+            data = match cert {
+                Certificate::PoolRegistration(registration) => data
+                    .insert(
+                        registration.to_id(),
+                        Arc::new(StakePoolData {
+                            registration: registration.clone(),
+                            retirement: None,
+                        }),
+                    )
+                    .expect("pool was registered more than once"),
+                Certificate::PoolRetirement(retirement) => data
+                    .update::<_, ()>(&retirement.pool_id, |pool_data| {
+                        Ok(Some(Arc::new(StakePoolData {
+                            registration: pool_data.registration.clone(),
+                            retirement: Some(retirement.clone()),
+                        })))
+                    })
+                    .expect("pool was retired before being registered"),
+                _ => data,
+            };
+        }
+    }
+
+    (data, blocks)
+}
+
+impl BlockchainConfig {
+    fn from_config_params(params: &ConfigParams) -> BlockchainConfig {
+        let discrimination = params
+            .iter()
+            .filter_map(|param| match param {
+                ConfigParam::Discrimination(discrimination) => Some(discrimination),
+                _ => None,
+            })
+            .next()
+            .expect("the discrimination to be present");
+
+        let consensus_version = params
+            .iter()
+            .filter_map(|param| match param {
+                ConfigParam::ConsensusVersion(version) => Some(version),
+                _ => None,
+            })
+            .next()
+            .expect("consensus version to be present");
+
+        let fees = params
+            .iter()
+            .filter_map(|param| match param {
+                ConfigParam::LinearFee(fee) => Some(fee),
+                _ => None,
+            })
+            .next()
+            .expect("fee is not in config params");
+
+        BlockchainConfig {
+            discrimination: *discrimination,
+            consensus_version: *consensus_version,
+            fees: *fees,
+        }
+    }
+}
+
+pub struct Multiverse<T> {
+    inner: Arc<RwLock<multiverse::Multiverse<T>>>,
+}
+
+impl<T> Multiverse<T> {
+    pub fn new() -> Self {
+        Multiverse {
+            inner: Arc::new(RwLock::new(multiverse::Multiverse::new())),
+        }
+    }
+
+    pub async fn insert(
+        &self,
+        chain_length: ChainLength,
+        hash: HeaderHash,
+        value: T,
+    ) -> multiverse::Ref<T> {
+        let mut guard = self.inner.write().await;
+        guard.insert(chain_length, hash, value)
+    }
+
+    pub async fn get_ref(&self, hash: HeaderHash) -> Option<multiverse::Ref<T>> {
+        let guard = self.inner.read().await;
+        guard.get_ref(&hash)
+    }
+}
+
+impl<T: Clone> Multiverse<T> {
+    pub async fn get(&self, hash: HeaderHash) -> Option<T> {
+        let guard = self.inner.read().await;
+        guard.get(&hash).as_deref().cloned()
+    }
+}
+
+impl<T> Clone for Multiverse<T> {
+    fn clone(&self) -> Self {
+        Multiverse {
+            inner: self.inner.clone(),
+        }
+    }
+}
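A hedged usage sketch of the `DB` API above, mirroring what the indexer does during sync: bootstrap from block0, apply blocks parent-first, then move the tip (the `Block` values are assumed to come from the node's REST API; this is illustrative, not part of the patch):

```rust
use chain_impl_mockchain::block::Block;

async fn index_one(block0: Block, child: Block) -> Result<(), crate::db::error::Error> {
    let mut db = DB::bootstrap(block0).await?;

    // apply_block fails with AncestorNotFound if the parent state is unknown,
    // which is what triggers the indexer's backfill loop.
    let tip = child.header.hash();
    db.apply_block(child).await?;
    db.set_tip(tip).await;
    Ok(())
}
```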
diff --git a/explorer/src/db/persistent_sequence.rs b/explorer/src/db/persistent_sequence.rs
new file mode 100644
index 0000000000..4295b56cbb
--- /dev/null
+++ b/explorer/src/db/persistent_sequence.rs
@@ -0,0 +1,41 @@
+use imhamt::Hamt;
+use std::{collections::hash_map::DefaultHasher, sync::Arc};
+
+// Use a Hamt to store a sequence; the indexes can be used for pagination.
+// XXX: Maybe there is a better data structure for this?
+#[derive(Clone)]
+pub struct PersistentSequence<T> {
+    len: u64,
+    elements: Hamt<DefaultHasher, u64, Arc<T>>,
+}
+
+impl<T> PersistentSequence<T> {
+    pub fn new() -> Self {
+        PersistentSequence {
+            len: 0,
+            elements: Hamt::new(),
+        }
+    }
+
+    pub fn append(&self, t: T) -> Self {
+        let len = self.len + 1;
+        PersistentSequence {
+            len,
+            elements: self.elements.insert(len - 1, Arc::new(t)).unwrap(),
+        }
+    }
+
+    pub fn get<I: Into<u64>>(&self, i: I) -> Option<&Arc<T>> {
+        self.elements.lookup(&i.into())
+    }
+
+    pub fn len(&self) -> u64 {
+        self.len
+    }
+}
+
+impl<T> Default for PersistentSequence<T> {
+    fn default() -> Self {
+        PersistentSequence::new()
+    }
+}
diff --git a/explorer/src/db/set.rs b/explorer/src/db/set.rs
new file mode 100644
index 0000000000..b4acbfddfb
--- /dev/null
+++ b/explorer/src/db/set.rs
@@ -0,0 +1,35 @@
+use imhamt::{Hamt, HamtIter};
+use std::collections::hash_map::DefaultHasher;
+use std::hash::Hash;
+
+#[derive(Clone)]
+pub struct HamtSet<T: Hash + Eq + Clone>(Hamt<DefaultHasher, T, ()>);
+
+impl<T: Hash + Eq + Clone> HamtSet<T> {
+    pub fn new() -> HamtSet<T> {
+        HamtSet(Hamt::new())
+    }
+
+    pub fn add_element(&self, element: T) -> HamtSet<T> {
+        let new_hamt = match self.0.insert(element, ()) {
+            Ok(new_hamt) => new_hamt,
+            Err(_) => self.0.clone(),
+        };
+
+        HamtSet(new_hamt)
+    }
+
+    pub fn iter(&self) -> HamtSetIter<T> {
+        HamtSetIter(self.0.iter())
+    }
+}
+
+pub struct HamtSetIter<'a, K>(HamtIter<'a, K, ()>);
+
+impl<'a, K> Iterator for HamtSetIter<'a, K> {
+    type Item = &'a K;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next().map(|(k, _v)| k)
+    }
+}
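Usage sketch for `PersistentSequence` above: it is append-only and indexable by `u64`, which is exactly the shape the offset-based pagination cursor needs, and like the Hamts it is persistent, so older versions survive appends.

```rust
fn sequence_sketch() {
    let seq = PersistentSequence::<&str>::new().append("a").append("b");

    assert_eq!(seq.len(), 2);
    assert_eq!(seq.get(1u64).map(|v| **v), Some("b"));
    assert!(seq.get(2u64).is_none());

    // Appending yields a new version and leaves the old one untouched.
    let longer = seq.append("c");
    assert_eq!(seq.len(), 2);
    assert_eq!(longer.len(), 3);
}
```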
diff --git a/explorer/src/indexer.rs b/explorer/src/indexer.rs
new file mode 100644
index 0000000000..772be3b547
--- /dev/null
+++ b/explorer/src/indexer.rs
@@ -0,0 +1,92 @@
+use crate::db::DB;
+use chain_core::property::Header as _;
+use chain_impl_mockchain::block::Block;
+use chain_ser::deser::Deserialize as _;
+use jormungandr_lib::crypto::hash::Hash;
+use slog::{info, Logger};
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum IndexerError {
+    #[error("could not deserialize response")]
+    CannotDeserialize(#[from] serde_json::Error),
+    #[error("rest request error")]
+    RequestError(#[from] reqwest::Error),
+    #[error("io error")]
+    IOError(#[from] std::io::Error),
+    #[error("hash error")]
+    HashError(#[from] chain_crypto::hash::Error),
+    #[error("url error")]
+    UrlError(#[from] url::ParseError),
+    #[error(transparent)]
+    DBError(#[from] crate::db::error::Error),
+}
+
+#[derive(Clone)]
+pub struct Indexer {
+    pub db: DB,
+    rest: RestClient,
+    logger: Logger,
+}
+
+impl Indexer {
+    pub fn new(rest: RestClient, db: crate::db::DB, logger: Logger) -> Self {
+        Indexer { db, rest, logger }
+    }
+
+    pub async fn apply_or_fetch_block(&mut self, msg: Hash) -> Result<(), IndexerError> {
+        info!(self.logger, "applying block {}", msg);
+        let mut stack = vec![];
+        let mut hash = msg.into_hash();
+
+        loop {
+            let block = self.rest.get_block(hash.to_string()).await?;
+            hash = block.header.parent_id();
+
+            match self.db.apply_block(block.clone()).await {
+                Ok(_gc_root) => break,
+                Err(crate::db::error::Error::AncestorNotFound(_missing)) => {
+                    stack.push(block);
+                    continue;
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        while let Some(block) = stack.pop() {
+            self.db.apply_block(block).await.expect("shouldn't fail");
+        }
+
+        Ok(())
+    }
+
+    pub async fn set_tip(&mut self, tip: Hash) -> Result<(), IndexerError> {
+        info!(self.logger, "setting tip to {}", tip);
+        self.db.set_tip(tip.into_hash()).await;
+        Ok(())
+    }
+}
+
+#[derive(Clone)]
+pub struct RestClient {
+    client: reqwest::Client,
+    url: url::Url,
+}
+
+impl RestClient {
+    pub fn new(url: url::Url) -> Self {
+        RestClient {
+            url,
+            client: reqwest::Client::new(),
+        }
+    }
+
+    pub async fn get_block(&self, hash: impl AsRef<str>) -> Result<Block, IndexerError> {
+        let url = self.url.join("v0/block/")?.join(hash.as_ref())?;
+        let block_response = self.client.get(url).send().await?;
+
+        Ok(Block::deserialize(std::io::BufReader::new(
+            block_response.bytes().await?.as_ref(),
+        ))?)
+    }
+}
diff --git a/explorer/src/main.rs b/explorer/src/main.rs
new file mode 100644
index 0000000000..a12ded3949
--- /dev/null
+++ b/explorer/src/main.rs
@@ -0,0 +1,189 @@
+mod api;
+mod db;
+mod indexer;
+mod settings;
+mod subscription;
+
+use crate::indexer::Indexer;
+use api::graphql::GraphQLSettings;
+use backoff::{future::FutureOperation as _, ExponentialBackoff};
+use db::DB;
+use futures_util::{future::BoxFuture, pin_mut, FutureExt, TryStreamExt};
+use settings::Settings;
+use slog::{error, info, trace, Logger};
+use sloggers::terminal::{Destination, TerminalLoggerBuilder};
+use sloggers::types::Severity;
+use sloggers::Build;
+use std::sync::Arc;
+use subscription::SubscriptionError;
+use thiserror::Error;
+use tokio::{signal::ctrl_c, sync::RwLock};
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    IndexerError(#[from] indexer::IndexerError),
+    #[error(transparent)]
+    SubscriptionError(#[from] subscription::SubscriptionError),
+}
+
+#[tokio::main]
+async fn main() {
+    let logger = init_logger();
+    let settings = Settings::load().unwrap();
+
+    let notifier_url = settings.notifier_url();
+    let rest_url = settings.rest_url();
+
+    let rest = indexer::RestClient::new(rest_url);
+
+    info!(logger, "fetching block0 from rest");
+
+    let block0 = match rest.get_block(&settings.block0_hash).await {
+        Ok(block) => block,
+        Err(err) => {
+            error!(logger, "failed to fetch block0: {}", err);
+            return;
+        }
+    };
+
+    let db = match DB::bootstrap(block0).await {
+        Ok(db) => db,
+        Err(err) => {
+            error!(logger, "failed to bootstrap database: {}", err);
+            return;
+        }
+    };
+
+    let indexer = Arc::new(RwLock::new(Indexer::new(rest, db.clone(), logger.clone())));
+
+    let api = api::filter(
+        db,
+        GraphQLSettings {
+            address_bech32_prefix: settings.address_bech32_prefix.clone(),
+        },
+    );
+
+    let (tx, rx) = tokio::sync::oneshot::channel();
+    let mut tx = Some(tx);
+
+    let mut shutdown_signal = {
+        let logger = logger.clone();
+        tokio::spawn(async move {
+            ctrl_c().await.expect("Error setting Ctrl-C handler");
+            if let Err(error) = tx.take().unwrap().send(()) {
+                error!(logger, "failed to send shutdown signal {:?}", error);
+            }
+        })
+    }
+    .fuse();
+
+    tokio::spawn((|| {
+        let binding_address = settings.binding_address.clone();
+        let tls = settings.tls.clone();
+        let cors = settings.cors.clone();
+        async move {
+            api::setup_cors(api, binding_address, tls, cors, async move {
+                rx.await.ok();
+            })
+            .await
+        }
+    })());
+
+    let subscription =
+        start_subscription_with_retry(logger.clone(), notifier_url, Arc::clone(&indexer)).fuse();
+
+    pin_mut!(subscription);
+
+    info!(logger, "starting subscription");
+    futures::select! {
+        _ = shutdown_signal => {
+            info!(logger, "interruption signal received, stopping service");
+            return;
+        },
+        s = subscription => {
+            if let Err(e) = s {
+                error!(logger, "failed to start subscription: {}", e);
+            }
+        }
+    }
+}
+
+fn init_logger() -> Logger {
+    let mut builder = TerminalLoggerBuilder::new();
+    builder.level(Severity::Debug);
+    builder.destination(Destination::Stderr);
+    builder.build().expect("Couldn't initialize logger")
+}
+
+type SharedIndexer = Arc<RwLock<Indexer>>;
+
+async fn start_subscription_with_retry(
+    logger: Logger,
+    notifier_url: url::Url,
+    indexer: SharedIndexer,
+) -> Result<(), Error> {
+    let op = move || -> BoxFuture<'static, Result<(), backoff::Error<Error>>> {
+        let notifier_url = notifier_url.clone();
+        let indexer = Arc::clone(&indexer);
+        let logger = logger.clone();
+
+        Box::pin(async move {
+            let indexer = Arc::clone(&indexer);
+
+            let sub = subscription::start_subscription(notifier_url.clone(), logger.clone())
+                .await
+                .map_err(|err| match err {
+                    e if matches!(
+                        &e,
+                        subscription::SubscriptionError::Tungstenite(
+                            async_tungstenite::tungstenite::Error::Io(_),
+                        )
+                    ) =>
+                    {
+                        error!(
+                            logger,
+                            "failed to establish connection with node, reason {}", &e
+                        );
+                        backoff::Error::Transient(Error::from(e))
+                    }
+                    e => backoff::Error::Permanent(Error::from(e)),
+                })?;
+
+            sub.map_err(Error::from)
+                .try_for_each(|msg| {
+                    let indexer = Arc::clone(&indexer);
+                    let logger = logger.clone();
+
+                    async move {
+                        trace!(logger, "processing new subscription message {:?}", &msg);
+                        match msg {
+                            subscription::JsonMessage::NewBlock(hash) => {
+                                indexer.write().await.apply_or_fetch_block(hash).await?;
+                            }
+                            subscription::JsonMessage::NewTip(hash) => {
+                                indexer.write().await.set_tip(hash).await?;
+                            }
+                        }
+                        Ok(())
+                    }
+                })
+                .await
+                .map_err(|err| match err {
+                    e if matches!(
+                        e,
+                        Error::SubscriptionError(SubscriptionError::MaxConnectionsReached)
+                            | Error::SubscriptionError(SubscriptionError::Tungstenite(
+                                async_tungstenite::tungstenite::Error::Protocol(_)
+                            ))
+                    ) =>
+                    {
+                        error!(logger, "couldn't connect to notifier");
+                        backoff::Error::Transient(e)
+                    }
+                    e => backoff::Error::Permanent(e),
+                })
+        })
+    };
+
+    op.retry(ExponentialBackoff::default()).await
+}
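The retry plumbing in `start_subscription_with_retry` above relies on the contract of backoff 0.2's `FutureOperation` (enabled by the `tokio` feature in Cargo.toml): a `Transient` error is retried with growing delays, while a `Permanent` one aborts immediately. A minimal, self-contained sketch of that contract:

```rust
use backoff::{future::FutureOperation as _, ExponentialBackoff};
use std::io;

async fn flaky() -> Result<&'static str, backoff::Error<io::Error>> {
    // Transient: retried with exponential delays until the policy gives up.
    // backoff::Error::Permanent would abort the retry loop on the spot.
    Err(backoff::Error::Transient(io::Error::new(
        io::ErrorKind::ConnectionRefused,
        "node not up yet",
    )))
}

async fn with_retry() -> Result<&'static str, io::Error> {
    (|| flaky()).retry(ExponentialBackoff::default()).await
}
```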
diff --git a/explorer/src/settings.rs b/explorer/src/settings.rs
new file mode 100644
index 0000000000..ae04c7c432
--- /dev/null
+++ b/explorer/src/settings.rs
@@ -0,0 +1,120 @@
+use jormungandr_lib::interfaces::{Cors, Tls};
+use serde::Deserialize;
+use std::{fs::File, net::SocketAddr, path::PathBuf};
+use structopt::StructOpt;
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error(transparent)]
+    Format(#[from] serde_yaml::Error),
+    #[error(transparent)]
+    IO(#[from] std::io::Error),
+    #[error("Invalid host")]
+    InvalidHost,
+}
+
+#[derive(Debug)]
+pub struct Settings {
+    pub host: url::Host,
+    pub port: u16,
+    pub block0_hash: String,
+    pub binding_address: SocketAddr,
+    pub address_bech32_prefix: String,
+    pub tls: Option<Tls>,
+    pub cors: Option<Cors>,
+}
+
+impl Settings {
+    pub fn load() -> Result<Settings, Error> {
+        let cmd = CommandLine::from_args();
+        let file: Option<Config> = cmd
+            .config
+            .map(|file_path| -> Result<Config, Error> {
+                serde_yaml::from_reader(File::open(file_path)?).map_err(Error::from)
+            })
+            .transpose()?;
+
+        let file_host = file
+            .as_ref()
+            .and_then(|f| f.host.as_ref())
+            .map(|host| url::Host::parse(&host))
+            .transpose()
+            .map_err(|_| Error::InvalidHost)?;
+
+        let host = cmd
+            .host
+            .or(file_host)
+            .unwrap_or_else(|| url::Host::parse("localhost").unwrap());
+
+        let port = cmd
+            .port
+            .or(file.as_ref().and_then(|f| f.port))
+            .unwrap_or(8443);
+
+        let block0_hash = cmd.block0_hash;
+
+        let binding_address = cmd
+            .binding_address
+            .or(file.as_ref().and_then(|f| f.binding_address))
+            .unwrap_or_else(|| "0.0.0.0:3030".parse().unwrap());
+
+        let address_bech32_prefix = cmd
+            .address_bech32_prefix
+            .or(file.as_ref().and_then(|f| f.address_bech32_prefix.clone()))
+            .unwrap_or_else(|| "addr".to_string());
+
+        let tls = file.as_ref().and_then(|settings| settings.tls.clone());
+        let cors = file.as_ref().and_then(|settings| settings.cors.clone());
+
+        Ok(Settings {
+            host,
+            port,
+            block0_hash,
+            binding_address,
+            address_bech32_prefix,
+            tls,
+            cors,
+        })
+    }
+
+    pub fn rest_url(&self) -> url::Url {
+        // TODO: this can't fail, I think; there should be some way of removing this expect
+        url::Url::parse(&format!("http://{}:{}/api/", self.host, self.port))
+            .expect("couldn't form rest url")
+    }
+
+    pub fn notifier_url(&self) -> url::Url {
+        url::Url::parse(&format!("ws://{}:{}/api/", self.host, self.port))
+            .expect("couldn't form base url")
+            .join("v1/notifier")
+            .unwrap()
+    }
+}
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "config")]
+struct CommandLine {
+    #[structopt(long, parse(try_from_str = url::Host::parse))]
+    pub host: Option<url::Host>,
+    #[structopt(long)]
+    pub port: Option<u16>,
+    pub block0_hash: String,
+    #[structopt(long)]
+    pub binding_address: Option<SocketAddr>,
+    #[structopt(long)]
+    pub address_bech32_prefix: Option<String>,
+    pub config: Option<PathBuf>,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct Config {
+    pub storage: Option<PathBuf>,
+    pub tls: Option<Tls>,
+    pub cors: Option<Cors>,
+    pub host: Option<String>,
+    pub port: Option<u16>,
+    pub binding_address: Option<SocketAddr>,
+    pub address_bech32_prefix: Option<String>,
+}
diff --git a/explorer/src/subscription.rs b/explorer/src/subscription.rs
new file mode 100644
index 0000000000..21593eb256
--- /dev/null
+++ b/explorer/src/subscription.rs
@@ -0,0 +1,55 @@
+use async_tungstenite::{
+    tungstenite::protocol::frame::coding::CloseCode, tungstenite::protocol::Message,
+};
+use futures_util::{Stream, StreamExt, TryStreamExt};
+use jormungandr_lib::interfaces::notifier;
+use slog::info;
+use thiserror::Error;
+
+pub use notifier::JsonMessage;
+
+#[derive(Debug, Error)]
+pub enum SubscriptionError {
+    #[error("max connections reached")]
+    MaxConnectionsReached,
+    #[error("unexpected close event")]
+    UnexpectedCloseEvent,
+    #[error(transparent)]
+    Tungstenite(#[from] async_tungstenite::tungstenite::Error),
+    #[error(transparent)]
+    CannotDeserialize(#[from] serde_json::Error),
+    #[error(transparent)]
+    RequestError(#[from] reqwest::Error),
+}
+
+pub async fn start_subscription(
+    url: url::Url,
+    logger: slog::Logger,
+) -> Result<impl Stream<Item = Result<JsonMessage, SubscriptionError>>, SubscriptionError> {
+    info!(logger, "starting subscription");
+    let (ws_stream, _) = async_tungstenite::tokio::connect_async(url).await?;
+    info!(
+        logger,
+        "WebSocket handshake has been successfully completed"
+    );
+
+    let (_write, read) = ws_stream.split();
+
+    Ok(read
+        .map_err(SubscriptionError::Tungstenite)
+        .and_then(process_message))
+}
+
+async fn process_message(msg: Message) -> Result<JsonMessage, SubscriptionError> {
+    match msg {
+        Message::Text(text) => {
+            let json_msg: JsonMessage = serde_json::from_str(&text)?;
+            Ok(json_msg)
+        }
+        Message::Close(close_frame) => match close_frame.expect("no close code").code {
+            CloseCode::Library(4000) => Err(SubscriptionError::MaxConnectionsReached),
+            _ => Err(SubscriptionError::UnexpectedCloseEvent),
+        },
+        _ => unreachable!("unexpected notifier message"),
+    }
+}
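For reference, a stripped-down consumer of the stream returned by `start_subscription` (essentially what `start_subscription_with_retry` in main.rs does, minus the retry policy; a sketch, not part of the patch):

```rust
use futures_util::TryStreamExt;

async fn consume(url: url::Url, logger: slog::Logger) -> Result<(), SubscriptionError> {
    let stream = start_subscription(url, logger.clone()).await?;

    stream
        .try_for_each(|msg| {
            let logger = logger.clone();
            async move {
                match msg {
                    JsonMessage::NewBlock(hash) => slog::info!(logger, "new block {}", hash),
                    JsonMessage::NewTip(hash) => slog::info!(logger, "new tip {}", hash),
                }
                Ok(())
            }
        })
        .await
}
```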