From 84bc7e4bc4668ecccc43ca36f0e0121fa35a7bc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Fran=C3=A7a?= Date: Wed, 13 Nov 2024 11:47:34 +0000 Subject: [PATCH] refactor: Removing actor pipes and naming (#213) Getting rid of the whole actor pipe architecture. It was only created because we thought we would have many components, in reality we ended up with just two big components. This is just a refactor, no logic changes. --- node/Cargo.lock | 330 +++++++++++------- node/Cargo.toml | 16 +- node/actors/bft/src/io.rs | 23 -- node/actors/bft/src/lib.rs | 102 ------ node/actors/executor/src/io.rs | 70 ---- node/actors/network/src/io.rs | 40 --- node/actors/network/src/state.rs | 1 - node/{actors => components}/bft/Cargo.toml | 2 +- .../bft/src/chonky_bft/block.rs | 0 .../bft/src/chonky_bft/commit.rs | 0 .../bft/src/chonky_bft/mod.rs | 67 +--- .../bft/src/chonky_bft/new_view.rs | 4 +- .../bft/src/chonky_bft/proposal.rs | 2 +- .../bft/src/chonky_bft/proposer.rs | 6 +- .../bft/src/chonky_bft/testonly.rs | 42 +-- .../bft/src/chonky_bft/tests/commit.rs | 13 +- .../bft/src/chonky_bft/tests/mod.rs | 0 .../bft/src/chonky_bft/tests/new_view.rs | 0 .../bft/src/chonky_bft/tests/proposal.rs | 0 .../bft/src/chonky_bft/tests/timeout.rs | 13 +- .../bft/src/chonky_bft/timeout.rs | 2 +- node/{actors => components}/bft/src/config.rs | 2 +- node/components/bft/src/lib.rs | 135 +++++++ .../{actors => components}/bft/src/metrics.rs | 0 .../bft/src/testonly/make.rs | 16 +- .../bft/src/testonly/mod.rs | 0 .../bft/src/testonly/node.rs | 56 +-- .../bft/src/testonly/run.rs | 90 +++-- .../bft/src/testonly/twins/mod.rs | 0 .../bft/src/testonly/twins/partition.rs | 0 .../bft/src/testonly/twins/scenario.rs | 0 .../bft/src/testonly/twins/tests.rs | 0 .../bft/src/tests/mod.rs | 0 .../bft/src/tests/twins.rs | 5 +- .../executor/Cargo.toml | 2 +- .../executor/src/lib.rs | 26 +- .../executor/src/tests.rs | 0 .../{actors => components}/network/Cargo.toml | 2 +- node/{actors => components}/network/build.rs | 0 .../network/src/config.rs | 6 +- .../network/src/consensus/handshake/mod.rs | 0 .../src/consensus/handshake/testonly.rs | 0 .../network/src/consensus/handshake/tests.rs | 0 .../network/src/consensus/mod.rs | 12 +- .../network/src/consensus/tests.rs | 38 +- .../network/src/debug_page/mod.rs | 0 .../network/src/debug_page/style.css | 0 .../network/src/frame.rs | 0 .../network/src/gossip/attestation/metrics.rs | 0 .../network/src/gossip/attestation/mod.rs | 0 .../network/src/gossip/attestation/tests.rs | 0 .../network/src/gossip/fetch.rs | 0 .../network/src/gossip/handshake/mod.rs | 0 .../network/src/gossip/handshake/testonly.rs | 0 .../network/src/gossip/handshake/tests.rs | 0 .../network/src/gossip/loadtest/mod.rs | 0 .../network/src/gossip/loadtest/tests.rs | 0 .../network/src/gossip/mod.rs | 10 +- .../network/src/gossip/runner.rs | 0 .../network/src/gossip/testonly.rs | 0 .../network/src/gossip/tests/fetch_blocks.rs | 0 .../network/src/gossip/tests/mod.rs | 17 +- .../network/src/gossip/tests/syncing.rs | 0 .../network/src/gossip/validator_addrs.rs | 0 node/components/network/src/io.rs | 20 ++ .../{actors => components}/network/src/lib.rs | 44 ++- .../network/src/metrics.rs | 0 .../network/src/mux/config.rs | 0 .../network/src/mux/handshake.rs | 0 .../network/src/mux/header.rs | 0 .../network/src/mux/mod.rs | 0 .../network/src/mux/reusable_stream.rs | 0 .../network/src/mux/tests/mod.rs | 0 .../network/src/mux/tests/proto/mod.proto | 0 .../network/src/mux/tests/proto/mod.rs | 0 .../network/src/mux/transient_stream.rs | 0 .../network/src/noise/bytes.rs | 0 .../network/src/noise/mod.rs | 0 .../network/src/noise/stream.rs | 0 .../network/src/noise/testonly.rs | 0 .../network/src/noise/tests.rs | 0 .../network/src/pool.rs | 0 .../network/src/preface.rs | 0 .../network/src/proto/consensus.proto | 0 .../network/src/proto/gossip.proto | 0 .../network/src/proto/mod.rs | 0 .../network/src/proto/mux.proto | 0 .../network/src/proto/ping.proto | 0 .../network/src/proto/preface.proto | 0 .../network/src/proto/rpc.proto | 0 .../network/src/rpc/consensus.rs | 0 .../network/src/rpc/get_block.rs | 0 .../network/src/rpc/metrics.rs | 0 .../network/src/rpc/mod.rs | 0 .../network/src/rpc/ping.rs | 0 .../network/src/rpc/push_batch_votes.rs | 0 .../network/src/rpc/push_block_store_state.rs | 0 .../network/src/rpc/push_validator_addrs.rs | 0 .../network/src/rpc/testonly.rs | 0 .../network/src/rpc/tests.rs | 0 .../network/src/testonly.rs | 79 +++-- .../network/src/tests.rs | 0 .../network/src/watch.rs | 0 node/deny.toml | 34 +- .../concurrency/src/sync/prunable_mpsc/mod.rs | 5 + .../roles/src/validator/messages/genesis.rs | 2 +- node/libs/utils/src/lib.rs | 1 - node/libs/utils/src/pipe.rs | 76 ---- node/tools/src/main.rs | 2 +- 109 files changed, 645 insertions(+), 768 deletions(-) delete mode 100644 node/actors/bft/src/io.rs delete mode 100644 node/actors/bft/src/lib.rs delete mode 100644 node/actors/executor/src/io.rs delete mode 100644 node/actors/network/src/io.rs delete mode 100644 node/actors/network/src/state.rs rename node/{actors => components}/bft/Cargo.toml (94%) rename node/{actors => components}/bft/src/chonky_bft/block.rs (100%) rename node/{actors => components}/bft/src/chonky_bft/commit.rs (100%) rename node/{actors => components}/bft/src/chonky_bft/mod.rs (85%) rename node/{actors => components}/bft/src/chonky_bft/new_view.rs (98%) rename node/{actors => components}/bft/src/chonky_bft/proposal.rs (99%) rename node/{actors => components}/bft/src/chonky_bft/proposer.rs (94%) rename node/{actors => components}/bft/src/chonky_bft/testonly.rs (90%) rename node/{actors => components}/bft/src/chonky_bft/tests/commit.rs (97%) rename node/{actors => components}/bft/src/chonky_bft/tests/mod.rs (100%) rename node/{actors => components}/bft/src/chonky_bft/tests/new_view.rs (100%) rename node/{actors => components}/bft/src/chonky_bft/tests/proposal.rs (100%) rename node/{actors => components}/bft/src/chonky_bft/tests/timeout.rs (97%) rename node/{actors => components}/bft/src/chonky_bft/timeout.rs (99%) rename node/{actors => components}/bft/src/config.rs (95%) create mode 100644 node/components/bft/src/lib.rs rename node/{actors => components}/bft/src/metrics.rs (100%) rename node/{actors => components}/bft/src/testonly/make.rs (79%) rename node/{actors => components}/bft/src/testonly/mod.rs (100%) rename node/{actors => components}/bft/src/testonly/node.rs (54%) rename node/{actors => components}/bft/src/testonly/run.rs (89%) rename node/{actors => components}/bft/src/testonly/twins/mod.rs (100%) rename node/{actors => components}/bft/src/testonly/twins/partition.rs (100%) rename node/{actors => components}/bft/src/testonly/twins/scenario.rs (100%) rename node/{actors => components}/bft/src/testonly/twins/tests.rs (100%) rename node/{actors => components}/bft/src/tests/mod.rs (100%) rename node/{actors => components}/bft/src/tests/twins.rs (98%) rename node/{actors => components}/executor/Cargo.toml (93%) rename node/{actors => components}/executor/src/lib.rs (87%) rename node/{actors => components}/executor/src/tests.rs (100%) rename node/{actors => components}/network/Cargo.toml (96%) rename node/{actors => components}/network/build.rs (100%) rename node/{actors => components}/network/src/config.rs (97%) rename node/{actors => components}/network/src/consensus/handshake/mod.rs (100%) rename node/{actors => components}/network/src/consensus/handshake/testonly.rs (100%) rename node/{actors => components}/network/src/consensus/handshake/tests.rs (100%) rename node/{actors => components}/network/src/consensus/mod.rs (97%) rename node/{actors => components}/network/src/consensus/tests.rs (94%) rename node/{actors => components}/network/src/debug_page/mod.rs (100%) rename node/{actors => components}/network/src/debug_page/style.css (100%) rename node/{actors => components}/network/src/frame.rs (100%) rename node/{actors => components}/network/src/gossip/attestation/metrics.rs (100%) rename node/{actors => components}/network/src/gossip/attestation/mod.rs (100%) rename node/{actors => components}/network/src/gossip/attestation/tests.rs (100%) rename node/{actors => components}/network/src/gossip/fetch.rs (100%) rename node/{actors => components}/network/src/gossip/handshake/mod.rs (100%) rename node/{actors => components}/network/src/gossip/handshake/testonly.rs (100%) rename node/{actors => components}/network/src/gossip/handshake/tests.rs (100%) rename node/{actors => components}/network/src/gossip/loadtest/mod.rs (100%) rename node/{actors => components}/network/src/gossip/loadtest/tests.rs (100%) rename node/{actors => components}/network/src/gossip/mod.rs (95%) rename node/{actors => components}/network/src/gossip/runner.rs (100%) rename node/{actors => components}/network/src/gossip/testonly.rs (100%) rename node/{actors => components}/network/src/gossip/tests/fetch_blocks.rs (100%) rename node/{actors => components}/network/src/gossip/tests/mod.rs (97%) rename node/{actors => components}/network/src/gossip/tests/syncing.rs (100%) rename node/{actors => components}/network/src/gossip/validator_addrs.rs (100%) create mode 100644 node/components/network/src/io.rs rename node/{actors => components}/network/src/lib.rs (86%) rename node/{actors => components}/network/src/metrics.rs (100%) rename node/{actors => components}/network/src/mux/config.rs (100%) rename node/{actors => components}/network/src/mux/handshake.rs (100%) rename node/{actors => components}/network/src/mux/header.rs (100%) rename node/{actors => components}/network/src/mux/mod.rs (100%) rename node/{actors => components}/network/src/mux/reusable_stream.rs (100%) rename node/{actors => components}/network/src/mux/tests/mod.rs (100%) rename node/{actors => components}/network/src/mux/tests/proto/mod.proto (100%) rename node/{actors => components}/network/src/mux/tests/proto/mod.rs (100%) rename node/{actors => components}/network/src/mux/transient_stream.rs (100%) rename node/{actors => components}/network/src/noise/bytes.rs (100%) rename node/{actors => components}/network/src/noise/mod.rs (100%) rename node/{actors => components}/network/src/noise/stream.rs (100%) rename node/{actors => components}/network/src/noise/testonly.rs (100%) rename node/{actors => components}/network/src/noise/tests.rs (100%) rename node/{actors => components}/network/src/pool.rs (100%) rename node/{actors => components}/network/src/preface.rs (100%) rename node/{actors => components}/network/src/proto/consensus.proto (100%) rename node/{actors => components}/network/src/proto/gossip.proto (100%) rename node/{actors => components}/network/src/proto/mod.rs (100%) rename node/{actors => components}/network/src/proto/mux.proto (100%) rename node/{actors => components}/network/src/proto/ping.proto (100%) rename node/{actors => components}/network/src/proto/preface.proto (100%) rename node/{actors => components}/network/src/proto/rpc.proto (100%) rename node/{actors => components}/network/src/rpc/consensus.rs (100%) rename node/{actors => components}/network/src/rpc/get_block.rs (100%) rename node/{actors => components}/network/src/rpc/metrics.rs (100%) rename node/{actors => components}/network/src/rpc/mod.rs (100%) rename node/{actors => components}/network/src/rpc/ping.rs (100%) rename node/{actors => components}/network/src/rpc/push_batch_votes.rs (100%) rename node/{actors => components}/network/src/rpc/push_block_store_state.rs (100%) rename node/{actors => components}/network/src/rpc/push_validator_addrs.rs (100%) rename node/{actors => components}/network/src/rpc/testonly.rs (100%) rename node/{actors => components}/network/src/rpc/tests.rs (100%) rename node/{actors => components}/network/src/testonly.rs (83%) rename node/{actors => components}/network/src/tests.rs (100%) rename node/{actors => components}/network/src/watch.rs (100%) delete mode 100644 node/libs/utils/src/pipe.rs diff --git a/node/Cargo.lock b/node/Cargo.lock index 222483f7..9d9bb9fa 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "anes" @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "assert_matches" @@ -178,7 +178,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -189,7 +189,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -308,7 +308,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.87", + "syn", ] [[package]] @@ -330,7 +330,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.87", + "syn", "which", ] @@ -440,9 +440,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.35" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f57c4b4da2a9d619dd035f27316d7a426305b75be93d09e92f2b9229c34feaf" +checksum = "1aeb932158bd710538c73702db6945cb68a8fb08c519e6e12706b94263b36db8" dependencies = [ "jobserver", "libc", @@ -584,7 +584,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -657,9 +657,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" +checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" dependencies = [ "libc", ] @@ -761,7 +761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -797,7 +797,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -821,7 +821,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.87", + "syn", ] [[package]] @@ -832,7 +832,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -854,17 +854,6 @@ dependencies = [ "powerfmt", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "diff" version = "0.1.13" @@ -891,7 +880,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -951,6 +940,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.13.0" @@ -985,6 +986,26 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1024,9 +1045,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "ff" @@ -1050,6 +1071,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1115,7 +1145,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -1248,6 +1278,30 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.5.0" @@ -1408,6 +1462,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d06dbdfbacf34d996c6fb540a71a684a7aae9056c71951163af8a8a4c07b9a4" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.1.0", + "hyper 1.5.0", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-rustls" version = "0.27.3" @@ -1574,7 +1648,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -1727,10 +1801,11 @@ dependencies = [ [[package]] name = "json-patch" -version = "1.4.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec9ad60d674508f3ca8f380a928cfe7b096bc729c4e2dbfe3852bc45da3ab30b" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" dependencies = [ + "jsonptr", "serde", "serde_json", "thiserror", @@ -1751,6 +1826,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "jsonrpsee" version = "0.23.2" @@ -1809,7 +1895,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -1838,7 +1924,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tracing", ] @@ -1871,9 +1957,9 @@ dependencies = [ [[package]] name = "k8s-openapi" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19501afb943ae5806548bc3ebd7f3374153ca057a38f480ef30adfde5ef09755" +checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" dependencies = [ "base64 0.22.1", "chrono", @@ -1893,9 +1979,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "264461a7ebf4fb0fcf23e4c7e4f9387c5696ee61d003de207d9b5a895ff37bfa" +checksum = "efffeb3df0bd4ef3e5d65044573499c0e4889b988070b08c50b25b1329289a1f" dependencies = [ "k8s-openapi", "kube-client", @@ -1906,9 +1992,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47164ad6c47398ee4bdf90509c7b44026229721cb1377eb4623a1ec2a00a85e9" +checksum = "8bf471ece8ff8d24735ce78dac4d091e9fcb8d74811aeb6b75de4d1c3f5de0f1" dependencies = [ "base64 0.22.1", "bytes", @@ -1920,6 +2006,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", + "hyper-http-proxy", "hyper-rustls", "hyper-timeout", "hyper-util", @@ -1936,16 +2023,16 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tower-http", "tracing", ] [[package]] name = "kube-core" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797d3044a238825432129cd9537e12c2a6dacbbb5352381af5ea55e1505ed4f" +checksum = "f42346d30bb34d1d7adc5c549b691bce7aa3a1e60254e68fab7e2d7b26fe3d77" dependencies = [ "chrono", "form_urlencoded", @@ -1954,45 +2041,46 @@ dependencies = [ "k8s-openapi", "schemars", "serde", + "serde-value", "serde_json", "thiserror", ] [[package]] name = "kube-derive" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf837edaa0c478f85e9a3cddb17fa80d58a57c1afa722b3a9e55753ea162f41" +checksum = "f9364e04cc5e0482136c6ee8b7fb7551812da25802249f35b3def7aaa31e82ad" dependencies = [ "darling", "proc-macro2", "quote", "serde_json", - "syn 2.0.87", + "syn", ] [[package]] name = "kube-runtime" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e463e89a1fb222c65a5469b568803153d1bf13d084a8dd42b659e6cca66edc6e" +checksum = "d3fbf1f6ffa98e65f1d2a9a69338bb60605d46be7edf00237784b89e62c9bd44" dependencies = [ "ahash", "async-broadcast", "async-stream", "async-trait", "backoff", - "derivative", + "educe", "futures", "hashbrown 0.14.5", "json-patch", + "jsonptr", "k8s-openapi", "kube-client", "parking_lot", "pin-project", "serde", "serde_json", - "smallvec", "thiserror", "tokio", "tokio-util", @@ -2013,9 +2101,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libloading" @@ -2102,7 +2190,7 @@ dependencies = [ "proc-macro2", "quote", "regex-syntax 0.6.29", - "syn 2.0.87", + "syn", ] [[package]] @@ -2159,7 +2247,7 @@ checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2407,7 +2495,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2448,7 +2536,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2562,7 +2650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.87", + "syn", ] [[package]] @@ -2594,7 +2682,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2624,7 +2712,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.87", + "syn", "tempfile", ] @@ -2638,7 +2726,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2787,7 +2875,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", + "regex-automata 0.4.9", "regex-syntax 0.8.5", ] @@ -2802,9 +2890,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -2887,9 +2975,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.39" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags 2.6.0", "errno", @@ -3039,7 +3127,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.87", + "syn", ] [[package]] @@ -3064,11 +3152,10 @@ dependencies = [ [[package]] name = "secrecy" -version = "0.8.0" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" dependencies = [ - "serde", "zeroize", ] @@ -3088,9 +3175,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" dependencies = [ "core-foundation-sys", "libc", @@ -3104,9 +3191,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] @@ -3123,13 +3210,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3140,7 +3227,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3337,9 +3424,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "1.0.109" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -3347,15 +3434,10 @@ dependencies = [ ] [[package]] -name = "syn" -version = "2.0.87" +name = "sync_wrapper" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "synstructure" @@ -3365,14 +3447,14 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -3398,7 +3480,7 @@ checksum = "f9b53c7124dd88026d5d98a1eb1fd062a578b7d783017c9298825526c7fb6427" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3417,22 +3499,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02dd99dc800bbb97186339685293e1cc5d9df1f8fae2d0aecd9ff1c77efea892" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c61ec9a6f64d2793d8a45faba21efbe3ced62a886d44c36a009b2b519b4c7e" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3508,9 +3590,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -3532,7 +3614,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3583,6 +3665,21 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", "tokio", "tokio-util", "tower-layer", @@ -3592,16 +3689,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", - "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -3641,7 +3737,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3817,7 +3913,7 @@ checksum = "6a511871dc5de990a3b2a0e715facfbc5da848c0c0395597a1415029fb7c250a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3867,7 +3963,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn", "wasm-bindgen-shared", ] @@ -3889,7 +3985,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4082,7 +4178,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "synstructure", ] @@ -4104,7 +4200,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4124,7 +4220,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "synstructure", ] @@ -4145,7 +4241,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4167,7 +4263,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4354,7 +4450,7 @@ dependencies = [ "tempfile", "tokio", "tokio-rustls", - "tower", + "tower 0.4.13", "tracing", "tracing-subscriber", "vise-exporter", @@ -4414,7 +4510,7 @@ dependencies = [ "prost-reflect", "protox", "quote", - "syn 2.0.87", + "syn", ] [[package]] diff --git a/node/Cargo.toml b/node/Cargo.toml index 52f8bc3a..5d058b24 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -1,8 +1,8 @@ [workspace] members = [ - "actors/bft", - "actors/executor", - "actors/network", + "components/bft", + "components/executor", + "components/network", "libs/concurrency", "libs/crypto", "libs/protobuf", @@ -26,10 +26,10 @@ version = "0.5.0" [workspace.dependencies] # Crates from this repo. -zksync_consensus_bft = { version = "=0.5.0", path = "actors/bft" } +zksync_consensus_bft = { version = "=0.5.0", path = "components/bft" } zksync_consensus_crypto = { version = "=0.5.0", path = "libs/crypto" } -zksync_consensus_executor = { version = "=0.5.0", path = "actors/executor" } -zksync_consensus_network = { version = "=0.5.0", path = "actors/network" } +zksync_consensus_executor = { version = "=0.5.0", path = "components/executor" } +zksync_consensus_network = { version = "=0.5.0", path = "components/network" } zksync_consensus_roles = { version = "=0.5.0", path = "libs/roles" } zksync_consensus_storage = { version = "=0.5.0", path = "libs/storage" } zksync_consensus_tools = { version = "=0.5.0", path = "tools" } @@ -73,8 +73,8 @@ hyper-util = { version = "0.1", features = ["full"] } im = "15.1.0" jsonrpsee = { version = "0.23.0", features = ["http-client", "server"] } k256 = { version = "0.13", features = ["ecdsa"] } -k8s-openapi = { version = "0.22.0", features = ["latest"] } -kube = { version = "0.91.0", features = ["derive", "runtime"] } +k8s-openapi = { version = "0.23.0", features = ["latest"] } +kube = { version = "0.96.0", features = ["derive", "runtime"] } num-bigint = "0.4.4" num-traits = "0.2.18" once_cell = "1.17.1" diff --git a/node/actors/bft/src/io.rs b/node/actors/bft/src/io.rs deleted file mode 100644 index b29dc758..00000000 --- a/node/actors/bft/src/io.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! Input and output messages for the Consensus actor. These are processed by the executor actor. - -use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq}; - -/// All the messages that other actors can send to the Consensus actor. -#[derive(Debug)] -pub enum InputMessage { - /// Message types from the Network actor. - Network(ConsensusReq), -} - -/// All the messages that the Consensus actor sends to other actors. -#[derive(Debug, PartialEq)] -pub enum OutputMessage { - /// Message types to the Network actor. - Network(ConsensusInputMessage), -} - -impl From for OutputMessage { - fn from(message: ConsensusInputMessage) -> Self { - Self::Network(message) - } -} diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs deleted file mode 100644 index 0075b0df..00000000 --- a/node/actors/bft/src/lib.rs +++ /dev/null @@ -1,102 +0,0 @@ -//! This crate contains the consensus actor, which is responsible for handling the logic that allows us to reach agreement on blocks. -//! It uses a new cosnensus algorithm developed at Matter Labs, called ChonkyBFT. You can find the specification of the algorithm [here](../../../../spec). - -use crate::io::{InputMessage, OutputMessage}; -use anyhow::Context; -pub use config::Config; -use std::sync::Arc; -use tracing::Instrument; -use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; -use zksync_consensus_roles::validator; -use zksync_consensus_utils::pipe::ActorPipe; - -/// This module contains the implementation of the ChonkyBFT algorithm. -mod chonky_bft; -mod config; -pub mod io; -mod metrics; -pub mod testonly; -#[cfg(test)] -mod tests; - -/// Protocol version of this BFT implementation. -pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; - -/// Payload proposal and verification trait. -#[async_trait::async_trait] -pub trait PayloadManager: std::fmt::Debug + Send + Sync { - /// Used by leader to propose a payload for the next block. - async fn propose( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result; - /// Used by replica to verify a payload for the next block proposed by the leader. - async fn verify( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - payload: &validator::Payload, - ) -> ctx::Result<()>; -} - -impl Config { - /// Starts the bft actor. It will start running, processing incoming messages and - /// sending output messages. - pub async fn run( - self, - ctx: &ctx::Ctx, - mut pipe: ActorPipe, - ) -> anyhow::Result<()> { - let genesis = self.block_store.genesis(); - anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT); - genesis.verify().context("genesis().verify()")?; - - if let Some(prev) = genesis.first_block.prev() { - tracing::info!("Waiting for the pre-fork blocks to be persisted"); - if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await { - return Ok(()); - } - } - - let cfg = Arc::new(self); - let (proposer_sender, proposer_receiver) = sync::watch::channel(None); - let (replica, replica_send) = - chonky_bft::StateMachine::start(ctx, cfg.clone(), pipe.send.clone(), proposer_sender) - .await?; - - let res = scope::run!(ctx, |ctx, s| async { - s.spawn_bg(async { replica.run(ctx).await.wrap("replica.run()") }); - s.spawn_bg(async { - chonky_bft::proposer::run_proposer(ctx, cfg.clone(), pipe.send, proposer_receiver) - .await - .wrap("run_proposer()") - }); - - tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public()); - - // This is the infinite loop where the consensus actually runs. The validator waits for - // a message from the network and processes it accordingly. - loop { - async { - let InputMessage::Network(msg) = pipe - .recv - .recv(ctx) - .instrument(tracing::info_span!("wait_for_message")) - .await?; - - replica_send.send(msg); - - ctx::Ok(()) - } - .instrument(tracing::info_span!("bft_iter")) - .await?; - } - }) - .await; - match res { - Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } -} diff --git a/node/actors/executor/src/io.rs b/node/actors/executor/src/io.rs deleted file mode 100644 index e6fb7dd4..00000000 --- a/node/actors/executor/src/io.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Module to manage the communication between actors. It simply converts and forwards messages from and to each different actor. -use zksync_concurrency::{ - ctx::{self, channel}, - scope, -}; -use zksync_consensus_bft::io::{ - InputMessage as ConsensusInputMessage, OutputMessage as ConsensusOutputMessage, -}; -use zksync_consensus_network::io::{ - InputMessage as NetworkInputMessage, OutputMessage as NetworkOutputMessage, -}; -use zksync_consensus_utils::pipe::DispatcherPipe; - -/// The IO dispatcher, it is the main struct to handle actor messages. It simply contains a sender and a receiver for -/// a pair of channels for each actor. This of course allows us to send and receive messages to and from each actor. -#[derive(Debug)] -pub(super) struct Dispatcher { - consensus_input: channel::UnboundedSender, - consensus_output: channel::UnboundedReceiver, - network_input: channel::UnboundedSender, - network_output: channel::UnboundedReceiver, -} - -impl Dispatcher { - /// Creates a new IO Dispatcher. - pub(super) fn new( - consensus_pipe: DispatcherPipe, - network_pipe: DispatcherPipe, - ) -> Self { - Dispatcher { - consensus_input: consensus_pipe.send, - consensus_output: consensus_pipe.recv, - network_input: network_pipe.send, - network_output: network_pipe.recv, - } - } - - /// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) { - let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { - // Start a task to handle the messages from the consensus actor. - s.spawn(async { - while let Ok(msg) = self.consensus_output.recv(ctx).await { - match msg { - ConsensusOutputMessage::Network(message) => { - self.network_input.send(message.into()); - } - } - } - Ok(()) - }); - - // Start a task to handle the messages from the network actor. - s.spawn(async { - while let Ok(msg) = self.network_output.recv(ctx).await { - match msg { - NetworkOutputMessage::Consensus(message) => { - self.consensus_input - .send(ConsensusInputMessage::Network(message)); - } - } - } - Ok(()) - }); - - Ok(()) - }) - .await; - } -} diff --git a/node/actors/network/src/io.rs b/node/actors/network/src/io.rs deleted file mode 100644 index 6166deef..00000000 --- a/node/actors/network/src/io.rs +++ /dev/null @@ -1,40 +0,0 @@ -#![allow(missing_docs)] -use zksync_concurrency::oneshot; -use zksync_consensus_roles::validator; - -/// All the messages that other actors can send to the Network actor. -#[derive(Debug)] -pub enum InputMessage { - /// Message types from the Consensus actor. - Consensus(ConsensusInputMessage), -} - -/// Message types from the Consensus actor. -#[derive(Debug, PartialEq)] -pub struct ConsensusInputMessage { - pub message: validator::Signed, -} - -impl From for InputMessage { - fn from(message: ConsensusInputMessage) -> Self { - Self::Consensus(message) - } -} - -/// Consensus message received from the network. -#[derive(Debug)] -pub struct ConsensusReq { - /// Payload. - pub msg: validator::Signed, - /// Channel that should be used to notify network actor that - /// processing of this message has been completed. - /// Used for rate limiting. - pub ack: oneshot::Sender<()>, -} - -/// All the messages that the Network actor sends to other actors. -#[derive(Debug)] -pub enum OutputMessage { - /// Message to the Consensus actor. - Consensus(ConsensusReq), -} diff --git a/node/actors/network/src/state.rs b/node/actors/network/src/state.rs deleted file mode 100644 index 529b999a..00000000 --- a/node/actors/network/src/state.rs +++ /dev/null @@ -1 +0,0 @@ -//! Network actor maintaining a pool of outbound and inbound connections to other nodes. diff --git a/node/actors/bft/Cargo.toml b/node/components/bft/Cargo.toml similarity index 94% rename from node/actors/bft/Cargo.toml rename to node/components/bft/Cargo.toml index 70d09bd3..202c0657 100644 --- a/node/actors/bft/Cargo.toml +++ b/node/components/bft/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus bft actor" +description = "ZKsync consensus BFT component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/bft/src/chonky_bft/block.rs b/node/components/bft/src/chonky_bft/block.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/block.rs rename to node/components/bft/src/chonky_bft/block.rs diff --git a/node/actors/bft/src/chonky_bft/commit.rs b/node/components/bft/src/chonky_bft/commit.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/commit.rs rename to node/components/bft/src/chonky_bft/commit.rs diff --git a/node/actors/bft/src/chonky_bft/mod.rs b/node/components/bft/src/chonky_bft/mod.rs similarity index 85% rename from node/actors/bft/src/chonky_bft/mod.rs rename to node/components/bft/src/chonky_bft/mod.rs index 9c8e7790..71340dbb 100644 --- a/node/actors/bft/src/chonky_bft/mod.rs +++ b/node/components/bft/src/chonky_bft/mod.rs @@ -1,17 +1,10 @@ -use crate::{io::OutputMessage, metrics, Config}; +use crate::{metrics, Config, FromNetworkMessage, ToNetworkMessage}; use std::{ cmp::max, collections::{BTreeMap, HashMap}, sync::Arc, }; -use zksync_concurrency::{ - ctx, - error::Wrap as _, - metrics::LatencyHistogramExt as _, - sync::{self, prunable_mpsc::SelectionFunctionResult}, - time, -}; -use zksync_consensus_network::io::ConsensusReq; +use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time}; use zksync_consensus_roles::validator::{self, ConsensusMsg}; mod block; @@ -35,13 +28,13 @@ pub(crate) const VIEW_TIMEOUT_DURATION: time::Duration = time::Duration::millise pub(crate) struct StateMachine { /// Consensus configuration. pub(crate) config: Arc, - /// Pipe through which replica sends network messages. - pub(super) outbound_pipe: ctx::channel::UnboundedSender, - /// Pipe through which replica receives network requests. - pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver, + /// Channel through which replica sends network messages. + pub(super) outbound_channel: ctx::channel::UnboundedSender, + /// Channel through which replica receives network requests. + pub(crate) inbound_channel: sync::prunable_mpsc::Receiver, /// The sender part of the proposer watch channel. This is used to notify the proposer loop /// and send the needed justification. - pub(crate) proposer_pipe: sync::watch::Sender>, + pub(crate) proposer_sender: sync::watch::Sender>, /// The current view number. pub(crate) view_number: validator::ViewNumber, @@ -83,9 +76,10 @@ impl StateMachine { pub(crate) async fn start( ctx: &ctx::Ctx, config: Arc, - outbound_pipe: ctx::channel::UnboundedSender, - proposer_pipe: sync::watch::Sender>, - ) -> ctx::Result<(Self, sync::prunable_mpsc::Sender)> { + outbound_channel: ctx::channel::UnboundedSender, + inbound_channel: sync::prunable_mpsc::Receiver, + proposer_sender: sync::watch::Sender>, + ) -> ctx::Result { let backup = config.replica_store.state(ctx).await?; let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); @@ -96,16 +90,11 @@ impl StateMachine { .insert(proposal.payload.hash(), proposal.payload); } - let (send, recv) = sync::prunable_mpsc::channel( - StateMachine::inbound_filter_predicate, - StateMachine::inbound_selection_function, - ); - let this = Self { config, - outbound_pipe, - inbound_pipe: recv, - proposer_pipe, + outbound_channel, + inbound_channel, + proposer_sender, view_number: backup.view, phase: backup.phase, high_vote: backup.high_vote, @@ -120,7 +109,7 @@ impl StateMachine { view_start: ctx.now(), }; - Ok((this, send)) + Ok(this) } /// Runs a loop to process incoming messages (may be `None` if the channel times out while waiting for a message). @@ -140,7 +129,7 @@ impl StateMachine { // Main loop. loop { let recv = self - .inbound_pipe + .inbound_channel .recv(&ctx.with_deadline(self.view_timeout)) .await; @@ -274,34 +263,12 @@ impl StateMachine { }; metrics::METRICS.message_processing_latency[&label].observe_latency(ctx.now() - now); - // Notify network actor that the message has been processed. + // Notify network component that the message has been processed. // Ignore sending error. let _ = req.ack.send(()); } } - fn inbound_filter_predicate(new_req: &ConsensusReq) -> bool { - // Verify message signature - new_req.msg.verify().is_ok() - } - - fn inbound_selection_function( - old_req: &ConsensusReq, - new_req: &ConsensusReq, - ) -> SelectionFunctionResult { - if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label() - { - SelectionFunctionResult::Keep - } else { - // Discard older message - if old_req.msg.msg.view().number < new_req.msg.msg.view().number { - SelectionFunctionResult::DiscardOld - } else { - SelectionFunctionResult::DiscardNew - } - } - } - /// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if /// we have the proposal corresponding to this qc, we save the corresponding block to DB. pub(crate) async fn process_commit_qc( diff --git a/node/actors/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs similarity index 98% rename from node/actors/bft/src/chonky_bft/new_view.rs rename to node/components/bft/src/chonky_bft/new_view.rs index ff401ebb..c8107dd1 100644 --- a/node/actors/bft/src/chonky_bft/new_view.rs +++ b/node/components/bft/src/chonky_bft/new_view.rs @@ -115,7 +115,7 @@ impl StateMachine { // Update the state machine. self.view_number = view; self.phase = validator::Phase::Prepare; - self.proposer_pipe + self.proposer_sender .send(Some(self.get_justification())) .expect("justification_watch.send() failed"); @@ -139,7 +139,7 @@ impl StateMachine { }, )), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); // Log the event and update the metrics. tracing::info!("Starting view {}", self.view_number); diff --git a/node/actors/bft/src/chonky_bft/proposal.rs b/node/components/bft/src/chonky_bft/proposal.rs similarity index 99% rename from node/actors/bft/src/chonky_bft/proposal.rs rename to node/components/bft/src/chonky_bft/proposal.rs index b89f09d8..7da6962c 100644 --- a/node/actors/bft/src/chonky_bft/proposal.rs +++ b/node/components/bft/src/chonky_bft/proposal.rs @@ -250,7 +250,7 @@ impl StateMachine { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); Ok(()) } diff --git a/node/actors/bft/src/chonky_bft/proposer.rs b/node/components/bft/src/chonky_bft/proposer.rs similarity index 94% rename from node/actors/bft/src/chonky_bft/proposer.rs rename to node/components/bft/src/chonky_bft/proposer.rs index 4a6dd843..ec9e0e33 100644 --- a/node/actors/bft/src/chonky_bft/proposer.rs +++ b/node/components/bft/src/chonky_bft/proposer.rs @@ -1,4 +1,4 @@ -use crate::{io::OutputMessage, metrics, Config}; +use crate::{metrics, Config, ToNetworkMessage}; use std::sync::Arc; use zksync_concurrency::{ctx, error::Wrap as _, sync}; use zksync_consensus_network::io::ConsensusInputMessage; @@ -11,7 +11,7 @@ use super::VIEW_TIMEOUT_DURATION; pub(crate) async fn run_proposer( ctx: &ctx::Ctx, cfg: Arc, - outbound_pipe: ctx::channel::UnboundedSender, + network_sender: ctx::channel::UnboundedSender, mut justification_watch: sync::watch::Receiver>, ) -> ctx::Result<()> { loop { @@ -50,7 +50,7 @@ pub(crate) async fn run_proposer( .secret_key .sign_msg(validator::ConsensusMsg::LeaderProposal(proposal)); - outbound_pipe.send(ConsensusInputMessage { message: msg }.into()); + network_sender.send(ConsensusInputMessage { message: msg }); } } diff --git a/node/actors/bft/src/chonky_bft/testonly.rs b/node/components/bft/src/chonky_bft/testonly.rs similarity index 90% rename from node/actors/bft/src/chonky_bft/testonly.rs rename to node/components/bft/src/chonky_bft/testonly.rs index 2367d1e7..686c971b 100644 --- a/node/actors/bft/src/chonky_bft/testonly.rs +++ b/node/components/bft/src/chonky_bft/testonly.rs @@ -1,15 +1,13 @@ use crate::testonly::RandomPayload; use crate::{ chonky_bft::{self, commit, new_view, proposal, timeout, StateMachine}, - io::OutputMessage, Config, PayloadManager, }; +use crate::{create_input_channel, FromNetworkMessage, ToNetworkMessage}; use assert_matches::assert_matches; use std::sync::Arc; use zksync_concurrency::sync::prunable_mpsc; use zksync_concurrency::{ctx, sync}; -use zksync_consensus_network as network; -use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; use zksync_consensus_storage::{ testonly::{in_memory, TestMemoryStorage}, @@ -28,9 +26,9 @@ pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000; pub(crate) struct UTHarness { pub(crate) replica: StateMachine, pub(crate) keys: Vec, - pub(crate) outbound_pipe: ctx::channel::UnboundedReceiver, - pub(crate) inbound_pipe: prunable_mpsc::Sender, - pub(crate) _proposer_pipe: sync::watch::Receiver>, + pub(crate) outbound_channel: ctx::channel::UnboundedReceiver, + pub(crate) inbound_channel: prunable_mpsc::Sender, + pub(crate) _proposer_channel: sync::watch::Receiver>, } impl UTHarness { @@ -63,7 +61,8 @@ impl UTHarness { let rng = &mut ctx.rng(); let setup = validator::testonly::Setup::new(rng, num_validators); let store = TestMemoryStorage::new(ctx, &setup).await; - let (send, recv) = ctx::channel::unbounded(); + let (output_channel_send, output_channel_recv) = ctx::channel::unbounded(); + let (input_channel_send, input_channel_recv) = create_input_channel(); let (proposer_sender, proposer_receiver) = sync::watch::channel(None); let cfg = Arc::new(Config { @@ -73,16 +72,21 @@ impl UTHarness { payload_manager, max_payload_size: MAX_PAYLOAD_SIZE, }); - let (replica, input_pipe) = - StateMachine::start(ctx, cfg.clone(), send.clone(), proposer_sender) - .await - .unwrap(); + let replica = StateMachine::start( + ctx, + cfg.clone(), + output_channel_send.clone(), + input_channel_recv, + proposer_sender, + ) + .await + .unwrap(); let mut this = UTHarness { replica, keys: setup.validator_keys.clone(), - outbound_pipe: recv, - inbound_pipe: input_pipe, - _proposer_pipe: proposer_receiver, + outbound_channel: output_channel_recv, + inbound_channel: input_channel_send, + _proposer_channel: proposer_receiver, }; let timeout = this.new_replica_timeout(ctx).await; @@ -290,17 +294,15 @@ impl UTHarness { } pub(crate) fn send(&self, msg: validator::Signed) { - self.inbound_pipe.send(ConsensusReq { + self.inbound_channel.send(FromNetworkMessage { msg, ack: zksync_concurrency::oneshot::channel().0, }); } fn try_recv>(&mut self) -> Option> { - self.outbound_pipe.try_recv().map(|message| match message { - OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => { - message.cast().unwrap() - } - }) + self.outbound_channel + .try_recv() + .map(|message| message.message.cast().unwrap()) } } diff --git a/node/actors/bft/src/chonky_bft/tests/commit.rs b/node/components/bft/src/chonky_bft/tests/commit.rs similarity index 97% rename from node/actors/bft/src/chonky_bft/tests/commit.rs rename to node/components/bft/src/chonky_bft/tests/commit.rs index 0d65d110..3ee2c911 100644 --- a/node/actors/bft/src/chonky_bft/tests/commit.rs +++ b/node/components/bft/src/chonky_bft/tests/commit.rs @@ -359,7 +359,10 @@ async fn replica_commit_filter_functions_test() { util.send(msg.clone()); // Validate only correct message is received - assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + assert_eq!( + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, + msg + ); // Send a msg with view number = 2 let mut replica_commit_from_view_2 = replica_commit.clone(); @@ -393,7 +396,7 @@ async fn replica_commit_filter_functions_test() { // Validate only message from view 4 is received assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_view_4 ); @@ -409,13 +412,13 @@ async fn replica_commit_filter_functions_test() { )); util.send(msg_from_validator_1.clone()); - //Validate both are present in the inbound_pipe + //Validate both are present in the inbound_channel. assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_0 ); assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_1 ); diff --git a/node/actors/bft/src/chonky_bft/tests/mod.rs b/node/components/bft/src/chonky_bft/tests/mod.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/mod.rs rename to node/components/bft/src/chonky_bft/tests/mod.rs diff --git a/node/actors/bft/src/chonky_bft/tests/new_view.rs b/node/components/bft/src/chonky_bft/tests/new_view.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/new_view.rs rename to node/components/bft/src/chonky_bft/tests/new_view.rs diff --git a/node/actors/bft/src/chonky_bft/tests/proposal.rs b/node/components/bft/src/chonky_bft/tests/proposal.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/proposal.rs rename to node/components/bft/src/chonky_bft/tests/proposal.rs diff --git a/node/actors/bft/src/chonky_bft/tests/timeout.rs b/node/components/bft/src/chonky_bft/tests/timeout.rs similarity index 97% rename from node/actors/bft/src/chonky_bft/tests/timeout.rs rename to node/components/bft/src/chonky_bft/tests/timeout.rs index 0ee2dc9f..af778b69 100644 --- a/node/actors/bft/src/chonky_bft/tests/timeout.rs +++ b/node/components/bft/src/chonky_bft/tests/timeout.rs @@ -372,7 +372,10 @@ async fn replica_timeout_filter_functions_test() { util.send(msg.clone()); // Validate only correct message is received - assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + assert_eq!( + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, + msg + ); // Send a msg with view number = 2 let mut replica_timeout_from_view_2 = replica_timeout.clone(); @@ -406,7 +409,7 @@ async fn replica_timeout_filter_functions_test() { // Validate only message from view 4 is received assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_view_4 ); @@ -422,13 +425,13 @@ async fn replica_timeout_filter_functions_test() { )); util.send(msg_from_validator_1.clone()); - // Validate both are present in the inbound_pipe + // Validate both are present in the inbound_channel assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_0 ); assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_1 ); diff --git a/node/actors/bft/src/chonky_bft/timeout.rs b/node/components/bft/src/chonky_bft/timeout.rs similarity index 99% rename from node/actors/bft/src/chonky_bft/timeout.rs rename to node/components/bft/src/chonky_bft/timeout.rs index 189e3c16..43bb4919 100644 --- a/node/actors/bft/src/chonky_bft/timeout.rs +++ b/node/components/bft/src/chonky_bft/timeout.rs @@ -176,7 +176,7 @@ impl StateMachine { )), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); // Log the event. tracing::info!("Timed out at view {}", self.view_number); diff --git a/node/actors/bft/src/config.rs b/node/components/bft/src/config.rs similarity index 95% rename from node/actors/bft/src/config.rs rename to node/components/bft/src/config.rs index 77938549..ea83d2fe 100644 --- a/node/actors/bft/src/config.rs +++ b/node/components/bft/src/config.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use zksync_consensus_roles::validator; use zksync_consensus_storage as storage; -/// Configuration of the bft actor. +/// Configuration of the bft component. #[derive(Debug)] pub struct Config { /// The validator's secret key. diff --git a/node/components/bft/src/lib.rs b/node/components/bft/src/lib.rs new file mode 100644 index 00000000..fa98ae4b --- /dev/null +++ b/node/components/bft/src/lib.rs @@ -0,0 +1,135 @@ +//! This crate contains the consensus component, which is responsible for handling the logic that allows us to reach agreement on blocks. +//! It uses a new cosnensus algorithm developed at Matter Labs, called ChonkyBFT. You can find the specification of the algorithm [here](../../../../spec). + +use anyhow::Context; +pub use config::Config; +use std::sync::Arc; +use zksync_concurrency::{ + ctx, + error::Wrap as _, + scope, + sync::{self, prunable_mpsc::SelectionFunctionResult}, +}; +use zksync_consensus_roles::validator; + +/// This module contains the implementation of the ChonkyBFT algorithm. +mod chonky_bft; +mod config; +mod metrics; +pub mod testonly; +#[cfg(test)] +mod tests; + +/// Protocol version of this BFT implementation. +pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; + +// Renaming network messages for clarity. +#[allow(missing_docs)] +pub type ToNetworkMessage = zksync_consensus_network::io::ConsensusInputMessage; +#[allow(missing_docs)] +pub type FromNetworkMessage = zksync_consensus_network::io::ConsensusReq; + +/// Payload proposal and verification trait. +#[async_trait::async_trait] +pub trait PayloadManager: std::fmt::Debug + Send + Sync { + /// Used by leader to propose a payload for the next block. + async fn propose( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result; + /// Used by replica to verify a payload for the next block proposed by the leader. + async fn verify( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()>; +} + +impl Config { + /// Starts the bft component. It will start running, processing incoming messages and + /// sending output messages. + pub async fn run( + self, + ctx: &ctx::Ctx, + outbound_channel: ctx::channel::UnboundedSender, + inbound_channel: sync::prunable_mpsc::Receiver, + ) -> anyhow::Result<()> { + let genesis = self.block_store.genesis(); + anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT); + genesis.verify().context("genesis().verify()")?; + + if let Some(prev) = genesis.first_block.prev() { + tracing::info!("Waiting for the pre-fork blocks to be persisted"); + if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await { + return Ok(()); + } + } + + let cfg = Arc::new(self); + let (proposer_sender, proposer_receiver) = sync::watch::channel(None); + let replica = chonky_bft::StateMachine::start( + ctx, + cfg.clone(), + outbound_channel.clone(), + inbound_channel, + proposer_sender, + ) + .await?; + + let res = scope::run!(ctx, |ctx, s| async { + tracing::info!("Starting consensus component {:?}", cfg.secret_key.public()); + + s.spawn(async { replica.run(ctx).await.wrap("replica.run()") }); + s.spawn_bg(async { + chonky_bft::proposer::run_proposer( + ctx, + cfg.clone(), + outbound_channel, + proposer_receiver, + ) + .await + .wrap("run_proposer()") + }); + + Ok(()) + }) + .await; + match res { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } + } +} + +/// Creates a new input channel for the network messages. +pub fn create_input_channel() -> ( + sync::prunable_mpsc::Sender, + sync::prunable_mpsc::Receiver, +) { + sync::prunable_mpsc::channel(inbound_filter_predicate, inbound_selection_function) +} + +/// Filter predicate for incoming messages. +fn inbound_filter_predicate(new_req: &FromNetworkMessage) -> bool { + // Verify message signature + new_req.msg.verify().is_ok() +} + +/// Selection function for incoming messages. +fn inbound_selection_function( + old_req: &FromNetworkMessage, + new_req: &FromNetworkMessage, +) -> SelectionFunctionResult { + if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label() { + SelectionFunctionResult::Keep + } else { + // Discard older message + if old_req.msg.msg.view().number < new_req.msg.msg.view().number { + SelectionFunctionResult::DiscardOld + } else { + SelectionFunctionResult::DiscardNew + } + } +} diff --git a/node/actors/bft/src/metrics.rs b/node/components/bft/src/metrics.rs similarity index 100% rename from node/actors/bft/src/metrics.rs rename to node/components/bft/src/metrics.rs diff --git a/node/actors/bft/src/testonly/make.rs b/node/components/bft/src/testonly/make.rs similarity index 79% rename from node/actors/bft/src/testonly/make.rs rename to node/components/bft/src/testonly/make.rs index 13382860..6ef10a0c 100644 --- a/node/actors/bft/src/testonly/make.rs +++ b/node/components/bft/src/testonly/make.rs @@ -1,23 +1,9 @@ //! This module contains utilities that are only meant for testing purposes. -use crate::io::InputMessage; use crate::PayloadManager; -use rand::{distributions::Standard, prelude::Distribution, Rng}; +use rand::Rng; use zksync_concurrency::ctx; -use zksync_concurrency::oneshot; -use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; -// Generates a random InputMessage. -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> InputMessage { - let (send, _) = oneshot::channel(); - InputMessage::Network(ConsensusReq { - msg: rng.gen(), - ack: send, - }) - } -} - /// Produces random payload of a given size. #[derive(Debug)] pub struct RandomPayload(pub usize); diff --git a/node/actors/bft/src/testonly/mod.rs b/node/components/bft/src/testonly/mod.rs similarity index 100% rename from node/actors/bft/src/testonly/mod.rs rename to node/components/bft/src/testonly/mod.rs diff --git a/node/actors/bft/src/testonly/node.rs b/node/components/bft/src/testonly/node.rs similarity index 54% rename from node/actors/bft/src/testonly/node.rs rename to node/components/bft/src/testonly/node.rs index 49b15314..45696136 100644 --- a/node/actors/bft/src/testonly/node.rs +++ b/node/components/bft/src/testonly/node.rs @@ -1,11 +1,11 @@ -use crate::{io, testonly, PayloadManager}; +use crate::{testonly, FromNetworkMessage, PayloadManager, ToNetworkMessage}; use anyhow::Context as _; use std::sync::Arc; -use zksync_concurrency::{ctx, scope}; +use zksync_concurrency::ctx::channel; +use zksync_concurrency::{ctx, scope, sync}; use zksync_consensus_network as network; use zksync_consensus_storage as storage; use zksync_consensus_storage::testonly::in_memory; -use zksync_consensus_utils::pipe; pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000; @@ -41,18 +41,16 @@ impl Node { pub(crate) async fn run( &self, ctx: &ctx::Ctx, - network_pipe: &mut pipe::DispatcherPipe< - network::io::InputMessage, - network::io::OutputMessage, - >, + consensus_receiver: sync::prunable_mpsc::Receiver, + consensus_sender: channel::UnboundedSender, ) -> anyhow::Result<()> { - let net_recv = &mut network_pipe.recv; - let net_send = &mut network_pipe.send; - let (consensus_actor_pipe, consensus_pipe) = pipe::new(); - let mut con_recv = consensus_pipe.recv; - let con_send = consensus_pipe.send; scope::run!(ctx, |ctx, s| async { - // Run the consensus actor + // Create a channel for the consensus component to send messages to the network. + // We will use this extra channel to filter messages depending on the nodes + // behavior. + let (net_send, mut net_recv) = channel::unbounded(); + + // Run the consensus component s.spawn(async { let validator_key = self.net.validator_key.clone().unwrap(); crate::Config { @@ -62,35 +60,19 @@ impl Node { payload_manager: self.behavior.payload_manager(), max_payload_size: MAX_PAYLOAD_SIZE, } - .run(ctx, consensus_actor_pipe) + .run(ctx, net_send, consensus_receiver) .await .context("consensus.run()") }); - // Forward input messages received from the network to the actor; - // turns output from others to input for this instance. - s.spawn(async { - while let Ok(network_message) = net_recv.recv(ctx).await { - match network_message { - network::io::OutputMessage::Consensus(req) => { - con_send.send(io::InputMessage::Network(req)); - } - } - } - Ok(()) - }); - // Forward output messages from the actor to the network; + + // Forward output messages from the consensus to the network; // turns output from this to inputs for others. // Get the next message from the channel. Our response depends on what type of replica we are. - while let Ok(msg) = con_recv.recv(ctx).await { - match msg { - io::OutputMessage::Network(message) => { - let message_to_send = match self.behavior { - Behavior::Offline => continue, - Behavior::Honest | Behavior::HonestNotProposing => message, - }; - net_send.send(message_to_send.into()); - } - } + while let Ok(msg) = net_recv.recv(ctx).await { + match self.behavior { + Behavior::Offline => (), + Behavior::Honest | Behavior::HonestNotProposing => consensus_sender.send(msg), + }; } Ok(()) }) diff --git a/node/actors/bft/src/testonly/run.rs b/node/components/bft/src/testonly/run.rs similarity index 89% rename from node/actors/bft/src/testonly/run.rs rename to node/components/bft/src/testonly/run.rs index f303a0d4..0be99713 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/components/bft/src/testonly/run.rs @@ -1,6 +1,7 @@ use super::{Behavior, Node}; +use crate::{FromNetworkMessage, ToNetworkMessage}; use anyhow::Context as _; -use network::{io, Config}; +use network::Config; use rand::seq::SliceRandom; use std::{ collections::{HashMap, HashSet}, @@ -12,12 +13,11 @@ use zksync_concurrency::{ self, channel::{self, UnboundedReceiver, UnboundedSender}, }, - oneshot, scope, + oneshot, scope, sync, }; use zksync_consensus_network::{self as network}; use zksync_consensus_roles::{validator, validator::testonly::Setup}; use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore}; -use zksync_consensus_utils::pipe; pub(crate) enum Network { Real, @@ -199,8 +199,16 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { - let (node, runner) = - network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone()); + let (send, recv) = sync::prunable_mpsc::channel( + crate::inbound_filter_predicate, + crate::inbound_selection_function, + ); + let (node, runner) = network::testonly::Instance::new_with_channel( + spec.net.clone(), + spec.block_store.clone(), + send, + recv, + ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } @@ -209,8 +217,9 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { let spec = &specs[i]; s.spawn( async { - let mut node = node; - spec.run(ctx, node.pipe()).await + let node = node; + spec.run(ctx, node.consensus_receiver, node.consensus_sender) + .await } .instrument(tracing::info_span!("node", i)), ); @@ -242,14 +251,16 @@ async fn run_nodes_twins( let mut gossip_targets = HashMap::new(); for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, dispatcher_pipe) = pipe::new(); + let (output_channel_send, output_channel_recv) = channel::unbounded(); + let (input_channel_send, input_channel_recv) = crate::create_input_channel(); + let validator_key = spec.net.validator_key.as_ref().unwrap().public(); let port = spec.net.server_addr.port(); validator_ports.entry(validator_key).or_default().push(port); - sends.insert(port, actor_pipe.send); - recvs.push((port, actor_pipe.recv)); + sends.insert(port, input_channel_send); + recvs.push((port, output_channel_recv)); stores.insert(port, spec.block_store.clone()); gossip_targets.insert( port, @@ -264,15 +275,10 @@ async fn run_nodes_twins( .collect(), ); - // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: - // * send Output messages from other actors to this consensus instance - // * receive Input messages sent by this consensus to the other actors + // Run consensus node. s.spawn( - async { - let mut network_pipe = dispatcher_pipe; - spec.run(ctx, &mut network_pipe).await - } - .instrument(tracing::info_span!("node", i, port)), + async { spec.run(ctx, input_channel_recv, output_channel_send).await } + .instrument(tracing::info_span!("node", i, port)), ); } // Taking these references is necessary for the `scope::run!` environment lifetime rules to compile @@ -316,7 +322,7 @@ async fn run_nodes_twins( .await } -/// Receive input messages from the consensus actor and send them to the others +/// Receive input messages from the consensus component and send them to the others /// according to the partition schedule of the port associated with this instance. /// /// We have to simulate the gossip layer which isn't instantiated by these tests. @@ -325,15 +331,15 @@ async fn run_nodes_twins( async fn twins_receive_loop( ctx: &ctx::Ctx, router: &PortRouter, - sends: &HashMap>, + sends: &HashMap>, gossip: TwinsGossipConfig<'_>, port: Port, - mut recv: UnboundedReceiver, + mut recv: UnboundedReceiver, ) -> anyhow::Result<()> { let rng = &mut ctx.rng(); // Finalized block number iff this node can gossip to the target and the message contains a QC. - let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { + let block_to_gossip = |target_port: Port, msg: &FromNetworkMessage| { if !gossip.targets.contains(&target_port) { return None; } @@ -355,10 +361,10 @@ async fn twins_receive_loop( // partitioned, and then unstash all previous A-to-B messages. // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages // and unstash them as soon as someone moves on to a new view. - let mut stashes: HashMap> = HashMap::new(); + let mut stashes: HashMap> = HashMap::new(); // Either stash a message, or unstash all messages and send them to the target along with the new one. - let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let mut send_or_stash = |can_send: bool, target_port: Port, msg: FromNetworkMessage| { let stash = stashes.entry(target_port).or_default(); let view = output_msg_view_number(&msg); let kind = output_msg_label(&msg); @@ -401,15 +407,13 @@ async fn twins_receive_loop( send(msg); }; - while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + while let Ok(message) = recv.recv(ctx).await { let view = message.message.msg.view().number.0 as usize; let kind = message.message.msg.label(); - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) + let msg = || FromNetworkMessage { + msg: message.message.clone(), + ack: oneshot::channel().0, }; let can_send = |to| { @@ -490,28 +494,22 @@ async fn twins_gossip_loop( .await } -fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber { - match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number, - } +fn output_msg_view_number(msg: &FromNetworkMessage) -> validator::ViewNumber { + msg.msg.msg.view().number } -fn output_msg_label(msg: &io::OutputMessage) -> &str { - match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.label(), - } +fn output_msg_label(msg: &FromNetworkMessage) -> &str { + msg.msg.msg.label() } -fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> { +fn output_msg_commit_qc(msg: &FromNetworkMessage) -> Option<&validator::CommitQC> { use validator::ConsensusMsg; - let justification = match msg { - io::OutputMessage::Consensus(cr) => match &cr.msg.msg { - ConsensusMsg::ReplicaTimeout(msg) => return msg.high_qc.as_ref(), - ConsensusMsg::ReplicaCommit(_) => return None, - ConsensusMsg::ReplicaNewView(msg) => &msg.justification, - ConsensusMsg::LeaderProposal(msg) => &msg.justification, - }, + let justification = match &msg.msg.msg { + ConsensusMsg::ReplicaTimeout(msg) => return msg.high_qc.as_ref(), + ConsensusMsg::ReplicaCommit(_) => return None, + ConsensusMsg::ReplicaNewView(msg) => &msg.justification, + ConsensusMsg::LeaderProposal(msg) => &msg.justification, }; match justification { diff --git a/node/actors/bft/src/testonly/twins/mod.rs b/node/components/bft/src/testonly/twins/mod.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/mod.rs rename to node/components/bft/src/testonly/twins/mod.rs diff --git a/node/actors/bft/src/testonly/twins/partition.rs b/node/components/bft/src/testonly/twins/partition.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/partition.rs rename to node/components/bft/src/testonly/twins/partition.rs diff --git a/node/actors/bft/src/testonly/twins/scenario.rs b/node/components/bft/src/testonly/twins/scenario.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/scenario.rs rename to node/components/bft/src/testonly/twins/scenario.rs diff --git a/node/actors/bft/src/testonly/twins/tests.rs b/node/components/bft/src/testonly/twins/tests.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/tests.rs rename to node/components/bft/src/testonly/twins/tests.rs diff --git a/node/actors/bft/src/tests/mod.rs b/node/components/bft/src/tests/mod.rs similarity index 100% rename from node/actors/bft/src/tests/mod.rs rename to node/components/bft/src/tests/mod.rs diff --git a/node/actors/bft/src/tests/twins.rs b/node/components/bft/src/tests/twins.rs similarity index 98% rename from node/actors/bft/src/tests/twins.rs rename to node/components/bft/src/tests/twins.rs index 45603e51..90be381c 100644 --- a/node/actors/bft/src/tests/twins.rs +++ b/node/components/bft/src/tests/twins.rs @@ -254,12 +254,15 @@ async fn twins_network_w2_twins_w_partitions(num_reseeds: usize) { } /// Run Twins scenario with more twins than tolerable and expect it to fail. +/// Disabled by default since it can take a long time to run depending on +/// the initial seed. +#[ignore] #[tokio::test] async fn twins_network_to_fail() { tokio::time::pause(); assert_matches!( // All twins! To find a conflict quicker. - run_twins(6, 6, TwinsScenarios::Multiple(150)).await, + run_twins(6, 6, TwinsScenarios::Multiple(200)).await, Err(TestError::BlockConflict) ); } diff --git a/node/actors/executor/Cargo.toml b/node/components/executor/Cargo.toml similarity index 93% rename from node/actors/executor/Cargo.toml rename to node/components/executor/Cargo.toml index a652df00..c8feec5f 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/components/executor/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus executor actor" +description = "ZKsync consensus executor component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/executor/src/lib.rs b/node/components/executor/src/lib.rs similarity index 87% rename from node/actors/executor/src/lib.rs rename to node/components/executor/src/lib.rs index 0bf64484..7fb27892 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/components/executor/src/lib.rs @@ -1,5 +1,4 @@ //! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate. -use crate::io::Dispatcher; use anyhow::Context as _; pub use network::{gossip::attestation, RpcConfig}; use std::{ @@ -11,10 +10,8 @@ use zksync_consensus_bft as bft; use zksync_consensus_network as network; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BlockStore, ReplicaStore}; -use zksync_consensus_utils::pipe; use zksync_protobuf::kB; -mod io; #[cfg(test)] mod tests; @@ -74,7 +71,7 @@ impl Config { } } -/// Executor allowing to spin up all actors necessary for a consensus node. +/// Executor allowing to spin up all components necessary for a consensus node. #[derive(Debug)] pub struct Executor { /// General-purpose executor configuration. @@ -113,22 +110,17 @@ impl Executor { pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let network_config = self.network_config(); - // Generate the communication pipes. We have one for each actor. - let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new(); - let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); - // Create the IO dispatcher. - let dispatcher = Dispatcher::new(consensus_dispatcher_pipe, network_dispatcher_pipe); + // Generate the communication channels. We have one for each component. + let (consensus_send, consensus_recv) = bft::create_input_channel(); + let (network_send, network_recv) = ctx::channel::unbounded(); - tracing::debug!("Starting actors in separate threads."); + tracing::debug!("Starting components in separate threads."); scope::run!(ctx, |ctx, s| async { - s.spawn(async { - dispatcher.run(ctx).await; - Ok(()) - }); let (net, runner) = network::Network::new( network_config, self.block_store.clone(), - network_actor_pipe, + consensus_send, + network_recv, self.attestation, ); net.register_metrics(); @@ -143,7 +135,7 @@ impl Executor { }); } - // Run the bft actor iff this node is an active validator. + // Run the bft component iff this node is an active validator. let Some(validator) = self.validator else { tracing::info!("Running the node in non-validator mode."); return Ok(()); @@ -166,7 +158,7 @@ impl Executor { payload_manager: validator.payload_manager, max_payload_size: self.config.max_payload_size, } - .run(ctx, consensus_actor_pipe) + .run(ctx, network_send, consensus_recv) .await .context("Consensus stopped") }) diff --git a/node/actors/executor/src/tests.rs b/node/components/executor/src/tests.rs similarity index 100% rename from node/actors/executor/src/tests.rs rename to node/components/executor/src/tests.rs diff --git a/node/actors/network/Cargo.toml b/node/components/network/Cargo.toml similarity index 96% rename from node/actors/network/Cargo.toml rename to node/components/network/Cargo.toml index 439ff197..74e0bcd5 100644 --- a/node/actors/network/Cargo.toml +++ b/node/components/network/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus network actor" +description = "ZKsync consensus network component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/network/build.rs b/node/components/network/build.rs similarity index 100% rename from node/actors/network/build.rs rename to node/components/network/build.rs diff --git a/node/actors/network/src/config.rs b/node/components/network/src/config.rs similarity index 97% rename from node/actors/network/src/config.rs rename to node/components/network/src/config.rs index 3a25be62..973c3f91 100644 --- a/node/actors/network/src/config.rs +++ b/node/components/network/src/config.rs @@ -1,4 +1,4 @@ -//! Network actor configs. +//! Network component configs. use std::collections::{HashMap, HashSet}; use zksync_concurrency::{limiter, net, time}; use zksync_consensus_roles::{node, validator}; @@ -82,7 +82,7 @@ pub struct GossipConfig { pub static_outbound: HashMap, } -/// Network actor config. +/// Network component config. #[derive(Debug, Clone)] pub struct Config { /// Label identifying the build version of the binary that this node is running. @@ -116,7 +116,7 @@ pub struct Config { /// Set it to `false` in tests to simulate node behavior before pre-genesis support. pub enable_pregenesis: bool, /// Maximum number of not-yet-persisted blocks fetched from the network. - /// If reached, network actor will wait for more blocks to get persisted + /// If reached, network component will wait for more blocks to get persisted /// before fetching the next ones. It is useful for limiting memory consumption /// when the block persisting rate is low. pub max_block_queue_size: usize, diff --git a/node/actors/network/src/consensus/handshake/mod.rs b/node/components/network/src/consensus/handshake/mod.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/mod.rs rename to node/components/network/src/consensus/handshake/mod.rs diff --git a/node/actors/network/src/consensus/handshake/testonly.rs b/node/components/network/src/consensus/handshake/testonly.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/testonly.rs rename to node/components/network/src/consensus/handshake/testonly.rs diff --git a/node/actors/network/src/consensus/handshake/tests.rs b/node/components/network/src/consensus/handshake/tests.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/tests.rs rename to node/components/network/src/consensus/handshake/tests.rs diff --git a/node/actors/network/src/consensus/mod.rs b/node/components/network/src/consensus/mod.rs similarity index 97% rename from node/actors/network/src/consensus/mod.rs rename to node/components/network/src/consensus/mod.rs index 7b6e3f9a..8e912079 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/components/network/src/consensus/mod.rs @@ -134,14 +134,12 @@ impl rpc::Handler for &Network { req: rpc::consensus::Req, ) -> anyhow::Result { let (send, recv) = oneshot::channel(); - self.gossip - .sender - .send(io::OutputMessage::Consensus(io::ConsensusReq { - msg: req.0, - ack: send, - })); + self.gossip.consensus_sender.send(io::ConsensusReq { + msg: req.0, + ack: send, + }); // TODO(gprusak): disconnection means that there message was rejected OR - // that bft actor is missing (in tests), which leads to unnecessary disconnects. + // that bft component is missing (in tests), which leads to unnecessary disconnects. let _ = recv.recv_or_disconnected(ctx).await?; Ok(rpc::consensus::Resp) } diff --git a/node/actors/network/src/consensus/tests.rs b/node/components/network/src/consensus/tests.rs similarity index 94% rename from node/actors/network/src/consensus/tests.rs rename to node/components/network/src/consensus/tests.rs index a34062f9..f0493a57 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/components/network/src/consensus/tests.rs @@ -310,16 +310,12 @@ async fn test_transmission() { let in_message = io::ConsensusInputMessage { message: want.clone(), }; - nodes[0].pipe.send(in_message.into()); - - loop { - let output_message = nodes[1].pipe.recv(ctx).await.unwrap(); - if let io::OutputMessage::Consensus(got) = output_message { - assert_eq!(want, got.msg); - tracing::info!("OK"); - break; - }; - } + nodes[0].consensus_sender.send(in_message); + + let message = nodes[1].consensus_receiver.recv(ctx).await.unwrap(); + + assert_eq!(want, message.msg); + tracing::info!("OK"); } Ok(()) }) @@ -348,12 +344,9 @@ async fn test_retransmission() { // Make first node broadcast a message. let want: validator::Signed = rng.gen(); - node0.pipe.send( - io::ConsensusInputMessage { - message: want.clone(), - } - .into(), - ); + node0.consensus_sender.send(io::ConsensusInputMessage { + message: want.clone(), + }); // Spawn the second node multiple times. // Each time the node should reconnect and re-receive the broadcasted consensus message. @@ -363,13 +356,12 @@ async fn test_retransmission() { let (mut node1, runner) = testonly::Instance::new(cfgs[1].clone(), store.blocks.clone()); s.spawn_bg(runner.run(ctx)); - loop { - if let io::OutputMessage::Consensus(got) = node1.pipe.recv(ctx).await.unwrap() { - assert_eq!(want, got.msg); - tracing::info!("OK"); - break; - } - } + + let message = node1.consensus_receiver.recv(ctx).await.unwrap(); + + assert_eq!(want, message.msg); + tracing::info!("OK"); + Ok(()) }) .await diff --git a/node/actors/network/src/debug_page/mod.rs b/node/components/network/src/debug_page/mod.rs similarity index 100% rename from node/actors/network/src/debug_page/mod.rs rename to node/components/network/src/debug_page/mod.rs diff --git a/node/actors/network/src/debug_page/style.css b/node/components/network/src/debug_page/style.css similarity index 100% rename from node/actors/network/src/debug_page/style.css rename to node/components/network/src/debug_page/style.css diff --git a/node/actors/network/src/frame.rs b/node/components/network/src/frame.rs similarity index 100% rename from node/actors/network/src/frame.rs rename to node/components/network/src/frame.rs diff --git a/node/actors/network/src/gossip/attestation/metrics.rs b/node/components/network/src/gossip/attestation/metrics.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/metrics.rs rename to node/components/network/src/gossip/attestation/metrics.rs diff --git a/node/actors/network/src/gossip/attestation/mod.rs b/node/components/network/src/gossip/attestation/mod.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/mod.rs rename to node/components/network/src/gossip/attestation/mod.rs diff --git a/node/actors/network/src/gossip/attestation/tests.rs b/node/components/network/src/gossip/attestation/tests.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/tests.rs rename to node/components/network/src/gossip/attestation/tests.rs diff --git a/node/actors/network/src/gossip/fetch.rs b/node/components/network/src/gossip/fetch.rs similarity index 100% rename from node/actors/network/src/gossip/fetch.rs rename to node/components/network/src/gossip/fetch.rs diff --git a/node/actors/network/src/gossip/handshake/mod.rs b/node/components/network/src/gossip/handshake/mod.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/mod.rs rename to node/components/network/src/gossip/handshake/mod.rs diff --git a/node/actors/network/src/gossip/handshake/testonly.rs b/node/components/network/src/gossip/handshake/testonly.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/testonly.rs rename to node/components/network/src/gossip/handshake/testonly.rs diff --git a/node/actors/network/src/gossip/handshake/tests.rs b/node/components/network/src/gossip/handshake/tests.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/tests.rs rename to node/components/network/src/gossip/handshake/tests.rs diff --git a/node/actors/network/src/gossip/loadtest/mod.rs b/node/components/network/src/gossip/loadtest/mod.rs similarity index 100% rename from node/actors/network/src/gossip/loadtest/mod.rs rename to node/components/network/src/gossip/loadtest/mod.rs diff --git a/node/actors/network/src/gossip/loadtest/tests.rs b/node/components/network/src/gossip/loadtest/tests.rs similarity index 100% rename from node/actors/network/src/gossip/loadtest/tests.rs rename to node/components/network/src/gossip/loadtest/tests.rs diff --git a/node/actors/network/src/gossip/mod.rs b/node/components/network/src/gossip/mod.rs similarity index 95% rename from node/actors/network/src/gossip/mod.rs rename to node/components/network/src/gossip/mod.rs index 814f6b35..41e07973 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/components/network/src/gossip/mod.rs @@ -18,7 +18,7 @@ use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; use tracing::Instrument; pub(crate) use validator_addrs::*; -use zksync_concurrency::{ctx, ctx::channel, scope, sync}; +use zksync_concurrency::{ctx, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; @@ -56,8 +56,8 @@ pub(crate) struct Network { pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, - /// Output pipe of the network actor. - pub(crate) sender: channel::UnboundedSender, + /// Sender of the channel to the consensus component. + pub(crate) consensus_sender: sync::prunable_mpsc::Sender, /// Queue of block fetching requests. /// /// These are blocks that this node wants to request from remote peers via RPC. @@ -75,11 +75,11 @@ impl Network { pub(crate) fn new( cfg: Config, block_store: Arc, - sender: channel::UnboundedSender, + consensus_sender: sync::prunable_mpsc::Sender, attestation: Arc, ) -> Arc { Arc::new(Self { - sender, + consensus_sender, inbound: PoolWatch::new( cfg.gossip.static_inbound.clone(), cfg.gossip.dynamic_inbound_limit, diff --git a/node/actors/network/src/gossip/runner.rs b/node/components/network/src/gossip/runner.rs similarity index 100% rename from node/actors/network/src/gossip/runner.rs rename to node/components/network/src/gossip/runner.rs diff --git a/node/actors/network/src/gossip/testonly.rs b/node/components/network/src/gossip/testonly.rs similarity index 100% rename from node/actors/network/src/gossip/testonly.rs rename to node/components/network/src/gossip/testonly.rs diff --git a/node/actors/network/src/gossip/tests/fetch_blocks.rs b/node/components/network/src/gossip/tests/fetch_blocks.rs similarity index 100% rename from node/actors/network/src/gossip/tests/fetch_blocks.rs rename to node/components/network/src/gossip/tests/fetch_blocks.rs diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/components/network/src/gossip/tests/mod.rs similarity index 97% rename from node/actors/network/src/gossip/tests/mod.rs rename to node/components/network/src/gossip/tests/mod.rs index fa96f6cc..2e79162b 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/components/network/src/gossip/tests/mod.rs @@ -518,12 +518,17 @@ async fn test_batch_votes_propagation() { for (i, mut cfg) in cfgs.into_iter().enumerate() { cfg.rpc.push_batch_votes_rate = limiter::Rate::INF; cfg.validator_key = None; - let (node, runner) = testonly::Instance::new_from_config(testonly::InstanceConfig { - cfg: cfg.clone(), - block_store: store.blocks.clone(), - attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) - .into(), - }); + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); + let (node, runner) = testonly::Instance::new_from_config( + testonly::InstanceConfig { + cfg: cfg.clone(), + block_store: store.blocks.clone(), + attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) + .into(), + }, + con_send, + con_recv, + ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); // Task going through the schedule, waiting for ANY node to collect the certificate // to advance to the next round of the schedule. diff --git a/node/actors/network/src/gossip/tests/syncing.rs b/node/components/network/src/gossip/tests/syncing.rs similarity index 100% rename from node/actors/network/src/gossip/tests/syncing.rs rename to node/components/network/src/gossip/tests/syncing.rs diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/components/network/src/gossip/validator_addrs.rs similarity index 100% rename from node/actors/network/src/gossip/validator_addrs.rs rename to node/components/network/src/gossip/validator_addrs.rs diff --git a/node/components/network/src/io.rs b/node/components/network/src/io.rs new file mode 100644 index 00000000..f3c16fc1 --- /dev/null +++ b/node/components/network/src/io.rs @@ -0,0 +1,20 @@ +#![allow(missing_docs)] +use zksync_concurrency::oneshot; +use zksync_consensus_roles::validator; + +/// Message types from the Consensus component. +#[derive(Debug, PartialEq)] +pub struct ConsensusInputMessage { + pub message: validator::Signed, +} + +/// Consensus message received from the network. +#[derive(Debug)] +pub struct ConsensusReq { + /// Payload. + pub msg: validator::Signed, + /// Channel that should be used to notify the network component that + /// processing of this message has been completed. + /// Used for rate limiting. + pub ack: oneshot::Sender<()>, +} diff --git a/node/actors/network/src/lib.rs b/node/components/network/src/lib.rs similarity index 86% rename from node/actors/network/src/lib.rs rename to node/components/network/src/lib.rs index 80f57619..bb957ced 100644 --- a/node/actors/network/src/lib.rs +++ b/node/components/network/src/lib.rs @@ -1,4 +1,4 @@ -//! Network actor maintaining a pool of outbound and inbound connections to other nodes. +//! Network component maintaining a pool of outbound and inbound connections to other nodes. use anyhow::Context as _; use gossip::attestation; use std::sync::Arc; @@ -6,10 +6,9 @@ use tracing::Instrument as _; use zksync_concurrency::{ ctx::{self, channel}, error::Wrap as _, - limiter, scope, + limiter, scope, sync, }; use zksync_consensus_storage::BlockStore; -use zksync_consensus_utils::pipe::ActorPipe; mod config; pub mod consensus; @@ -24,7 +23,6 @@ mod pool; mod preface; pub mod proto; mod rpc; -mod state; pub mod testonly; #[cfg(test)] mod tests; @@ -32,7 +30,7 @@ mod watch; pub use config::*; pub use metrics::MeteredStreamStats; -/// State of the network actor observable outside of the actor. +/// State of the network component observable outside of the component. pub struct Network { /// Consensus network state. pub(crate) consensus: Option>, @@ -45,27 +43,28 @@ pub struct Network { pub struct Runner { /// Network state. net: Arc, - /// Receiver of the messages from the dispatcher. - receiver: channel::UnboundedReceiver, + /// Receiver of consensus messages. + consensus_receiver: channel::UnboundedReceiver, } impl Network { - /// Constructs a new network actor state. - /// Call `run_network` to run the actor. + /// Constructs a new network component state. + /// Call `run_network` to run the component. pub fn new( cfg: Config, block_store: Arc, - pipe: ActorPipe, + consensus_sender: sync::prunable_mpsc::Sender, + consensus_receiver: channel::UnboundedReceiver, attestation: Arc, ) -> (Arc, Runner) { - let gossip = gossip::Network::new(cfg, block_store, pipe.send, attestation); + let gossip = gossip::Network::new(cfg, block_store, consensus_sender, attestation); let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( net.clone(), Runner { net, - receiver: pipe.recv, + consensus_receiver, }, ) } @@ -80,23 +79,20 @@ impl Network { async fn handle_message( &self, _ctx: &ctx::Ctx, - message: io::InputMessage, + message: io::ConsensusInputMessage, ) -> anyhow::Result<()> { - match message { - io::InputMessage::Consensus(message) => { - self.consensus - .as_ref() - .context("not a validator node")? - .msg_pool - .send(Arc::new(message)); - } - } + self.consensus + .as_ref() + .context("not a validator node")? + .msg_pool + .send(Arc::new(message)); + Ok(()) } } impl Runner { - /// Runs the network actor. + /// Runs the network component. pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async { let mut listener = self @@ -110,7 +106,7 @@ impl Runner { // Handle incoming messages. s.spawn(async { // We don't propagate cancellation errors - while let Ok(message) = self.receiver.recv(ctx).await { + while let Ok(message) = self.consensus_receiver.recv(ctx).await { s.spawn(async { if let Err(err) = self.net.handle_message(ctx, message).await { tracing::info!("handle_message(): {err:#}"); diff --git a/node/actors/network/src/metrics.rs b/node/components/network/src/metrics.rs similarity index 100% rename from node/actors/network/src/metrics.rs rename to node/components/network/src/metrics.rs diff --git a/node/actors/network/src/mux/config.rs b/node/components/network/src/mux/config.rs similarity index 100% rename from node/actors/network/src/mux/config.rs rename to node/components/network/src/mux/config.rs diff --git a/node/actors/network/src/mux/handshake.rs b/node/components/network/src/mux/handshake.rs similarity index 100% rename from node/actors/network/src/mux/handshake.rs rename to node/components/network/src/mux/handshake.rs diff --git a/node/actors/network/src/mux/header.rs b/node/components/network/src/mux/header.rs similarity index 100% rename from node/actors/network/src/mux/header.rs rename to node/components/network/src/mux/header.rs diff --git a/node/actors/network/src/mux/mod.rs b/node/components/network/src/mux/mod.rs similarity index 100% rename from node/actors/network/src/mux/mod.rs rename to node/components/network/src/mux/mod.rs diff --git a/node/actors/network/src/mux/reusable_stream.rs b/node/components/network/src/mux/reusable_stream.rs similarity index 100% rename from node/actors/network/src/mux/reusable_stream.rs rename to node/components/network/src/mux/reusable_stream.rs diff --git a/node/actors/network/src/mux/tests/mod.rs b/node/components/network/src/mux/tests/mod.rs similarity index 100% rename from node/actors/network/src/mux/tests/mod.rs rename to node/components/network/src/mux/tests/mod.rs diff --git a/node/actors/network/src/mux/tests/proto/mod.proto b/node/components/network/src/mux/tests/proto/mod.proto similarity index 100% rename from node/actors/network/src/mux/tests/proto/mod.proto rename to node/components/network/src/mux/tests/proto/mod.proto diff --git a/node/actors/network/src/mux/tests/proto/mod.rs b/node/components/network/src/mux/tests/proto/mod.rs similarity index 100% rename from node/actors/network/src/mux/tests/proto/mod.rs rename to node/components/network/src/mux/tests/proto/mod.rs diff --git a/node/actors/network/src/mux/transient_stream.rs b/node/components/network/src/mux/transient_stream.rs similarity index 100% rename from node/actors/network/src/mux/transient_stream.rs rename to node/components/network/src/mux/transient_stream.rs diff --git a/node/actors/network/src/noise/bytes.rs b/node/components/network/src/noise/bytes.rs similarity index 100% rename from node/actors/network/src/noise/bytes.rs rename to node/components/network/src/noise/bytes.rs diff --git a/node/actors/network/src/noise/mod.rs b/node/components/network/src/noise/mod.rs similarity index 100% rename from node/actors/network/src/noise/mod.rs rename to node/components/network/src/noise/mod.rs diff --git a/node/actors/network/src/noise/stream.rs b/node/components/network/src/noise/stream.rs similarity index 100% rename from node/actors/network/src/noise/stream.rs rename to node/components/network/src/noise/stream.rs diff --git a/node/actors/network/src/noise/testonly.rs b/node/components/network/src/noise/testonly.rs similarity index 100% rename from node/actors/network/src/noise/testonly.rs rename to node/components/network/src/noise/testonly.rs diff --git a/node/actors/network/src/noise/tests.rs b/node/components/network/src/noise/tests.rs similarity index 100% rename from node/actors/network/src/noise/tests.rs rename to node/components/network/src/noise/tests.rs diff --git a/node/actors/network/src/pool.rs b/node/components/network/src/pool.rs similarity index 100% rename from node/actors/network/src/pool.rs rename to node/components/network/src/pool.rs diff --git a/node/actors/network/src/preface.rs b/node/components/network/src/preface.rs similarity index 100% rename from node/actors/network/src/preface.rs rename to node/components/network/src/preface.rs diff --git a/node/actors/network/src/proto/consensus.proto b/node/components/network/src/proto/consensus.proto similarity index 100% rename from node/actors/network/src/proto/consensus.proto rename to node/components/network/src/proto/consensus.proto diff --git a/node/actors/network/src/proto/gossip.proto b/node/components/network/src/proto/gossip.proto similarity index 100% rename from node/actors/network/src/proto/gossip.proto rename to node/components/network/src/proto/gossip.proto diff --git a/node/actors/network/src/proto/mod.rs b/node/components/network/src/proto/mod.rs similarity index 100% rename from node/actors/network/src/proto/mod.rs rename to node/components/network/src/proto/mod.rs diff --git a/node/actors/network/src/proto/mux.proto b/node/components/network/src/proto/mux.proto similarity index 100% rename from node/actors/network/src/proto/mux.proto rename to node/components/network/src/proto/mux.proto diff --git a/node/actors/network/src/proto/ping.proto b/node/components/network/src/proto/ping.proto similarity index 100% rename from node/actors/network/src/proto/ping.proto rename to node/components/network/src/proto/ping.proto diff --git a/node/actors/network/src/proto/preface.proto b/node/components/network/src/proto/preface.proto similarity index 100% rename from node/actors/network/src/proto/preface.proto rename to node/components/network/src/proto/preface.proto diff --git a/node/actors/network/src/proto/rpc.proto b/node/components/network/src/proto/rpc.proto similarity index 100% rename from node/actors/network/src/proto/rpc.proto rename to node/components/network/src/proto/rpc.proto diff --git a/node/actors/network/src/rpc/consensus.rs b/node/components/network/src/rpc/consensus.rs similarity index 100% rename from node/actors/network/src/rpc/consensus.rs rename to node/components/network/src/rpc/consensus.rs diff --git a/node/actors/network/src/rpc/get_block.rs b/node/components/network/src/rpc/get_block.rs similarity index 100% rename from node/actors/network/src/rpc/get_block.rs rename to node/components/network/src/rpc/get_block.rs diff --git a/node/actors/network/src/rpc/metrics.rs b/node/components/network/src/rpc/metrics.rs similarity index 100% rename from node/actors/network/src/rpc/metrics.rs rename to node/components/network/src/rpc/metrics.rs diff --git a/node/actors/network/src/rpc/mod.rs b/node/components/network/src/rpc/mod.rs similarity index 100% rename from node/actors/network/src/rpc/mod.rs rename to node/components/network/src/rpc/mod.rs diff --git a/node/actors/network/src/rpc/ping.rs b/node/components/network/src/rpc/ping.rs similarity index 100% rename from node/actors/network/src/rpc/ping.rs rename to node/components/network/src/rpc/ping.rs diff --git a/node/actors/network/src/rpc/push_batch_votes.rs b/node/components/network/src/rpc/push_batch_votes.rs similarity index 100% rename from node/actors/network/src/rpc/push_batch_votes.rs rename to node/components/network/src/rpc/push_batch_votes.rs diff --git a/node/actors/network/src/rpc/push_block_store_state.rs b/node/components/network/src/rpc/push_block_store_state.rs similarity index 100% rename from node/actors/network/src/rpc/push_block_store_state.rs rename to node/components/network/src/rpc/push_block_store_state.rs diff --git a/node/actors/network/src/rpc/push_validator_addrs.rs b/node/components/network/src/rpc/push_validator_addrs.rs similarity index 100% rename from node/actors/network/src/rpc/push_validator_addrs.rs rename to node/components/network/src/rpc/push_validator_addrs.rs diff --git a/node/actors/network/src/rpc/testonly.rs b/node/components/network/src/rpc/testonly.rs similarity index 100% rename from node/actors/network/src/rpc/testonly.rs rename to node/components/network/src/rpc/testonly.rs diff --git a/node/actors/network/src/rpc/tests.rs b/node/components/network/src/rpc/tests.rs similarity index 100% rename from node/actors/network/src/rpc/tests.rs rename to node/components/network/src/rpc/tests.rs diff --git a/node/actors/network/src/testonly.rs b/node/components/network/src/testonly.rs similarity index 83% rename from node/actors/network/src/testonly.rs rename to node/components/network/src/testonly.rs index e6d5ca3d..48421f80 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/components/network/src/testonly.rs @@ -1,8 +1,8 @@ //! Testonly utilities. #![allow(dead_code)] use crate::{ - gossip::attestation, io::ConsensusInputMessage, Config, GossipConfig, Network, RpcConfig, - Runner, + gossip::attestation, io::ConsensusInputMessage, io::ConsensusReq, Config, GossipConfig, + Network, RpcConfig, Runner, }; use rand::{ distributions::{Distribution, Standard}, @@ -18,7 +18,6 @@ use zksync_concurrency::{ }; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; -use zksync_consensus_utils::pipe; impl Distribution for Standard { fn sample(&self, rng: &mut R) -> ConsensusInputMessage { @@ -78,15 +77,20 @@ pub(crate) async fn forward( let _ = io::shutdown(ctx, &mut write).await; } -/// Node instance, wrapping the network actor state and the +/// Node instance, wrapping the network component state and the /// events channel. +#[allow(clippy::partial_pub_fields)] pub struct Instance { /// State of the instance. - pub(crate) net: Arc, + pub net: Arc, /// Termination signal that can be sent to the node. - pub(crate) terminate: channel::Sender<()>, - /// Dispatcher end of the network pipe. - pub(crate) pipe: pipe::DispatcherPipe, + terminate: channel::Sender<()>, + /// Receiver channel to receive messages from the network component that are + /// intended for the consensus component. + pub consensus_receiver: sync::prunable_mpsc::Receiver, + /// Sender channel to send messages to the network component that are from + /// the consensus component. + pub consensus_sender: channel::UnboundedSender, } /// Construct configs for `n` validators of the consensus. @@ -190,27 +194,59 @@ pub struct InstanceConfig { impl Instance { /// Constructs a new instance. pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { - Self::new_from_config(InstanceConfig { - cfg, - block_store, - attestation: attestation::Controller::new(None).into(), - }) + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); + Self::new_from_config( + InstanceConfig { + cfg, + block_store, + attestation: attestation::Controller::new(None).into(), + }, + con_send, + con_recv, + ) + } + + /// Constructs a new instance with a custom channel for consensus + /// component. + pub fn new_with_channel( + cfg: Config, + block_store: Arc, + con_send: sync::prunable_mpsc::Sender, + con_recv: sync::prunable_mpsc::Receiver, + ) -> (Self, InstanceRunner) { + Self::new_from_config( + InstanceConfig { + cfg, + block_store, + attestation: attestation::Controller::new(None).into(), + }, + con_send, + con_recv, + ) } /// Construct an instance for a given config. - pub fn new_from_config(cfg: InstanceConfig) -> (Self, InstanceRunner) { - let (actor_pipe, dispatcher_pipe) = pipe::new(); + pub fn new_from_config( + cfg: InstanceConfig, + net_to_con_send: sync::prunable_mpsc::Sender, + net_to_con_recv: sync::prunable_mpsc::Receiver, + ) -> (Self, InstanceRunner) { + let (con_to_net_send, con_to_net_recv) = channel::unbounded(); + let (terminate_send, terminate_recv) = channel::bounded(1); + let (net, net_runner) = Network::new( cfg.cfg, cfg.block_store.clone(), - actor_pipe, + net_to_con_send, + con_to_net_recv, cfg.attestation, ); - let (terminate_send, terminate_recv) = channel::bounded(1); + ( Self { net, - pipe: dispatcher_pipe, + consensus_receiver: net_to_con_recv, + consensus_sender: con_to_net_send, terminate: terminate_send, }, InstanceRunner { @@ -227,13 +263,6 @@ impl Instance { self.terminate.closed(ctx).await } - /// Pipe getter. - pub fn pipe( - &mut self, - ) -> &mut pipe::DispatcherPipe { - &mut self.pipe - } - /// State getter. pub fn state(&self) -> &Arc { &self.net diff --git a/node/actors/network/src/tests.rs b/node/components/network/src/tests.rs similarity index 100% rename from node/actors/network/src/tests.rs rename to node/components/network/src/tests.rs diff --git a/node/actors/network/src/watch.rs b/node/components/network/src/watch.rs similarity index 100% rename from node/actors/network/src/watch.rs rename to node/components/network/src/watch.rs diff --git a/node/deny.toml b/node/deny.toml index b4c9a367..6e7e62ad 100644 --- a/node/deny.toml +++ b/node/deny.toml @@ -5,6 +5,11 @@ targets = [ { triple = "x86_64-unknown-linux-musl" }, ] +[advisories] +ignore = [ + { id = "RUSTSEC-2024-0384", reason = "caused by kube, which hasn't updated their dependencies yet" }, +] + [licenses] # We want to set a high confidence threshold for license detection. confidence-threshold = 1.0 @@ -17,8 +22,8 @@ allow = [ "ISC", "MIT", "OpenSSL", - "Unicode-DFS-2016", "Unicode-3.0", + "Unicode-DFS-2016", # Weak copyleft licenses "MPL-2.0", ] @@ -35,33 +40,6 @@ name = "ring" expression = "MIT" license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] -[bans] -# Lint level for when multiple versions of the same crate are detected -multiple-versions = "deny" -# Certain crates/versions that will be skipped when doing duplicate detection. -skip = [ - # Old versions required by tempfile and prost-build. - { name = "bitflags", version = "1.3.2" }, - - # Old version required by tracing-subscriber. - { name = "regex-automata", version = "0.1.10" }, - { name = "regex-syntax", version = "0.6.29" }, - - # Old version required by kube-runtime. - { name = "syn", version = "1.0" }, - - # Old version required by protox 0.5.1. - { name = "base64", version = "0.21.7" }, - - # Old versions required by vise-exporter. - { name = "http", version = "0.2.12" }, - { name = "http-body", version = "0.4.6" }, - { name = "hyper", version = "0.14.28" }, - - { name = "rustls-native-certs", version = "0.7.3" }, - { name = "hashbrown", version = "0.14.5" }, -] - [sources] unknown-git = "deny" unknown-registry = "deny" diff --git a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs index ba8f0bd6..4676082b 100644 --- a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs +++ b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs @@ -56,6 +56,11 @@ pub fn channel( (send, recv) } +/// Creates a channel without any pruning or filtering, and returns the [`Sender`] and [`Receiver`] handles. +pub fn unpruned_channel() -> (Sender, Receiver) { + channel(|_| true, |_, _| SelectionFunctionResult::Keep) +} + struct Shared { send: watch::Sender>, } diff --git a/node/libs/roles/src/validator/messages/genesis.rs b/node/libs/roles/src/validator/messages/genesis.rs index 9f126409..19d6daeb 100644 --- a/node/libs/roles/src/validator/messages/genesis.rs +++ b/node/libs/roles/src/validator/messages/genesis.rs @@ -140,7 +140,7 @@ impl TryFrom for ProtocolVersion { fn try_from(value: u32) -> Result { // Currently, consensus doesn't define restrictions on the possible version. Unsupported - // versions are filtered out on the BFT actor level instead. + // versions are filtered out on the BFT component level instead. Ok(Self(value)) } } diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 71fc6af1..33d4ecc5 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -2,6 +2,5 @@ mod encode; pub mod enum_util; -pub mod pipe; pub use encode::*; diff --git a/node/libs/utils/src/pipe.rs b/node/libs/utils/src/pipe.rs deleted file mode 100644 index d1b3a0d7..00000000 --- a/node/libs/utils/src/pipe.rs +++ /dev/null @@ -1,76 +0,0 @@ -//! This is a wrapper around channels to make it simpler and less error-prone to connect actors and the dispatcher. -//! A Pipe is a basically a bi-directional unbounded channel. - -use std::future::Future; -use zksync_concurrency::ctx::{self, channel, Ctx}; - -/// This is the end of the Pipe that should be held by the actor. -/// -/// The actor can receive `In` and send back `Out` messages. -pub type ActorPipe = Pipe; - -/// This is the end of the Pipe that should be held by the dispatcher. -/// -/// The dispatcher can send `In` messages and receive `Out` back. -pub type DispatcherPipe = Pipe; - -/// This is a generic Pipe end. -/// -/// It is used to receive `In` and send `Out` messages. -/// -/// When viewed from the perspective of [new]: -/// * messages of type `In` are going right-to-left -/// * messages of type `Out` are going left-to-right -/// -/// ```text -/// In <- -------- <- In -/// | Pipe | -/// Out -> -------- -> Out -/// ^ ^ -/// Actor Dispatcher -/// ``` -#[derive(Debug)] -pub struct Pipe { - /// This is the channel that receives messages. - pub recv: channel::UnboundedReceiver, - /// This is the channel that sends messages. - pub send: channel::UnboundedSender, -} - -impl Pipe { - /// Sends a message to the pipe. - pub fn send(&self, msg: Out) { - self.send.send(msg) - } - - /// Awaits a message from the pipe. - pub fn recv<'a>( - &'a mut self, - ctx: &'a Ctx, - ) -> ctx::CtxAware>> { - self.recv.recv(ctx) - } - - /// Tries to get a message from the pipe. Will return None if the pipe is empty. - pub fn try_recv(&mut self) -> Option { - self.recv.try_recv() - } -} - -/// This function creates a new Pipe. It returns the two ends of the pipe, for the actor and the dispatcher. -pub fn new() -> (ActorPipe, DispatcherPipe) { - let (input_sender, input_receiver) = channel::unbounded(); - let (output_sender, output_receiver) = channel::unbounded(); - - let pipe_actor = Pipe { - recv: input_receiver, - send: output_sender, - }; - - let pipe_dispatcher = Pipe { - recv: output_receiver, - send: input_sender, - }; - - (pipe_actor, pipe_dispatcher) -} diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index bf79a9bd..a80bb6b9 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -1,5 +1,5 @@ //! Main binary for the consensus node. It reads the configuration, initializes all parts of the node and -//! manages communication between the actors. It is the main executable in this workspace. +//! manages communication between the components. It is the main executable in this workspace. use anyhow::Context as _; use clap::Parser; use std::{fs, fs::Permissions, io::IsTerminal as _, os::unix::fs::PermissionsExt, path::PathBuf};