diff --git a/node/Cargo.lock b/node/Cargo.lock index 222483f7..9d9bb9fa 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -76,9 +76,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "45862d1c77f2228b9e10bc609d5bc203d86ebc9b87ad8d5d5167a6c9abf739d9" [[package]] name = "anes" @@ -137,9 +137,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.92" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74f37166d7d48a0284b99dd824694c26119c700b53bf0d1540cdb147dbdaaf13" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "assert_matches" @@ -178,7 +178,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -189,7 +189,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -308,7 +308,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.87", + "syn", ] [[package]] @@ -330,7 +330,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.87", + "syn", "which", ] @@ -440,9 +440,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.1.35" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f57c4b4da2a9d619dd035f27316d7a426305b75be93d09e92f2b9229c34feaf" +checksum = "1aeb932158bd710538c73702db6945cb68a8fb08c519e6e12706b94263b36db8" dependencies = [ "jobserver", "libc", @@ -584,7 +584,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -657,9 +657,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "608697df725056feaccfa42cffdaeeec3fccc4ffc38358ecd19b243e716a78e0" +checksum = "0ca741a962e1b0bff6d724a1a0958b686406e853bb14061f218562e1896f95e6" dependencies = [ "libc", ] @@ -761,7 +761,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "edb49164822f3ee45b17acd4a208cfc1251410cf0cad9a833234c9890774dd9f" dependencies = [ "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -797,7 +797,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -821,7 +821,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.87", + "syn", ] [[package]] @@ -832,7 +832,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -854,17 +854,6 @@ dependencies = [ "powerfmt", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "diff" version = "0.1.13" @@ -891,7 +880,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", 
"quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -951,6 +940,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "educe" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7bc049e1bd8cdeb31b68bbd586a9464ecf9f3944af3958a7a9d0f8b9799417" +dependencies = [ + "enum-ordinalize", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "either" version = "1.13.0" @@ -985,6 +986,26 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "enum-ordinalize" +version = "4.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5" +dependencies = [ + "enum-ordinalize-derive", +] + +[[package]] +name = "enum-ordinalize-derive" +version = "4.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1024,9 +1045,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "ff" @@ -1050,6 +1071,15 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fluent-uri" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c704e9dbe1ddd863da1e6ff3567795087b1eb201ce80d8fa81162e1516500d" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1115,7 +1145,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -1248,6 +1278,30 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.5.0" @@ -1408,6 +1462,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d06dbdfbacf34d996c6fb540a71a684a7aae9056c71951163af8a8a4c07b9a4" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.1.0", + "hyper 1.5.0", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-rustls" version = "0.27.3" @@ -1574,7 +1648,7 @@ checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] 
[[package]] @@ -1727,10 +1801,11 @@ dependencies = [ [[package]] name = "json-patch" -version = "1.4.0" +version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec9ad60d674508f3ca8f380a928cfe7b096bc729c4e2dbfe3852bc45da3ab30b" +checksum = "5b1fb8864823fad91877e6caea0baca82e49e8db50f8e5c9f9a453e27d3330fc" dependencies = [ + "jsonptr", "serde", "serde_json", "thiserror", @@ -1751,6 +1826,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "jsonptr" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627" +dependencies = [ + "fluent-uri", + "serde", + "serde_json", +] + [[package]] name = "jsonrpsee" version = "0.23.2" @@ -1809,7 +1895,7 @@ dependencies = [ "serde_json", "thiserror", "tokio", - "tower", + "tower 0.4.13", "tracing", "url", ] @@ -1838,7 +1924,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", - "tower", + "tower 0.4.13", "tracing", ] @@ -1871,9 +1957,9 @@ dependencies = [ [[package]] name = "k8s-openapi" -version = "0.22.0" +version = "0.23.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19501afb943ae5806548bc3ebd7f3374153ca057a38f480ef30adfde5ef09755" +checksum = "9c8847402328d8301354c94d605481f25a6bdc1ed65471fd96af8eca71141b13" dependencies = [ "base64 0.22.1", "chrono", @@ -1893,9 +1979,9 @@ dependencies = [ [[package]] name = "kube" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "264461a7ebf4fb0fcf23e4c7e4f9387c5696ee61d003de207d9b5a895ff37bfa" +checksum = "efffeb3df0bd4ef3e5d65044573499c0e4889b988070b08c50b25b1329289a1f" dependencies = [ "k8s-openapi", "kube-client", @@ -1906,9 +1992,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47164ad6c47398ee4bdf90509c7b44026229721cb1377eb4623a1ec2a00a85e9" +checksum = "8bf471ece8ff8d24735ce78dac4d091e9fcb8d74811aeb6b75de4d1c3f5de0f1" dependencies = [ "base64 0.22.1", "bytes", @@ -1920,6 +2006,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.5.0", + "hyper-http-proxy", "hyper-rustls", "hyper-timeout", "hyper-util", @@ -1936,16 +2023,16 @@ dependencies = [ "thiserror", "tokio", "tokio-util", - "tower", + "tower 0.5.1", "tower-http", "tracing", ] [[package]] name = "kube-core" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797d3044a238825432129cd9537e12c2a6dacbbb5352381af5ea55e1505ed4f" +checksum = "f42346d30bb34d1d7adc5c549b691bce7aa3a1e60254e68fab7e2d7b26fe3d77" dependencies = [ "chrono", "form_urlencoded", @@ -1954,45 +2041,46 @@ dependencies = [ "k8s-openapi", "schemars", "serde", + "serde-value", "serde_json", "thiserror", ] [[package]] name = "kube-derive" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf837edaa0c478f85e9a3cddb17fa80d58a57c1afa722b3a9e55753ea162f41" +checksum = "f9364e04cc5e0482136c6ee8b7fb7551812da25802249f35b3def7aaa31e82ad" dependencies = [ "darling", "proc-macro2", "quote", "serde_json", - "syn 2.0.87", + "syn", ] [[package]] name = "kube-runtime" -version = "0.91.0" +version = "0.96.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e463e89a1fb222c65a5469b568803153d1bf13d084a8dd42b659e6cca66edc6e" +checksum = 
"d3fbf1f6ffa98e65f1d2a9a69338bb60605d46be7edf00237784b89e62c9bd44" dependencies = [ "ahash", "async-broadcast", "async-stream", "async-trait", "backoff", - "derivative", + "educe", "futures", "hashbrown 0.14.5", "json-patch", + "jsonptr", "k8s-openapi", "kube-client", "parking_lot", "pin-project", "serde", "serde_json", - "smallvec", "thiserror", "tokio", "tokio-util", @@ -2013,9 +2101,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" [[package]] name = "libc" -version = "0.2.161" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e9489c2807c139ffd9c1794f4af0ebe86a828db53ecdc7fea2111d0fed085d1" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libloading" @@ -2102,7 +2190,7 @@ dependencies = [ "proc-macro2", "quote", "regex-syntax 0.6.29", - "syn 2.0.87", + "syn", ] [[package]] @@ -2159,7 +2247,7 @@ checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2407,7 +2495,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2448,7 +2536,7 @@ checksum = "3c0f5fad0874fc7abcd4d750e76917eaebbecaa2c20bde22e1dbeeba8beb758c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2562,7 +2650,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "64d1ec885c64d0457d564db4ec299b2dae3f9c02808b8ad9c3a089c591b18033" dependencies = [ "proc-macro2", - "syn 2.0.87", + "syn", ] [[package]] @@ -2594,7 +2682,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2624,7 +2712,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.87", + "syn", "tempfile", ] @@ -2638,7 +2726,7 @@ dependencies = [ "itertools 0.12.1", "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -2787,7 +2875,7 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.8", + "regex-automata 0.4.9", "regex-syntax 0.8.5", ] @@ -2802,9 +2890,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.8" +version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", @@ -2887,9 +2975,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.39" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "375116bee2be9ed569afe2154ea6a99dfdffd257f533f187498c2a8f5feaf4ee" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags 2.6.0", "errno", @@ -3039,7 +3127,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn 2.0.87", + "syn", ] [[package]] @@ -3064,11 +3152,10 @@ dependencies = [ [[package]] name = "secrecy" -version = "0.8.0" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +checksum = "e891af845473308773346dc847b2c23ee78fe442e0472ac50e22a18a93d3ae5a" dependencies = [ - "serde", "zeroize", ] @@ -3088,9 +3175,9 @@ dependencies 
= [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" dependencies = [ "core-foundation-sys", "libc", @@ -3104,9 +3191,9 @@ checksum = "61697e0a1c7e512e84a621326239844a24d8207b4669b41bc18b32ea5cbf988b" [[package]] name = "serde" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" dependencies = [ "serde_derive", ] @@ -3123,13 +3210,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.215" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3140,7 +3227,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3337,9 +3424,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "1.0.109" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -3347,15 +3434,10 @@ dependencies = [ ] [[package]] -name = "syn" -version = "2.0.87" +name = "sync_wrapper" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" -dependencies = [ - "proc-macro2", - "quote", - "unicode-ident", -] +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" [[package]] name = "synstructure" @@ -3365,14 +3447,14 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -3398,7 +3480,7 @@ checksum = "f9b53c7124dd88026d5d98a1eb1fd062a578b7d783017c9298825526c7fb6427" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3417,22 +3499,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02dd99dc800bbb97186339685293e1cc5d9df1f8fae2d0aecd9ff1c77efea892" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.68" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7c61ec9a6f64d2793d8a45faba21efbe3ced62a886d44c36a009b2b519b4c7e" 
+checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3508,9 +3590,9 @@ dependencies = [ [[package]] name = "tokio" -version = "1.41.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "145f3413504347a2be84393cc8a7d2fb4d863b375909ea59f2158261aa258bbb" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -3532,7 +3614,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3583,6 +3665,21 @@ dependencies = [ "futures-util", "pin-project", "pin-project-lite", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper", "tokio", "tokio-util", "tower-layer", @@ -3592,16 +3689,15 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.5.2" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bitflags 2.6.0", "bytes", "http 1.1.0", "http-body 1.0.1", - "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -3641,7 +3737,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3817,7 +3913,7 @@ checksum = "6a511871dc5de990a3b2a0e715facfbc5da848c0c0395597a1415029fb7c250a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -3867,7 +3963,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn", "wasm-bindgen-shared", ] @@ -3889,7 +3985,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -4082,7 +4178,7 @@ checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "synstructure", ] @@ -4104,7 +4200,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4124,7 +4220,7 @@ checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", "synstructure", ] @@ -4145,7 +4241,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4167,7 +4263,7 @@ checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn", ] [[package]] @@ -4354,7 +4450,7 @@ dependencies = [ "tempfile", "tokio", "tokio-rustls", - "tower", + "tower 0.4.13", "tracing", "tracing-subscriber", "vise-exporter", @@ -4414,7 +4510,7 @@ dependencies = [ "prost-reflect", "protox", "quote", - "syn 2.0.87", + "syn", ] 
[[package]] diff --git a/node/Cargo.toml b/node/Cargo.toml index 52f8bc3a..5d058b24 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -1,8 +1,8 @@ [workspace] members = [ - "actors/bft", - "actors/executor", - "actors/network", + "components/bft", + "components/executor", + "components/network", "libs/concurrency", "libs/crypto", "libs/protobuf", @@ -26,10 +26,10 @@ version = "0.5.0" [workspace.dependencies] # Crates from this repo. -zksync_consensus_bft = { version = "=0.5.0", path = "actors/bft" } +zksync_consensus_bft = { version = "=0.5.0", path = "components/bft" } zksync_consensus_crypto = { version = "=0.5.0", path = "libs/crypto" } -zksync_consensus_executor = { version = "=0.5.0", path = "actors/executor" } -zksync_consensus_network = { version = "=0.5.0", path = "actors/network" } +zksync_consensus_executor = { version = "=0.5.0", path = "components/executor" } +zksync_consensus_network = { version = "=0.5.0", path = "components/network" } zksync_consensus_roles = { version = "=0.5.0", path = "libs/roles" } zksync_consensus_storage = { version = "=0.5.0", path = "libs/storage" } zksync_consensus_tools = { version = "=0.5.0", path = "tools" } @@ -73,8 +73,8 @@ hyper-util = { version = "0.1", features = ["full"] } im = "15.1.0" jsonrpsee = { version = "0.23.0", features = ["http-client", "server"] } k256 = { version = "0.13", features = ["ecdsa"] } -k8s-openapi = { version = "0.22.0", features = ["latest"] } -kube = { version = "0.91.0", features = ["derive", "runtime"] } +k8s-openapi = { version = "0.23.0", features = ["latest"] } +kube = { version = "0.96.0", features = ["derive", "runtime"] } num-bigint = "0.4.4" num-traits = "0.2.18" once_cell = "1.17.1" diff --git a/node/actors/bft/src/io.rs b/node/actors/bft/src/io.rs deleted file mode 100644 index b29dc758..00000000 --- a/node/actors/bft/src/io.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! Input and output messages for the Consensus actor. These are processed by the executor actor. - -use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq}; - -/// All the messages that other actors can send to the Consensus actor. -#[derive(Debug)] -pub enum InputMessage { - /// Message types from the Network actor. - Network(ConsensusReq), -} - -/// All the messages that the Consensus actor sends to other actors. -#[derive(Debug, PartialEq)] -pub enum OutputMessage { - /// Message types to the Network actor. - Network(ConsensusInputMessage), -} - -impl From for OutputMessage { - fn from(message: ConsensusInputMessage) -> Self { - Self::Network(message) - } -} diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs deleted file mode 100644 index 0075b0df..00000000 --- a/node/actors/bft/src/lib.rs +++ /dev/null @@ -1,102 +0,0 @@ -//! This crate contains the consensus actor, which is responsible for handling the logic that allows us to reach agreement on blocks. -//! It uses a new cosnensus algorithm developed at Matter Labs, called ChonkyBFT. You can find the specification of the algorithm [here](../../../../spec). - -use crate::io::{InputMessage, OutputMessage}; -use anyhow::Context; -pub use config::Config; -use std::sync::Arc; -use tracing::Instrument; -use zksync_concurrency::{ctx, error::Wrap as _, scope, sync}; -use zksync_consensus_roles::validator; -use zksync_consensus_utils::pipe::ActorPipe; - -/// This module contains the implementation of the ChonkyBFT algorithm. 
-mod chonky_bft; -mod config; -pub mod io; -mod metrics; -pub mod testonly; -#[cfg(test)] -mod tests; - -/// Protocol version of this BFT implementation. -pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; - -/// Payload proposal and verification trait. -#[async_trait::async_trait] -pub trait PayloadManager: std::fmt::Debug + Send + Sync { - /// Used by leader to propose a payload for the next block. - async fn propose( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - ) -> ctx::Result; - /// Used by replica to verify a payload for the next block proposed by the leader. - async fn verify( - &self, - ctx: &ctx::Ctx, - number: validator::BlockNumber, - payload: &validator::Payload, - ) -> ctx::Result<()>; -} - -impl Config { - /// Starts the bft actor. It will start running, processing incoming messages and - /// sending output messages. - pub async fn run( - self, - ctx: &ctx::Ctx, - mut pipe: ActorPipe, - ) -> anyhow::Result<()> { - let genesis = self.block_store.genesis(); - anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT); - genesis.verify().context("genesis().verify()")?; - - if let Some(prev) = genesis.first_block.prev() { - tracing::info!("Waiting for the pre-fork blocks to be persisted"); - if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await { - return Ok(()); - } - } - - let cfg = Arc::new(self); - let (proposer_sender, proposer_receiver) = sync::watch::channel(None); - let (replica, replica_send) = - chonky_bft::StateMachine::start(ctx, cfg.clone(), pipe.send.clone(), proposer_sender) - .await?; - - let res = scope::run!(ctx, |ctx, s| async { - s.spawn_bg(async { replica.run(ctx).await.wrap("replica.run()") }); - s.spawn_bg(async { - chonky_bft::proposer::run_proposer(ctx, cfg.clone(), pipe.send, proposer_receiver) - .await - .wrap("run_proposer()") - }); - - tracing::info!("Starting consensus actor {:?}", cfg.secret_key.public()); - - // This is the infinite loop where the consensus actually runs. The validator waits for - // a message from the network and processes it accordingly. - loop { - async { - let InputMessage::Network(msg) = pipe - .recv - .recv(ctx) - .instrument(tracing::info_span!("wait_for_message")) - .await?; - - replica_send.send(msg); - - ctx::Ok(()) - } - .instrument(tracing::info_span!("bft_iter")) - .await?; - } - }) - .await; - match res { - Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), - Err(ctx::Error::Internal(err)) => Err(err), - } - } -} diff --git a/node/actors/executor/src/io.rs b/node/actors/executor/src/io.rs deleted file mode 100644 index e6fb7dd4..00000000 --- a/node/actors/executor/src/io.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Module to manage the communication between actors. It simply converts and forwards messages from and to each different actor. -use zksync_concurrency::{ - ctx::{self, channel}, - scope, -}; -use zksync_consensus_bft::io::{ - InputMessage as ConsensusInputMessage, OutputMessage as ConsensusOutputMessage, -}; -use zksync_consensus_network::io::{ - InputMessage as NetworkInputMessage, OutputMessage as NetworkOutputMessage, -}; -use zksync_consensus_utils::pipe::DispatcherPipe; - -/// The IO dispatcher, it is the main struct to handle actor messages. It simply contains a sender and a receiver for -/// a pair of channels for each actor. This of course allows us to send and receive messages to and from each actor. 
-#[derive(Debug)] -pub(super) struct Dispatcher { - consensus_input: channel::UnboundedSender, - consensus_output: channel::UnboundedReceiver, - network_input: channel::UnboundedSender, - network_output: channel::UnboundedReceiver, -} - -impl Dispatcher { - /// Creates a new IO Dispatcher. - pub(super) fn new( - consensus_pipe: DispatcherPipe, - network_pipe: DispatcherPipe, - ) -> Self { - Dispatcher { - consensus_input: consensus_pipe.send, - consensus_output: consensus_pipe.recv, - network_input: network_pipe.send, - network_output: network_pipe.recv, - } - } - - /// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them. - pub(super) async fn run(mut self, ctx: &ctx::Ctx) { - let _: ctx::OrCanceled<()> = scope::run!(ctx, |ctx, s| async { - // Start a task to handle the messages from the consensus actor. - s.spawn(async { - while let Ok(msg) = self.consensus_output.recv(ctx).await { - match msg { - ConsensusOutputMessage::Network(message) => { - self.network_input.send(message.into()); - } - } - } - Ok(()) - }); - - // Start a task to handle the messages from the network actor. - s.spawn(async { - while let Ok(msg) = self.network_output.recv(ctx).await { - match msg { - NetworkOutputMessage::Consensus(message) => { - self.consensus_input - .send(ConsensusInputMessage::Network(message)); - } - } - } - Ok(()) - }); - - Ok(()) - }) - .await; - } -} diff --git a/node/actors/network/src/io.rs b/node/actors/network/src/io.rs deleted file mode 100644 index 6166deef..00000000 --- a/node/actors/network/src/io.rs +++ /dev/null @@ -1,40 +0,0 @@ -#![allow(missing_docs)] -use zksync_concurrency::oneshot; -use zksync_consensus_roles::validator; - -/// All the messages that other actors can send to the Network actor. -#[derive(Debug)] -pub enum InputMessage { - /// Message types from the Consensus actor. - Consensus(ConsensusInputMessage), -} - -/// Message types from the Consensus actor. -#[derive(Debug, PartialEq)] -pub struct ConsensusInputMessage { - pub message: validator::Signed, -} - -impl From for InputMessage { - fn from(message: ConsensusInputMessage) -> Self { - Self::Consensus(message) - } -} - -/// Consensus message received from the network. -#[derive(Debug)] -pub struct ConsensusReq { - /// Payload. - pub msg: validator::Signed, - /// Channel that should be used to notify network actor that - /// processing of this message has been completed. - /// Used for rate limiting. - pub ack: oneshot::Sender<()>, -} - -/// All the messages that the Network actor sends to other actors. -#[derive(Debug)] -pub enum OutputMessage { - /// Message to the Consensus actor. - Consensus(ConsensusReq), -} diff --git a/node/actors/network/src/state.rs b/node/actors/network/src/state.rs deleted file mode 100644 index 529b999a..00000000 --- a/node/actors/network/src/state.rs +++ /dev/null @@ -1 +0,0 @@ -//! Network actor maintaining a pool of outbound and inbound connections to other nodes. 
diff --git a/node/actors/bft/Cargo.toml b/node/components/bft/Cargo.toml similarity index 94% rename from node/actors/bft/Cargo.toml rename to node/components/bft/Cargo.toml index 70d09bd3..202c0657 100644 --- a/node/actors/bft/Cargo.toml +++ b/node/components/bft/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus bft actor" +description = "ZKsync consensus BFT component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/bft/src/chonky_bft/block.rs b/node/components/bft/src/chonky_bft/block.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/block.rs rename to node/components/bft/src/chonky_bft/block.rs diff --git a/node/actors/bft/src/chonky_bft/commit.rs b/node/components/bft/src/chonky_bft/commit.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/commit.rs rename to node/components/bft/src/chonky_bft/commit.rs diff --git a/node/actors/bft/src/chonky_bft/mod.rs b/node/components/bft/src/chonky_bft/mod.rs similarity index 85% rename from node/actors/bft/src/chonky_bft/mod.rs rename to node/components/bft/src/chonky_bft/mod.rs index 9c8e7790..71340dbb 100644 --- a/node/actors/bft/src/chonky_bft/mod.rs +++ b/node/components/bft/src/chonky_bft/mod.rs @@ -1,17 +1,10 @@ -use crate::{io::OutputMessage, metrics, Config}; +use crate::{metrics, Config, FromNetworkMessage, ToNetworkMessage}; use std::{ cmp::max, collections::{BTreeMap, HashMap}, sync::Arc, }; -use zksync_concurrency::{ - ctx, - error::Wrap as _, - metrics::LatencyHistogramExt as _, - sync::{self, prunable_mpsc::SelectionFunctionResult}, - time, -}; -use zksync_consensus_network::io::ConsensusReq; +use zksync_concurrency::{ctx, error::Wrap as _, metrics::LatencyHistogramExt as _, sync, time}; use zksync_consensus_roles::validator::{self, ConsensusMsg}; mod block; @@ -35,13 +28,13 @@ pub(crate) const VIEW_TIMEOUT_DURATION: time::Duration = time::Duration::millise pub(crate) struct StateMachine { /// Consensus configuration. pub(crate) config: Arc<Config>, - /// Pipe through which replica sends network messages. - pub(super) outbound_pipe: ctx::channel::UnboundedSender<OutputMessage>, - /// Pipe through which replica receives network requests. - pub(crate) inbound_pipe: sync::prunable_mpsc::Receiver<ConsensusReq>, + /// Channel through which replica sends network messages. + pub(super) outbound_channel: ctx::channel::UnboundedSender<ToNetworkMessage>, + /// Channel through which replica receives network requests. + pub(crate) inbound_channel: sync::prunable_mpsc::Receiver<FromNetworkMessage>, /// The sender part of the proposer watch channel. This is used to notify the proposer loop /// and send the needed justification. - pub(crate) proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>, + pub(crate) proposer_sender: sync::watch::Sender<Option<validator::ProposalJustification>>, /// The current view number.
pub(crate) view_number: validator::ViewNumber, @@ -83,9 +76,10 @@ impl StateMachine { pub(crate) async fn start( ctx: &ctx::Ctx, config: Arc<Config>, - outbound_pipe: ctx::channel::UnboundedSender<OutputMessage>, - proposer_pipe: sync::watch::Sender<Option<validator::ProposalJustification>>, - ) -> ctx::Result<(Self, sync::prunable_mpsc::Sender<ConsensusReq>)> { + outbound_channel: ctx::channel::UnboundedSender<ToNetworkMessage>, + inbound_channel: sync::prunable_mpsc::Receiver<FromNetworkMessage>, + proposer_sender: sync::watch::Sender<Option<validator::ProposalJustification>>, + ) -> ctx::Result<Self> { let backup = config.replica_store.state(ctx).await?; let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); @@ -96,16 +90,11 @@ impl StateMachine { .insert(proposal.payload.hash(), proposal.payload); } - let (send, recv) = sync::prunable_mpsc::channel( - StateMachine::inbound_filter_predicate, - StateMachine::inbound_selection_function, - ); - let this = Self { config, - outbound_pipe, - inbound_pipe: recv, - proposer_pipe, + outbound_channel, + inbound_channel, + proposer_sender, view_number: backup.view, phase: backup.phase, high_vote: backup.high_vote, @@ -120,7 +109,7 @@ impl StateMachine { view_start: ctx.now(), }; - Ok((this, send)) + Ok(this) } /// Runs a loop to process incoming messages (may be `None` if the channel times out while waiting for a message). @@ -140,7 +129,7 @@ impl StateMachine { // Main loop. loop { let recv = self - .inbound_pipe + .inbound_channel .recv(&ctx.with_deadline(self.view_timeout)) .await; @@ -274,34 +263,12 @@ impl StateMachine { }; metrics::METRICS.message_processing_latency[&label].observe_latency(ctx.now() - now); - // Notify network actor that the message has been processed. + // Notify network component that the message has been processed. // Ignore sending error. let _ = req.ack.send(()); } } - fn inbound_filter_predicate(new_req: &ConsensusReq) -> bool { - // Verify message signature - new_req.msg.verify().is_ok() - } - - fn inbound_selection_function( - old_req: &ConsensusReq, - new_req: &ConsensusReq, - ) -> SelectionFunctionResult { - if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label() - { - SelectionFunctionResult::Keep - } else { - // Discard older message - if old_req.msg.msg.view().number < new_req.msg.msg.view().number { - SelectionFunctionResult::DiscardOld - } else { - SelectionFunctionResult::DiscardNew - } - } - } - /// Processes a (already verified) CommitQC. It bumps the local high_commit_qc and if /// we have the proposal corresponding to this qc, we save the corresponding block to DB. pub(crate) async fn process_commit_qc( diff --git a/node/actors/bft/src/chonky_bft/new_view.rs b/node/components/bft/src/chonky_bft/new_view.rs similarity index 98% rename from node/actors/bft/src/chonky_bft/new_view.rs rename to node/components/bft/src/chonky_bft/new_view.rs index ff401ebb..c8107dd1 100644 --- a/node/actors/bft/src/chonky_bft/new_view.rs +++ b/node/components/bft/src/chonky_bft/new_view.rs @@ -115,7 +115,7 @@ impl StateMachine { // Update the state machine. self.view_number = view; self.phase = validator::Phase::Prepare; - self.proposer_pipe + self.proposer_sender .send(Some(self.get_justification())) .expect("justification_watch.send() failed"); @@ -139,7 +139,7 @@ impl StateMachine { }, )), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); // Log the event and update the metrics.
tracing::info!("Starting view {}", self.view_number); diff --git a/node/actors/bft/src/chonky_bft/proposal.rs b/node/components/bft/src/chonky_bft/proposal.rs similarity index 99% rename from node/actors/bft/src/chonky_bft/proposal.rs rename to node/components/bft/src/chonky_bft/proposal.rs index b89f09d8..7da6962c 100644 --- a/node/actors/bft/src/chonky_bft/proposal.rs +++ b/node/components/bft/src/chonky_bft/proposal.rs @@ -250,7 +250,7 @@ impl StateMachine { .secret_key .sign_msg(validator::ConsensusMsg::ReplicaCommit(commit_vote)), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); Ok(()) } diff --git a/node/actors/bft/src/chonky_bft/proposer.rs b/node/components/bft/src/chonky_bft/proposer.rs similarity index 94% rename from node/actors/bft/src/chonky_bft/proposer.rs rename to node/components/bft/src/chonky_bft/proposer.rs index 4a6dd843..ec9e0e33 100644 --- a/node/actors/bft/src/chonky_bft/proposer.rs +++ b/node/components/bft/src/chonky_bft/proposer.rs @@ -1,4 +1,4 @@ -use crate::{io::OutputMessage, metrics, Config}; +use crate::{metrics, Config, ToNetworkMessage}; use std::sync::Arc; use zksync_concurrency::{ctx, error::Wrap as _, sync}; use zksync_consensus_network::io::ConsensusInputMessage; @@ -11,7 +11,7 @@ use super::VIEW_TIMEOUT_DURATION; pub(crate) async fn run_proposer( ctx: &ctx::Ctx, cfg: Arc, - outbound_pipe: ctx::channel::UnboundedSender, + network_sender: ctx::channel::UnboundedSender, mut justification_watch: sync::watch::Receiver>, ) -> ctx::Result<()> { loop { @@ -50,7 +50,7 @@ pub(crate) async fn run_proposer( .secret_key .sign_msg(validator::ConsensusMsg::LeaderProposal(proposal)); - outbound_pipe.send(ConsensusInputMessage { message: msg }.into()); + network_sender.send(ConsensusInputMessage { message: msg }); } } diff --git a/node/actors/bft/src/chonky_bft/testonly.rs b/node/components/bft/src/chonky_bft/testonly.rs similarity index 90% rename from node/actors/bft/src/chonky_bft/testonly.rs rename to node/components/bft/src/chonky_bft/testonly.rs index 2367d1e7..686c971b 100644 --- a/node/actors/bft/src/chonky_bft/testonly.rs +++ b/node/components/bft/src/chonky_bft/testonly.rs @@ -1,15 +1,13 @@ use crate::testonly::RandomPayload; use crate::{ chonky_bft::{self, commit, new_view, proposal, timeout, StateMachine}, - io::OutputMessage, Config, PayloadManager, }; +use crate::{create_input_channel, FromNetworkMessage, ToNetworkMessage}; use assert_matches::assert_matches; use std::sync::Arc; use zksync_concurrency::sync::prunable_mpsc; use zksync_concurrency::{ctx, sync}; -use zksync_consensus_network as network; -use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; use zksync_consensus_storage::{ testonly::{in_memory, TestMemoryStorage}, @@ -28,9 +26,9 @@ pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000; pub(crate) struct UTHarness { pub(crate) replica: StateMachine, pub(crate) keys: Vec, - pub(crate) outbound_pipe: ctx::channel::UnboundedReceiver, - pub(crate) inbound_pipe: prunable_mpsc::Sender, - pub(crate) _proposer_pipe: sync::watch::Receiver>, + pub(crate) outbound_channel: ctx::channel::UnboundedReceiver, + pub(crate) inbound_channel: prunable_mpsc::Sender, + pub(crate) _proposer_channel: sync::watch::Receiver>, } impl UTHarness { @@ -63,7 +61,8 @@ impl UTHarness { let rng = &mut ctx.rng(); let setup = validator::testonly::Setup::new(rng, num_validators); let store = TestMemoryStorage::new(ctx, &setup).await; - let (send, recv) = ctx::channel::unbounded(); + let 
(output_channel_send, output_channel_recv) = ctx::channel::unbounded(); + let (input_channel_send, input_channel_recv) = create_input_channel(); let (proposer_sender, proposer_receiver) = sync::watch::channel(None); let cfg = Arc::new(Config { @@ -73,16 +72,21 @@ impl UTHarness { payload_manager, max_payload_size: MAX_PAYLOAD_SIZE, }); - let (replica, input_pipe) = - StateMachine::start(ctx, cfg.clone(), send.clone(), proposer_sender) - .await - .unwrap(); + let replica = StateMachine::start( + ctx, + cfg.clone(), + output_channel_send.clone(), + input_channel_recv, + proposer_sender, + ) + .await + .unwrap(); let mut this = UTHarness { replica, keys: setup.validator_keys.clone(), - outbound_pipe: recv, - inbound_pipe: input_pipe, - _proposer_pipe: proposer_receiver, + outbound_channel: output_channel_recv, + inbound_channel: input_channel_send, + _proposer_channel: proposer_receiver, }; let timeout = this.new_replica_timeout(ctx).await; @@ -290,17 +294,15 @@ impl UTHarness { } pub(crate) fn send(&self, msg: validator::Signed) { - self.inbound_pipe.send(ConsensusReq { + self.inbound_channel.send(FromNetworkMessage { msg, ack: zksync_concurrency::oneshot::channel().0, }); } fn try_recv>(&mut self) -> Option> { - self.outbound_pipe.try_recv().map(|message| match message { - OutputMessage::Network(network::io::ConsensusInputMessage { message, .. }) => { - message.cast().unwrap() - } - }) + self.outbound_channel + .try_recv() + .map(|message| message.message.cast().unwrap()) } } diff --git a/node/actors/bft/src/chonky_bft/tests/commit.rs b/node/components/bft/src/chonky_bft/tests/commit.rs similarity index 97% rename from node/actors/bft/src/chonky_bft/tests/commit.rs rename to node/components/bft/src/chonky_bft/tests/commit.rs index 0d65d110..3ee2c911 100644 --- a/node/actors/bft/src/chonky_bft/tests/commit.rs +++ b/node/components/bft/src/chonky_bft/tests/commit.rs @@ -359,7 +359,10 @@ async fn replica_commit_filter_functions_test() { util.send(msg.clone()); // Validate only correct message is received - assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + assert_eq!( + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, + msg + ); // Send a msg with view number = 2 let mut replica_commit_from_view_2 = replica_commit.clone(); @@ -393,7 +396,7 @@ async fn replica_commit_filter_functions_test() { // Validate only message from view 4 is received assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_view_4 ); @@ -409,13 +412,13 @@ async fn replica_commit_filter_functions_test() { )); util.send(msg_from_validator_1.clone()); - //Validate both are present in the inbound_pipe + //Validate both are present in the inbound_channel. 
assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_0 ); assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_1 ); diff --git a/node/actors/bft/src/chonky_bft/tests/mod.rs b/node/components/bft/src/chonky_bft/tests/mod.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/mod.rs rename to node/components/bft/src/chonky_bft/tests/mod.rs diff --git a/node/actors/bft/src/chonky_bft/tests/new_view.rs b/node/components/bft/src/chonky_bft/tests/new_view.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/new_view.rs rename to node/components/bft/src/chonky_bft/tests/new_view.rs diff --git a/node/actors/bft/src/chonky_bft/tests/proposal.rs b/node/components/bft/src/chonky_bft/tests/proposal.rs similarity index 100% rename from node/actors/bft/src/chonky_bft/tests/proposal.rs rename to node/components/bft/src/chonky_bft/tests/proposal.rs diff --git a/node/actors/bft/src/chonky_bft/tests/timeout.rs b/node/components/bft/src/chonky_bft/tests/timeout.rs similarity index 97% rename from node/actors/bft/src/chonky_bft/tests/timeout.rs rename to node/components/bft/src/chonky_bft/tests/timeout.rs index 0ee2dc9f..af778b69 100644 --- a/node/actors/bft/src/chonky_bft/tests/timeout.rs +++ b/node/components/bft/src/chonky_bft/tests/timeout.rs @@ -372,7 +372,10 @@ async fn replica_timeout_filter_functions_test() { util.send(msg.clone()); // Validate only correct message is received - assert_eq!(util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, msg); + assert_eq!( + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, + msg + ); // Send a msg with view number = 2 let mut replica_timeout_from_view_2 = replica_timeout.clone(); @@ -406,7 +409,7 @@ async fn replica_timeout_filter_functions_test() { // Validate only message from view 4 is received assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_view_4 ); @@ -422,13 +425,13 @@ async fn replica_timeout_filter_functions_test() { )); util.send(msg_from_validator_1.clone()); - // Validate both are present in the inbound_pipe + // Validate both are present in the inbound_channel assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_0 ); assert_eq!( - util.replica.inbound_pipe.recv(ctx).await.unwrap().msg, + util.replica.inbound_channel.recv(ctx).await.unwrap().msg, msg_from_validator_1 ); diff --git a/node/actors/bft/src/chonky_bft/timeout.rs b/node/components/bft/src/chonky_bft/timeout.rs similarity index 99% rename from node/actors/bft/src/chonky_bft/timeout.rs rename to node/components/bft/src/chonky_bft/timeout.rs index 189e3c16..43bb4919 100644 --- a/node/actors/bft/src/chonky_bft/timeout.rs +++ b/node/components/bft/src/chonky_bft/timeout.rs @@ -176,7 +176,7 @@ impl StateMachine { )), }; - self.outbound_pipe.send(output_message.into()); + self.outbound_channel.send(output_message); // Log the event. 
tracing::info!("Timed out at view {}", self.view_number); diff --git a/node/actors/bft/src/config.rs b/node/components/bft/src/config.rs similarity index 95% rename from node/actors/bft/src/config.rs rename to node/components/bft/src/config.rs index 77938549..ea83d2fe 100644 --- a/node/actors/bft/src/config.rs +++ b/node/components/bft/src/config.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use zksync_consensus_roles::validator; use zksync_consensus_storage as storage; -/// Configuration of the bft actor. +/// Configuration of the bft component. #[derive(Debug)] pub struct Config { /// The validator's secret key. diff --git a/node/components/bft/src/lib.rs b/node/components/bft/src/lib.rs new file mode 100644 index 00000000..fa98ae4b --- /dev/null +++ b/node/components/bft/src/lib.rs @@ -0,0 +1,135 @@ +//! This crate contains the consensus component, which is responsible for handling the logic that allows us to reach agreement on blocks. +//! It uses a new cosnensus algorithm developed at Matter Labs, called ChonkyBFT. You can find the specification of the algorithm [here](../../../../spec). + +use anyhow::Context; +pub use config::Config; +use std::sync::Arc; +use zksync_concurrency::{ + ctx, + error::Wrap as _, + scope, + sync::{self, prunable_mpsc::SelectionFunctionResult}, +}; +use zksync_consensus_roles::validator; + +/// This module contains the implementation of the ChonkyBFT algorithm. +mod chonky_bft; +mod config; +mod metrics; +pub mod testonly; +#[cfg(test)] +mod tests; + +/// Protocol version of this BFT implementation. +pub const PROTOCOL_VERSION: validator::ProtocolVersion = validator::ProtocolVersion::CURRENT; + +// Renaming network messages for clarity. +#[allow(missing_docs)] +pub type ToNetworkMessage = zksync_consensus_network::io::ConsensusInputMessage; +#[allow(missing_docs)] +pub type FromNetworkMessage = zksync_consensus_network::io::ConsensusReq; + +/// Payload proposal and verification trait. +#[async_trait::async_trait] +pub trait PayloadManager: std::fmt::Debug + Send + Sync { + /// Used by leader to propose a payload for the next block. + async fn propose( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + ) -> ctx::Result; + /// Used by replica to verify a payload for the next block proposed by the leader. + async fn verify( + &self, + ctx: &ctx::Ctx, + number: validator::BlockNumber, + payload: &validator::Payload, + ) -> ctx::Result<()>; +} + +impl Config { + /// Starts the bft component. It will start running, processing incoming messages and + /// sending output messages. 
+ pub async fn run( + self, + ctx: &ctx::Ctx, + outbound_channel: ctx::channel::UnboundedSender<ToNetworkMessage>, + inbound_channel: sync::prunable_mpsc::Receiver<FromNetworkMessage>, + ) -> anyhow::Result<()> { + let genesis = self.block_store.genesis(); + anyhow::ensure!(genesis.protocol_version == validator::ProtocolVersion::CURRENT); + genesis.verify().context("genesis().verify()")?; + + if let Some(prev) = genesis.first_block.prev() { + tracing::info!("Waiting for the pre-fork blocks to be persisted"); + if let Err(ctx::Canceled) = self.block_store.wait_until_persisted(ctx, prev).await { + return Ok(()); + } + } + + let cfg = Arc::new(self); + let (proposer_sender, proposer_receiver) = sync::watch::channel(None); + let replica = chonky_bft::StateMachine::start( + ctx, + cfg.clone(), + outbound_channel.clone(), + inbound_channel, + proposer_sender, + ) + .await?; + + let res = scope::run!(ctx, |ctx, s| async { + tracing::info!("Starting consensus component {:?}", cfg.secret_key.public()); + + s.spawn(async { replica.run(ctx).await.wrap("replica.run()") }); + s.spawn_bg(async { + chonky_bft::proposer::run_proposer( + ctx, + cfg.clone(), + outbound_channel, + proposer_receiver, + ) + .await + .wrap("run_proposer()") + }); + + Ok(()) + }) + .await; + match res { + Ok(()) | Err(ctx::Error::Canceled(_)) => Ok(()), + Err(ctx::Error::Internal(err)) => Err(err), + } + } +} + +/// Creates a new input channel for the network messages. +pub fn create_input_channel() -> ( + sync::prunable_mpsc::Sender<FromNetworkMessage>, + sync::prunable_mpsc::Receiver<FromNetworkMessage>, +) { + sync::prunable_mpsc::channel(inbound_filter_predicate, inbound_selection_function) +} + +/// Filter predicate for incoming messages. +fn inbound_filter_predicate(new_req: &FromNetworkMessage) -> bool { + // Verify message signature + new_req.msg.verify().is_ok() +} + +/// Selection function for incoming messages. +fn inbound_selection_function( + old_req: &FromNetworkMessage, + new_req: &FromNetworkMessage, +) -> SelectionFunctionResult { + if old_req.msg.key != new_req.msg.key || old_req.msg.msg.label() != new_req.msg.msg.label() { + SelectionFunctionResult::Keep + } else { + // Discard older message + if old_req.msg.msg.view().number < new_req.msg.msg.view().number { + SelectionFunctionResult::DiscardOld + } else { + SelectionFunctionResult::DiscardNew + } + } +} diff --git a/node/actors/bft/src/metrics.rs b/node/components/bft/src/metrics.rs similarity index 100% rename from node/actors/bft/src/metrics.rs rename to node/components/bft/src/metrics.rs diff --git a/node/actors/bft/src/testonly/make.rs b/node/components/bft/src/testonly/make.rs similarity index 79% rename from node/actors/bft/src/testonly/make.rs rename to node/components/bft/src/testonly/make.rs index 13382860..6ef10a0c 100644 --- a/node/actors/bft/src/testonly/make.rs +++ b/node/components/bft/src/testonly/make.rs @@ -1,23 +1,9 @@ //! This module contains utilities that are only meant for testing purposes. -use crate::io::InputMessage; use crate::PayloadManager; -use rand::{distributions::Standard, prelude::Distribution, Rng}; +use rand::Rng; use zksync_concurrency::ctx; -use zksync_concurrency::oneshot; -use zksync_consensus_network::io::ConsensusReq; use zksync_consensus_roles::validator; -// Generates a random InputMessage. -impl Distribution<InputMessage> for Standard { - fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> InputMessage { - let (send, _) = oneshot::channel(); - InputMessage::Network(ConsensusReq { - msg: rng.gen(), - ack: send, - }) - } -} - /// Produces random payload of a given size.
#[derive(Debug)] pub struct RandomPayload(pub usize); diff --git a/node/actors/bft/src/testonly/mod.rs b/node/components/bft/src/testonly/mod.rs similarity index 100% rename from node/actors/bft/src/testonly/mod.rs rename to node/components/bft/src/testonly/mod.rs diff --git a/node/actors/bft/src/testonly/node.rs b/node/components/bft/src/testonly/node.rs similarity index 54% rename from node/actors/bft/src/testonly/node.rs rename to node/components/bft/src/testonly/node.rs index 49b15314..45696136 100644 --- a/node/actors/bft/src/testonly/node.rs +++ b/node/components/bft/src/testonly/node.rs @@ -1,11 +1,11 @@ -use crate::{io, testonly, PayloadManager}; +use crate::{testonly, FromNetworkMessage, PayloadManager, ToNetworkMessage}; use anyhow::Context as _; use std::sync::Arc; -use zksync_concurrency::{ctx, scope}; +use zksync_concurrency::ctx::channel; +use zksync_concurrency::{ctx, scope, sync}; use zksync_consensus_network as network; use zksync_consensus_storage as storage; use zksync_consensus_storage::testonly::in_memory; -use zksync_consensus_utils::pipe; pub(crate) const MAX_PAYLOAD_SIZE: usize = 1000; @@ -41,18 +41,16 @@ impl Node { pub(crate) async fn run( &self, ctx: &ctx::Ctx, - network_pipe: &mut pipe::DispatcherPipe< - network::io::InputMessage, - network::io::OutputMessage, - >, + consensus_receiver: sync::prunable_mpsc::Receiver, + consensus_sender: channel::UnboundedSender, ) -> anyhow::Result<()> { - let net_recv = &mut network_pipe.recv; - let net_send = &mut network_pipe.send; - let (consensus_actor_pipe, consensus_pipe) = pipe::new(); - let mut con_recv = consensus_pipe.recv; - let con_send = consensus_pipe.send; scope::run!(ctx, |ctx, s| async { - // Run the consensus actor + // Create a channel for the consensus component to send messages to the network. + // We will use this extra channel to filter messages depending on the nodes + // behavior. + let (net_send, mut net_recv) = channel::unbounded(); + + // Run the consensus component s.spawn(async { let validator_key = self.net.validator_key.clone().unwrap(); crate::Config { @@ -62,35 +60,19 @@ impl Node { payload_manager: self.behavior.payload_manager(), max_payload_size: MAX_PAYLOAD_SIZE, } - .run(ctx, consensus_actor_pipe) + .run(ctx, net_send, consensus_receiver) .await .context("consensus.run()") }); - // Forward input messages received from the network to the actor; - // turns output from others to input for this instance. - s.spawn(async { - while let Ok(network_message) = net_recv.recv(ctx).await { - match network_message { - network::io::OutputMessage::Consensus(req) => { - con_send.send(io::InputMessage::Network(req)); - } - } - } - Ok(()) - }); - // Forward output messages from the actor to the network; + + // Forward output messages from the consensus to the network; // turns output from this to inputs for others. // Get the next message from the channel. Our response depends on what type of replica we are. 
- while let Ok(msg) = con_recv.recv(ctx).await { - match msg { - io::OutputMessage::Network(message) => { - let message_to_send = match self.behavior { - Behavior::Offline => continue, - Behavior::Honest | Behavior::HonestNotProposing => message, - }; - net_send.send(message_to_send.into()); - } - } + while let Ok(msg) = net_recv.recv(ctx).await { + match self.behavior { + Behavior::Offline => (), + Behavior::Honest | Behavior::HonestNotProposing => consensus_sender.send(msg), + }; } Ok(()) }) diff --git a/node/actors/bft/src/testonly/run.rs b/node/components/bft/src/testonly/run.rs similarity index 89% rename from node/actors/bft/src/testonly/run.rs rename to node/components/bft/src/testonly/run.rs index f303a0d4..0be99713 100644 --- a/node/actors/bft/src/testonly/run.rs +++ b/node/components/bft/src/testonly/run.rs @@ -1,6 +1,7 @@ use super::{Behavior, Node}; +use crate::{FromNetworkMessage, ToNetworkMessage}; use anyhow::Context as _; -use network::{io, Config}; +use network::Config; use rand::seq::SliceRandom; use std::{ collections::{HashMap, HashSet}, @@ -12,12 +13,11 @@ use zksync_concurrency::{ self, channel::{self, UnboundedReceiver, UnboundedSender}, }, - oneshot, scope, + oneshot, scope, sync, }; use zksync_consensus_network::{self as network}; use zksync_consensus_roles::{validator, validator::testonly::Setup}; use zksync_consensus_storage::{testonly::TestMemoryStorage, BlockStore}; -use zksync_consensus_utils::pipe; pub(crate) enum Network { Real, @@ -199,8 +199,16 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { scope::run!(ctx, |ctx, s| async { let mut nodes = vec![]; for (i, spec) in specs.iter().enumerate() { - let (node, runner) = - network::testonly::Instance::new(spec.net.clone(), spec.block_store.clone()); + let (send, recv) = sync::prunable_mpsc::channel( + crate::inbound_filter_predicate, + crate::inbound_selection_function, + ); + let (node, runner) = network::testonly::Instance::new_with_channel( + spec.net.clone(), + spec.block_store.clone(), + send, + recv, + ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); nodes.push(node); } @@ -209,8 +217,9 @@ async fn run_nodes_real(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> { let spec = &specs[i]; s.spawn( async { - let mut node = node; - spec.run(ctx, node.pipe()).await + let node = node; + spec.run(ctx, node.consensus_receiver, node.consensus_sender) + .await } .instrument(tracing::info_span!("node", i)), ); @@ -242,14 +251,16 @@ async fn run_nodes_twins( let mut gossip_targets = HashMap::new(); for (i, spec) in specs.iter().enumerate() { - let (actor_pipe, dispatcher_pipe) = pipe::new(); + let (output_channel_send, output_channel_recv) = channel::unbounded(); + let (input_channel_send, input_channel_recv) = crate::create_input_channel(); + let validator_key = spec.net.validator_key.as_ref().unwrap().public(); let port = spec.net.server_addr.port(); validator_ports.entry(validator_key).or_default().push(port); - sends.insert(port, actor_pipe.send); - recvs.push((port, actor_pipe.recv)); + sends.insert(port, input_channel_send); + recvs.push((port, output_channel_recv)); stores.insert(port, spec.block_store.clone()); gossip_targets.insert( port, @@ -264,15 +275,10 @@ async fn run_nodes_twins( .collect(), ); - // Run consensus; the dispatcher pipe is its network connection, which means we can use the actor pipe to: - // * send Output messages from other actors to this consensus instance - // * receive Input messages sent by this consensus to the other 
actors + // Run consensus node. s.spawn( - async { - let mut network_pipe = dispatcher_pipe; - spec.run(ctx, &mut network_pipe).await - } - .instrument(tracing::info_span!("node", i, port)), + async { spec.run(ctx, input_channel_recv, output_channel_send).await } + .instrument(tracing::info_span!("node", i, port)), ); } // Taking these references is necessary for the `scope::run!` environment lifetime rules to compile @@ -316,7 +322,7 @@ async fn run_nodes_twins( .await } -/// Receive input messages from the consensus actor and send them to the others +/// Receive input messages from the consensus component and send them to the others /// according to the partition schedule of the port associated with this instance. /// /// We have to simulate the gossip layer which isn't instantiated by these tests. @@ -325,15 +331,15 @@ async fn run_nodes_twins( async fn twins_receive_loop( ctx: &ctx::Ctx, router: &PortRouter, - sends: &HashMap>, + sends: &HashMap>, gossip: TwinsGossipConfig<'_>, port: Port, - mut recv: UnboundedReceiver, + mut recv: UnboundedReceiver, ) -> anyhow::Result<()> { let rng = &mut ctx.rng(); // Finalized block number iff this node can gossip to the target and the message contains a QC. - let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| { + let block_to_gossip = |target_port: Port, msg: &FromNetworkMessage| { if !gossip.targets.contains(&target_port) { return None; } @@ -355,10 +361,10 @@ async fn twins_receive_loop( // partitioned, and then unstash all previous A-to-B messages. // * If that wouldn't be acceptable then we could have some kind of global view of stashed messages // and unstash them as soon as someone moves on to a new view. - let mut stashes: HashMap> = HashMap::new(); + let mut stashes: HashMap> = HashMap::new(); // Either stash a message, or unstash all messages and send them to the target along with the new one. 
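The comment above describes a stash-or-flush rule: while the target port is partitioned away, messages for it are stashed; once it is reachable again, the stash is drained before the new message is delivered. A simplified, self-contained sketch of that rule follows; Port, the message type, and the send callback are stand-ins, and the real send_or_stash in the next hunk additionally inspects the message's view number and kind:

use std::collections::HashMap;

type Port = u16;

fn send_or_stash<T>(
    stashes: &mut HashMap<Port, Vec<T>>,
    can_send: bool,
    target: Port,
    msg: T,
    send: &mut impl FnMut(Port, T),
) {
    let stash = stashes.entry(target).or_default();
    if !can_send {
        // Target is partitioned away from us: keep the message for later.
        stash.push(msg);
        return;
    }
    // Target is reachable again: deliver everything stashed so far, then the new message.
    for old in stash.drain(..) {
        send(target, old);
    }
    send(target, msg);
}

fn main() {
    let mut stashes: HashMap<Port, Vec<&str>> = HashMap::new();
    let mut delivered: Vec<(Port, &str)> = Vec::new();
    let mut send = |port, msg| delivered.push((port, msg));
    send_or_stash(&mut stashes, false, 3000, "a", &mut send);
    send_or_stash(&mut stashes, true, 3000, "b", &mut send);
    assert_eq!(delivered, vec![(3000, "a"), (3000, "b")]);
}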
- let mut send_or_stash = |can_send: bool, target_port: Port, msg: io::OutputMessage| { + let mut send_or_stash = |can_send: bool, target_port: Port, msg: FromNetworkMessage| { let stash = stashes.entry(target_port).or_default(); let view = output_msg_view_number(&msg); let kind = output_msg_label(&msg); @@ -401,15 +407,13 @@ async fn twins_receive_loop( send(msg); }; - while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await { + while let Ok(message) = recv.recv(ctx).await { let view = message.message.msg.view().number.0 as usize; let kind = message.message.msg.label(); - let msg = || { - io::OutputMessage::Consensus(io::ConsensusReq { - msg: message.message.clone(), - ack: oneshot::channel().0, - }) + let msg = || FromNetworkMessage { + msg: message.message.clone(), + ack: oneshot::channel().0, }; let can_send = |to| { @@ -490,28 +494,22 @@ async fn twins_gossip_loop( .await } -fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber { - match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number, - } +fn output_msg_view_number(msg: &FromNetworkMessage) -> validator::ViewNumber { + msg.msg.msg.view().number } -fn output_msg_label(msg: &io::OutputMessage) -> &str { - match msg { - io::OutputMessage::Consensus(cr) => cr.msg.msg.label(), - } +fn output_msg_label(msg: &FromNetworkMessage) -> &str { + msg.msg.msg.label() } -fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> { +fn output_msg_commit_qc(msg: &FromNetworkMessage) -> Option<&validator::CommitQC> { use validator::ConsensusMsg; - let justification = match msg { - io::OutputMessage::Consensus(cr) => match &cr.msg.msg { - ConsensusMsg::ReplicaTimeout(msg) => return msg.high_qc.as_ref(), - ConsensusMsg::ReplicaCommit(_) => return None, - ConsensusMsg::ReplicaNewView(msg) => &msg.justification, - ConsensusMsg::LeaderProposal(msg) => &msg.justification, - }, + let justification = match &msg.msg.msg { + ConsensusMsg::ReplicaTimeout(msg) => return msg.high_qc.as_ref(), + ConsensusMsg::ReplicaCommit(_) => return None, + ConsensusMsg::ReplicaNewView(msg) => &msg.justification, + ConsensusMsg::LeaderProposal(msg) => &msg.justification, }; match justification { diff --git a/node/actors/bft/src/testonly/twins/mod.rs b/node/components/bft/src/testonly/twins/mod.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/mod.rs rename to node/components/bft/src/testonly/twins/mod.rs diff --git a/node/actors/bft/src/testonly/twins/partition.rs b/node/components/bft/src/testonly/twins/partition.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/partition.rs rename to node/components/bft/src/testonly/twins/partition.rs diff --git a/node/actors/bft/src/testonly/twins/scenario.rs b/node/components/bft/src/testonly/twins/scenario.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/scenario.rs rename to node/components/bft/src/testonly/twins/scenario.rs diff --git a/node/actors/bft/src/testonly/twins/tests.rs b/node/components/bft/src/testonly/twins/tests.rs similarity index 100% rename from node/actors/bft/src/testonly/twins/tests.rs rename to node/components/bft/src/testonly/twins/tests.rs diff --git a/node/actors/bft/src/tests/mod.rs b/node/components/bft/src/tests/mod.rs similarity index 100% rename from node/actors/bft/src/tests/mod.rs rename to node/components/bft/src/tests/mod.rs diff --git a/node/actors/bft/src/tests/twins.rs b/node/components/bft/src/tests/twins.rs similarity index 98% rename from 
node/actors/bft/src/tests/twins.rs rename to node/components/bft/src/tests/twins.rs index 45603e51..90be381c 100644 --- a/node/actors/bft/src/tests/twins.rs +++ b/node/components/bft/src/tests/twins.rs @@ -254,12 +254,15 @@ async fn twins_network_w2_twins_w_partitions(num_reseeds: usize) { } /// Run Twins scenario with more twins than tolerable and expect it to fail. +/// Disabled by default since it can take a long time to run depending on +/// the initial seed. +#[ignore] #[tokio::test] async fn twins_network_to_fail() { tokio::time::pause(); assert_matches!( // All twins! To find a conflict quicker. - run_twins(6, 6, TwinsScenarios::Multiple(150)).await, + run_twins(6, 6, TwinsScenarios::Multiple(200)).await, Err(TestError::BlockConflict) ); } diff --git a/node/actors/executor/Cargo.toml b/node/components/executor/Cargo.toml similarity index 93% rename from node/actors/executor/Cargo.toml rename to node/components/executor/Cargo.toml index a652df00..c8feec5f 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/components/executor/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus executor actor" +description = "ZKsync consensus executor component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/executor/src/lib.rs b/node/components/executor/src/lib.rs similarity index 87% rename from node/actors/executor/src/lib.rs rename to node/components/executor/src/lib.rs index 0bf64484..7fb27892 100644 --- a/node/actors/executor/src/lib.rs +++ b/node/components/executor/src/lib.rs @@ -1,5 +1,4 @@ //! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate. -use crate::io::Dispatcher; use anyhow::Context as _; pub use network::{gossip::attestation, RpcConfig}; use std::{ @@ -11,10 +10,8 @@ use zksync_consensus_bft as bft; use zksync_consensus_network as network; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BlockStore, ReplicaStore}; -use zksync_consensus_utils::pipe; use zksync_protobuf::kB; -mod io; #[cfg(test)] mod tests; @@ -74,7 +71,7 @@ impl Config { } } -/// Executor allowing to spin up all actors necessary for a consensus node. +/// Executor allowing to spin up all components necessary for a consensus node. #[derive(Debug)] pub struct Executor { /// General-purpose executor configuration. @@ -113,22 +110,17 @@ impl Executor { pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let network_config = self.network_config(); - // Generate the communication pipes. We have one for each actor. - let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new(); - let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); - // Create the IO dispatcher. - let dispatcher = Dispatcher::new(consensus_dispatcher_pipe, network_dispatcher_pipe); + // Generate the communication channels. We have one for each component. 
+ let (consensus_send, consensus_recv) = bft::create_input_channel(); + let (network_send, network_recv) = ctx::channel::unbounded(); - tracing::debug!("Starting actors in separate threads."); + tracing::debug!("Starting components in separate threads."); scope::run!(ctx, |ctx, s| async { - s.spawn(async { - dispatcher.run(ctx).await; - Ok(()) - }); let (net, runner) = network::Network::new( network_config, self.block_store.clone(), - network_actor_pipe, + consensus_send, + network_recv, self.attestation, ); net.register_metrics(); @@ -143,7 +135,7 @@ impl Executor { }); } - // Run the bft actor iff this node is an active validator. + // Run the bft component iff this node is an active validator. let Some(validator) = self.validator else { tracing::info!("Running the node in non-validator mode."); return Ok(()); @@ -166,7 +158,7 @@ impl Executor { payload_manager: validator.payload_manager, max_payload_size: self.config.max_payload_size, } - .run(ctx, consensus_actor_pipe) + .run(ctx, network_send, consensus_recv) .await .context("Consensus stopped") }) diff --git a/node/actors/executor/src/tests.rs b/node/components/executor/src/tests.rs similarity index 100% rename from node/actors/executor/src/tests.rs rename to node/components/executor/src/tests.rs diff --git a/node/actors/network/Cargo.toml b/node/components/network/Cargo.toml similarity index 96% rename from node/actors/network/Cargo.toml rename to node/components/network/Cargo.toml index 439ff197..74e0bcd5 100644 --- a/node/actors/network/Cargo.toml +++ b/node/components/network/Cargo.toml @@ -1,6 +1,6 @@ [package] authors.workspace = true -description = "ZKsync consensus network actor" +description = "ZKsync consensus network component" edition.workspace = true homepage.workspace = true keywords.workspace = true diff --git a/node/actors/network/build.rs b/node/components/network/build.rs similarity index 100% rename from node/actors/network/build.rs rename to node/components/network/build.rs diff --git a/node/actors/network/src/config.rs b/node/components/network/src/config.rs similarity index 97% rename from node/actors/network/src/config.rs rename to node/components/network/src/config.rs index 3a25be62..973c3f91 100644 --- a/node/actors/network/src/config.rs +++ b/node/components/network/src/config.rs @@ -1,4 +1,4 @@ -//! Network actor configs. +//! Network component configs. use std::collections::{HashMap, HashSet}; use zksync_concurrency::{limiter, net, time}; use zksync_consensus_roles::{node, validator}; @@ -82,7 +82,7 @@ pub struct GossipConfig { pub static_outbound: HashMap, } -/// Network actor config. +/// Network component config. #[derive(Debug, Clone)] pub struct Config { /// Label identifying the build version of the binary that this node is running. @@ -116,7 +116,7 @@ pub struct Config { /// Set it to `false` in tests to simulate node behavior before pre-genesis support. pub enable_pregenesis: bool, /// Maximum number of not-yet-persisted blocks fetched from the network. - /// If reached, network actor will wait for more blocks to get persisted + /// If reached, network component will wait for more blocks to get persisted /// before fetching the next ones. It is useful for limiting memory consumption /// when the block persisting rate is low. 
pub max_block_queue_size: usize, diff --git a/node/actors/network/src/consensus/handshake/mod.rs b/node/components/network/src/consensus/handshake/mod.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/mod.rs rename to node/components/network/src/consensus/handshake/mod.rs diff --git a/node/actors/network/src/consensus/handshake/testonly.rs b/node/components/network/src/consensus/handshake/testonly.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/testonly.rs rename to node/components/network/src/consensus/handshake/testonly.rs diff --git a/node/actors/network/src/consensus/handshake/tests.rs b/node/components/network/src/consensus/handshake/tests.rs similarity index 100% rename from node/actors/network/src/consensus/handshake/tests.rs rename to node/components/network/src/consensus/handshake/tests.rs diff --git a/node/actors/network/src/consensus/mod.rs b/node/components/network/src/consensus/mod.rs similarity index 97% rename from node/actors/network/src/consensus/mod.rs rename to node/components/network/src/consensus/mod.rs index 7b6e3f9a..8e912079 100644 --- a/node/actors/network/src/consensus/mod.rs +++ b/node/components/network/src/consensus/mod.rs @@ -134,14 +134,12 @@ impl rpc::Handler for &Network { req: rpc::consensus::Req, ) -> anyhow::Result { let (send, recv) = oneshot::channel(); - self.gossip - .sender - .send(io::OutputMessage::Consensus(io::ConsensusReq { - msg: req.0, - ack: send, - })); + self.gossip.consensus_sender.send(io::ConsensusReq { + msg: req.0, + ack: send, + }); // TODO(gprusak): disconnection means that there message was rejected OR - // that bft actor is missing (in tests), which leads to unnecessary disconnects. + // that bft component is missing (in tests), which leads to unnecessary disconnects. let _ = recv.recv_or_disconnected(ctx).await?; Ok(rpc::consensus::Resp) } diff --git a/node/actors/network/src/consensus/tests.rs b/node/components/network/src/consensus/tests.rs similarity index 94% rename from node/actors/network/src/consensus/tests.rs rename to node/components/network/src/consensus/tests.rs index a34062f9..f0493a57 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/components/network/src/consensus/tests.rs @@ -310,16 +310,12 @@ async fn test_transmission() { let in_message = io::ConsensusInputMessage { message: want.clone(), }; - nodes[0].pipe.send(in_message.into()); - - loop { - let output_message = nodes[1].pipe.recv(ctx).await.unwrap(); - if let io::OutputMessage::Consensus(got) = output_message { - assert_eq!(want, got.msg); - tracing::info!("OK"); - break; - }; - } + nodes[0].consensus_sender.send(in_message); + + let message = nodes[1].consensus_receiver.recv(ctx).await.unwrap(); + + assert_eq!(want, message.msg); + tracing::info!("OK"); } Ok(()) }) @@ -348,12 +344,9 @@ async fn test_retransmission() { // Make first node broadcast a message. let want: validator::Signed = rng.gen(); - node0.pipe.send( - io::ConsensusInputMessage { - message: want.clone(), - } - .into(), - ); + node0.consensus_sender.send(io::ConsensusInputMessage { + message: want.clone(), + }); // Spawn the second node multiple times. // Each time the node should reconnect and re-receive the broadcasted consensus message. 
@@ -363,13 +356,12 @@ async fn test_retransmission() { let (mut node1, runner) = testonly::Instance::new(cfgs[1].clone(), store.blocks.clone()); s.spawn_bg(runner.run(ctx)); - loop { - if let io::OutputMessage::Consensus(got) = node1.pipe.recv(ctx).await.unwrap() { - assert_eq!(want, got.msg); - tracing::info!("OK"); - break; - } - } + + let message = node1.consensus_receiver.recv(ctx).await.unwrap(); + + assert_eq!(want, message.msg); + tracing::info!("OK"); + Ok(()) }) .await diff --git a/node/actors/network/src/debug_page/mod.rs b/node/components/network/src/debug_page/mod.rs similarity index 100% rename from node/actors/network/src/debug_page/mod.rs rename to node/components/network/src/debug_page/mod.rs diff --git a/node/actors/network/src/debug_page/style.css b/node/components/network/src/debug_page/style.css similarity index 100% rename from node/actors/network/src/debug_page/style.css rename to node/components/network/src/debug_page/style.css diff --git a/node/actors/network/src/frame.rs b/node/components/network/src/frame.rs similarity index 100% rename from node/actors/network/src/frame.rs rename to node/components/network/src/frame.rs diff --git a/node/actors/network/src/gossip/attestation/metrics.rs b/node/components/network/src/gossip/attestation/metrics.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/metrics.rs rename to node/components/network/src/gossip/attestation/metrics.rs diff --git a/node/actors/network/src/gossip/attestation/mod.rs b/node/components/network/src/gossip/attestation/mod.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/mod.rs rename to node/components/network/src/gossip/attestation/mod.rs diff --git a/node/actors/network/src/gossip/attestation/tests.rs b/node/components/network/src/gossip/attestation/tests.rs similarity index 100% rename from node/actors/network/src/gossip/attestation/tests.rs rename to node/components/network/src/gossip/attestation/tests.rs diff --git a/node/actors/network/src/gossip/fetch.rs b/node/components/network/src/gossip/fetch.rs similarity index 100% rename from node/actors/network/src/gossip/fetch.rs rename to node/components/network/src/gossip/fetch.rs diff --git a/node/actors/network/src/gossip/handshake/mod.rs b/node/components/network/src/gossip/handshake/mod.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/mod.rs rename to node/components/network/src/gossip/handshake/mod.rs diff --git a/node/actors/network/src/gossip/handshake/testonly.rs b/node/components/network/src/gossip/handshake/testonly.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/testonly.rs rename to node/components/network/src/gossip/handshake/testonly.rs diff --git a/node/actors/network/src/gossip/handshake/tests.rs b/node/components/network/src/gossip/handshake/tests.rs similarity index 100% rename from node/actors/network/src/gossip/handshake/tests.rs rename to node/components/network/src/gossip/handshake/tests.rs diff --git a/node/actors/network/src/gossip/loadtest/mod.rs b/node/components/network/src/gossip/loadtest/mod.rs similarity index 100% rename from node/actors/network/src/gossip/loadtest/mod.rs rename to node/components/network/src/gossip/loadtest/mod.rs diff --git a/node/actors/network/src/gossip/loadtest/tests.rs b/node/components/network/src/gossip/loadtest/tests.rs similarity index 100% rename from node/actors/network/src/gossip/loadtest/tests.rs rename to node/components/network/src/gossip/loadtest/tests.rs diff --git 
a/node/actors/network/src/gossip/mod.rs b/node/components/network/src/gossip/mod.rs similarity index 95% rename from node/actors/network/src/gossip/mod.rs rename to node/components/network/src/gossip/mod.rs index 814f6b35..41e07973 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/components/network/src/gossip/mod.rs @@ -18,7 +18,7 @@ use fetch::RequestItem; use std::sync::{atomic::AtomicUsize, Arc}; use tracing::Instrument; pub(crate) use validator_addrs::*; -use zksync_concurrency::{ctx, ctx::channel, scope, sync}; +use zksync_concurrency::{ctx, scope, sync}; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; @@ -56,8 +56,8 @@ pub(crate) struct Network { pub(crate) validator_addrs: ValidatorAddrsWatch, /// Block store to serve `get_block` requests from. pub(crate) block_store: Arc, - /// Output pipe of the network actor. - pub(crate) sender: channel::UnboundedSender, + /// Sender of the channel to the consensus component. + pub(crate) consensus_sender: sync::prunable_mpsc::Sender, /// Queue of block fetching requests. /// /// These are blocks that this node wants to request from remote peers via RPC. @@ -75,11 +75,11 @@ impl Network { pub(crate) fn new( cfg: Config, block_store: Arc, - sender: channel::UnboundedSender, + consensus_sender: sync::prunable_mpsc::Sender, attestation: Arc, ) -> Arc { Arc::new(Self { - sender, + consensus_sender, inbound: PoolWatch::new( cfg.gossip.static_inbound.clone(), cfg.gossip.dynamic_inbound_limit, diff --git a/node/actors/network/src/gossip/runner.rs b/node/components/network/src/gossip/runner.rs similarity index 100% rename from node/actors/network/src/gossip/runner.rs rename to node/components/network/src/gossip/runner.rs diff --git a/node/actors/network/src/gossip/testonly.rs b/node/components/network/src/gossip/testonly.rs similarity index 100% rename from node/actors/network/src/gossip/testonly.rs rename to node/components/network/src/gossip/testonly.rs diff --git a/node/actors/network/src/gossip/tests/fetch_blocks.rs b/node/components/network/src/gossip/tests/fetch_blocks.rs similarity index 100% rename from node/actors/network/src/gossip/tests/fetch_blocks.rs rename to node/components/network/src/gossip/tests/fetch_blocks.rs diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/components/network/src/gossip/tests/mod.rs similarity index 97% rename from node/actors/network/src/gossip/tests/mod.rs rename to node/components/network/src/gossip/tests/mod.rs index fa96f6cc..2e79162b 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/components/network/src/gossip/tests/mod.rs @@ -518,12 +518,17 @@ async fn test_batch_votes_propagation() { for (i, mut cfg) in cfgs.into_iter().enumerate() { cfg.rpc.push_batch_votes_rate = limiter::Rate::INF; cfg.validator_key = None; - let (node, runner) = testonly::Instance::new_from_config(testonly::InstanceConfig { - cfg: cfg.clone(), - block_store: store.blocks.clone(), - attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) - .into(), - }); + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); + let (node, runner) = testonly::Instance::new_from_config( + testonly::InstanceConfig { + cfg: cfg.clone(), + block_store: store.blocks.clone(), + attestation: attestation::Controller::new(Some(setup.attester_keys[i].clone())) + .into(), + }, + con_send, + con_recv, + ); s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); // Task going through the schedule, waiting for ANY node to collect 
the certificate // to advance to the next round of the schedule. diff --git a/node/actors/network/src/gossip/tests/syncing.rs b/node/components/network/src/gossip/tests/syncing.rs similarity index 100% rename from node/actors/network/src/gossip/tests/syncing.rs rename to node/components/network/src/gossip/tests/syncing.rs diff --git a/node/actors/network/src/gossip/validator_addrs.rs b/node/components/network/src/gossip/validator_addrs.rs similarity index 100% rename from node/actors/network/src/gossip/validator_addrs.rs rename to node/components/network/src/gossip/validator_addrs.rs diff --git a/node/components/network/src/io.rs b/node/components/network/src/io.rs new file mode 100644 index 00000000..f3c16fc1 --- /dev/null +++ b/node/components/network/src/io.rs @@ -0,0 +1,20 @@ +#![allow(missing_docs)] +use zksync_concurrency::oneshot; +use zksync_consensus_roles::validator; + +/// Message types from the Consensus component. +#[derive(Debug, PartialEq)] +pub struct ConsensusInputMessage { + pub message: validator::Signed, +} + +/// Consensus message received from the network. +#[derive(Debug)] +pub struct ConsensusReq { + /// Payload. + pub msg: validator::Signed, + /// Channel that should be used to notify the network component that + /// processing of this message has been completed. + /// Used for rate limiting. + pub ack: oneshot::Sender<()>, +} diff --git a/node/actors/network/src/lib.rs b/node/components/network/src/lib.rs similarity index 86% rename from node/actors/network/src/lib.rs rename to node/components/network/src/lib.rs index 80f57619..bb957ced 100644 --- a/node/actors/network/src/lib.rs +++ b/node/components/network/src/lib.rs @@ -1,4 +1,4 @@ -//! Network actor maintaining a pool of outbound and inbound connections to other nodes. +//! Network component maintaining a pool of outbound and inbound connections to other nodes. use anyhow::Context as _; use gossip::attestation; use std::sync::Arc; @@ -6,10 +6,9 @@ use tracing::Instrument as _; use zksync_concurrency::{ ctx::{self, channel}, error::Wrap as _, - limiter, scope, + limiter, scope, sync, }; use zksync_consensus_storage::BlockStore; -use zksync_consensus_utils::pipe::ActorPipe; mod config; pub mod consensus; @@ -24,7 +23,6 @@ mod pool; mod preface; pub mod proto; mod rpc; -mod state; pub mod testonly; #[cfg(test)] mod tests; @@ -32,7 +30,7 @@ mod watch; pub use config::*; pub use metrics::MeteredStreamStats; -/// State of the network actor observable outside of the actor. +/// State of the network component observable outside of the component. pub struct Network { /// Consensus network state. pub(crate) consensus: Option>, @@ -45,27 +43,28 @@ pub struct Network { pub struct Runner { /// Network state. net: Arc, - /// Receiver of the messages from the dispatcher. - receiver: channel::UnboundedReceiver, + /// Receiver of consensus messages. + consensus_receiver: channel::UnboundedReceiver, } impl Network { - /// Constructs a new network actor state. - /// Call `run_network` to run the actor. + /// Constructs a new network component state. + /// Call `run_network` to run the component. 
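For context on the new io.rs introduced above: a ConsensusReq pairs the signed message with a oneshot ack that the consumer completes once processing is done, and the network side uses that ack for rate limiting (see the rpc handler change earlier in this diff). A minimal, self-contained model of that request/ack handshake, with std::sync::mpsc standing in for the crate's oneshot channel and all names illustrative:

use std::sync::mpsc;

struct Req {
    msg: String,
    // Completed by the consumer once the message has been processed.
    ack: mpsc::Sender<()>,
}

fn main() {
    let (req_send, req_recv) = mpsc::channel::<Req>();
    let (ack_send, ack_recv) = mpsc::channel::<()>();

    // Network side: hand the message over together with the ack sender,
    // then wait for the ack before accepting more work from this peer.
    req_send
        .send(Req { msg: "ReplicaCommit".into(), ack: ack_send })
        .unwrap();

    // Consensus side: process the message, then signal completion.
    let req = req_recv.recv().unwrap();
    println!("processing {}", req.msg);
    req.ack.send(()).unwrap();

    // Network side unblocks here.
    ack_recv.recv().unwrap();
}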
pub fn new( cfg: Config, block_store: Arc, - pipe: ActorPipe, + consensus_sender: sync::prunable_mpsc::Sender, + consensus_receiver: channel::UnboundedReceiver, attestation: Arc, ) -> (Arc, Runner) { - let gossip = gossip::Network::new(cfg, block_store, pipe.send, attestation); + let gossip = gossip::Network::new(cfg, block_store, consensus_sender, attestation); let consensus = consensus::Network::new(gossip.clone()); let net = Arc::new(Self { gossip, consensus }); ( net.clone(), Runner { net, - receiver: pipe.recv, + consensus_receiver, }, ) } @@ -80,23 +79,20 @@ impl Network { async fn handle_message( &self, _ctx: &ctx::Ctx, - message: io::InputMessage, + message: io::ConsensusInputMessage, ) -> anyhow::Result<()> { - match message { - io::InputMessage::Consensus(message) => { - self.consensus - .as_ref() - .context("not a validator node")? - .msg_pool - .send(Arc::new(message)); - } - } + self.consensus + .as_ref() + .context("not a validator node")? + .msg_pool + .send(Arc::new(message)); + Ok(()) } } impl Runner { - /// Runs the network actor. + /// Runs the network component. pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let res: ctx::Result<()> = scope::run!(ctx, |ctx, s| async { let mut listener = self @@ -110,7 +106,7 @@ impl Runner { // Handle incoming messages. s.spawn(async { // We don't propagate cancellation errors - while let Ok(message) = self.receiver.recv(ctx).await { + while let Ok(message) = self.consensus_receiver.recv(ctx).await { s.spawn(async { if let Err(err) = self.net.handle_message(ctx, message).await { tracing::info!("handle_message(): {err:#}"); diff --git a/node/actors/network/src/metrics.rs b/node/components/network/src/metrics.rs similarity index 100% rename from node/actors/network/src/metrics.rs rename to node/components/network/src/metrics.rs diff --git a/node/actors/network/src/mux/config.rs b/node/components/network/src/mux/config.rs similarity index 100% rename from node/actors/network/src/mux/config.rs rename to node/components/network/src/mux/config.rs diff --git a/node/actors/network/src/mux/handshake.rs b/node/components/network/src/mux/handshake.rs similarity index 100% rename from node/actors/network/src/mux/handshake.rs rename to node/components/network/src/mux/handshake.rs diff --git a/node/actors/network/src/mux/header.rs b/node/components/network/src/mux/header.rs similarity index 100% rename from node/actors/network/src/mux/header.rs rename to node/components/network/src/mux/header.rs diff --git a/node/actors/network/src/mux/mod.rs b/node/components/network/src/mux/mod.rs similarity index 100% rename from node/actors/network/src/mux/mod.rs rename to node/components/network/src/mux/mod.rs diff --git a/node/actors/network/src/mux/reusable_stream.rs b/node/components/network/src/mux/reusable_stream.rs similarity index 100% rename from node/actors/network/src/mux/reusable_stream.rs rename to node/components/network/src/mux/reusable_stream.rs diff --git a/node/actors/network/src/mux/tests/mod.rs b/node/components/network/src/mux/tests/mod.rs similarity index 100% rename from node/actors/network/src/mux/tests/mod.rs rename to node/components/network/src/mux/tests/mod.rs diff --git a/node/actors/network/src/mux/tests/proto/mod.proto b/node/components/network/src/mux/tests/proto/mod.proto similarity index 100% rename from node/actors/network/src/mux/tests/proto/mod.proto rename to node/components/network/src/mux/tests/proto/mod.proto diff --git a/node/actors/network/src/mux/tests/proto/mod.rs 
b/node/components/network/src/mux/tests/proto/mod.rs similarity index 100% rename from node/actors/network/src/mux/tests/proto/mod.rs rename to node/components/network/src/mux/tests/proto/mod.rs diff --git a/node/actors/network/src/mux/transient_stream.rs b/node/components/network/src/mux/transient_stream.rs similarity index 100% rename from node/actors/network/src/mux/transient_stream.rs rename to node/components/network/src/mux/transient_stream.rs diff --git a/node/actors/network/src/noise/bytes.rs b/node/components/network/src/noise/bytes.rs similarity index 100% rename from node/actors/network/src/noise/bytes.rs rename to node/components/network/src/noise/bytes.rs diff --git a/node/actors/network/src/noise/mod.rs b/node/components/network/src/noise/mod.rs similarity index 100% rename from node/actors/network/src/noise/mod.rs rename to node/components/network/src/noise/mod.rs diff --git a/node/actors/network/src/noise/stream.rs b/node/components/network/src/noise/stream.rs similarity index 100% rename from node/actors/network/src/noise/stream.rs rename to node/components/network/src/noise/stream.rs diff --git a/node/actors/network/src/noise/testonly.rs b/node/components/network/src/noise/testonly.rs similarity index 100% rename from node/actors/network/src/noise/testonly.rs rename to node/components/network/src/noise/testonly.rs diff --git a/node/actors/network/src/noise/tests.rs b/node/components/network/src/noise/tests.rs similarity index 100% rename from node/actors/network/src/noise/tests.rs rename to node/components/network/src/noise/tests.rs diff --git a/node/actors/network/src/pool.rs b/node/components/network/src/pool.rs similarity index 100% rename from node/actors/network/src/pool.rs rename to node/components/network/src/pool.rs diff --git a/node/actors/network/src/preface.rs b/node/components/network/src/preface.rs similarity index 100% rename from node/actors/network/src/preface.rs rename to node/components/network/src/preface.rs diff --git a/node/actors/network/src/proto/consensus.proto b/node/components/network/src/proto/consensus.proto similarity index 100% rename from node/actors/network/src/proto/consensus.proto rename to node/components/network/src/proto/consensus.proto diff --git a/node/actors/network/src/proto/gossip.proto b/node/components/network/src/proto/gossip.proto similarity index 100% rename from node/actors/network/src/proto/gossip.proto rename to node/components/network/src/proto/gossip.proto diff --git a/node/actors/network/src/proto/mod.rs b/node/components/network/src/proto/mod.rs similarity index 100% rename from node/actors/network/src/proto/mod.rs rename to node/components/network/src/proto/mod.rs diff --git a/node/actors/network/src/proto/mux.proto b/node/components/network/src/proto/mux.proto similarity index 100% rename from node/actors/network/src/proto/mux.proto rename to node/components/network/src/proto/mux.proto diff --git a/node/actors/network/src/proto/ping.proto b/node/components/network/src/proto/ping.proto similarity index 100% rename from node/actors/network/src/proto/ping.proto rename to node/components/network/src/proto/ping.proto diff --git a/node/actors/network/src/proto/preface.proto b/node/components/network/src/proto/preface.proto similarity index 100% rename from node/actors/network/src/proto/preface.proto rename to node/components/network/src/proto/preface.proto diff --git a/node/actors/network/src/proto/rpc.proto b/node/components/network/src/proto/rpc.proto similarity index 100% rename from 
node/actors/network/src/proto/rpc.proto rename to node/components/network/src/proto/rpc.proto diff --git a/node/actors/network/src/rpc/consensus.rs b/node/components/network/src/rpc/consensus.rs similarity index 100% rename from node/actors/network/src/rpc/consensus.rs rename to node/components/network/src/rpc/consensus.rs diff --git a/node/actors/network/src/rpc/get_block.rs b/node/components/network/src/rpc/get_block.rs similarity index 100% rename from node/actors/network/src/rpc/get_block.rs rename to node/components/network/src/rpc/get_block.rs diff --git a/node/actors/network/src/rpc/metrics.rs b/node/components/network/src/rpc/metrics.rs similarity index 100% rename from node/actors/network/src/rpc/metrics.rs rename to node/components/network/src/rpc/metrics.rs diff --git a/node/actors/network/src/rpc/mod.rs b/node/components/network/src/rpc/mod.rs similarity index 100% rename from node/actors/network/src/rpc/mod.rs rename to node/components/network/src/rpc/mod.rs diff --git a/node/actors/network/src/rpc/ping.rs b/node/components/network/src/rpc/ping.rs similarity index 100% rename from node/actors/network/src/rpc/ping.rs rename to node/components/network/src/rpc/ping.rs diff --git a/node/actors/network/src/rpc/push_batch_votes.rs b/node/components/network/src/rpc/push_batch_votes.rs similarity index 100% rename from node/actors/network/src/rpc/push_batch_votes.rs rename to node/components/network/src/rpc/push_batch_votes.rs diff --git a/node/actors/network/src/rpc/push_block_store_state.rs b/node/components/network/src/rpc/push_block_store_state.rs similarity index 100% rename from node/actors/network/src/rpc/push_block_store_state.rs rename to node/components/network/src/rpc/push_block_store_state.rs diff --git a/node/actors/network/src/rpc/push_validator_addrs.rs b/node/components/network/src/rpc/push_validator_addrs.rs similarity index 100% rename from node/actors/network/src/rpc/push_validator_addrs.rs rename to node/components/network/src/rpc/push_validator_addrs.rs diff --git a/node/actors/network/src/rpc/testonly.rs b/node/components/network/src/rpc/testonly.rs similarity index 100% rename from node/actors/network/src/rpc/testonly.rs rename to node/components/network/src/rpc/testonly.rs diff --git a/node/actors/network/src/rpc/tests.rs b/node/components/network/src/rpc/tests.rs similarity index 100% rename from node/actors/network/src/rpc/tests.rs rename to node/components/network/src/rpc/tests.rs diff --git a/node/actors/network/src/testonly.rs b/node/components/network/src/testonly.rs similarity index 83% rename from node/actors/network/src/testonly.rs rename to node/components/network/src/testonly.rs index e6d5ca3d..48421f80 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/components/network/src/testonly.rs @@ -1,8 +1,8 @@ //! Testonly utilities. 
#![allow(dead_code)] use crate::{ - gossip::attestation, io::ConsensusInputMessage, Config, GossipConfig, Network, RpcConfig, - Runner, + gossip::attestation, io::ConsensusInputMessage, io::ConsensusReq, Config, GossipConfig, + Network, RpcConfig, Runner, }; use rand::{ distributions::{Distribution, Standard}, @@ -18,7 +18,6 @@ use zksync_concurrency::{ }; use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::BlockStore; -use zksync_consensus_utils::pipe; impl Distribution for Standard { fn sample(&self, rng: &mut R) -> ConsensusInputMessage { @@ -78,15 +77,20 @@ pub(crate) async fn forward( let _ = io::shutdown(ctx, &mut write).await; } -/// Node instance, wrapping the network actor state and the +/// Node instance, wrapping the network component state and the /// events channel. +#[allow(clippy::partial_pub_fields)] pub struct Instance { /// State of the instance. - pub(crate) net: Arc, + pub net: Arc, /// Termination signal that can be sent to the node. - pub(crate) terminate: channel::Sender<()>, - /// Dispatcher end of the network pipe. - pub(crate) pipe: pipe::DispatcherPipe, + terminate: channel::Sender<()>, + /// Receiver channel to receive messages from the network component that are + /// intended for the consensus component. + pub consensus_receiver: sync::prunable_mpsc::Receiver, + /// Sender channel to send messages to the network component that are from + /// the consensus component. + pub consensus_sender: channel::UnboundedSender, } /// Construct configs for `n` validators of the consensus. @@ -190,27 +194,59 @@ pub struct InstanceConfig { impl Instance { /// Constructs a new instance. pub fn new(cfg: Config, block_store: Arc) -> (Self, InstanceRunner) { - Self::new_from_config(InstanceConfig { - cfg, - block_store, - attestation: attestation::Controller::new(None).into(), - }) + let (con_send, con_recv) = sync::prunable_mpsc::unpruned_channel(); + Self::new_from_config( + InstanceConfig { + cfg, + block_store, + attestation: attestation::Controller::new(None).into(), + }, + con_send, + con_recv, + ) + } + + /// Constructs a new instance with a custom channel for consensus + /// component. + pub fn new_with_channel( + cfg: Config, + block_store: Arc, + con_send: sync::prunable_mpsc::Sender, + con_recv: sync::prunable_mpsc::Receiver, + ) -> (Self, InstanceRunner) { + Self::new_from_config( + InstanceConfig { + cfg, + block_store, + attestation: attestation::Controller::new(None).into(), + }, + con_send, + con_recv, + ) } /// Construct an instance for a given config. - pub fn new_from_config(cfg: InstanceConfig) -> (Self, InstanceRunner) { - let (actor_pipe, dispatcher_pipe) = pipe::new(); + pub fn new_from_config( + cfg: InstanceConfig, + net_to_con_send: sync::prunable_mpsc::Sender, + net_to_con_recv: sync::prunable_mpsc::Receiver, + ) -> (Self, InstanceRunner) { + let (con_to_net_send, con_to_net_recv) = channel::unbounded(); + let (terminate_send, terminate_recv) = channel::bounded(1); + let (net, net_runner) = Network::new( cfg.cfg, cfg.block_store.clone(), - actor_pipe, + net_to_con_send, + con_to_net_recv, cfg.attestation, ); - let (terminate_send, terminate_recv) = channel::bounded(1); + ( Self { net, - pipe: dispatcher_pipe, + consensus_receiver: net_to_con_recv, + consensus_sender: con_to_net_send, terminate: terminate_send, }, InstanceRunner { @@ -227,13 +263,6 @@ impl Instance { self.terminate.closed(ctx).await } - /// Pipe getter. - pub fn pipe( - &mut self, - ) -> &mut pipe::DispatcherPipe { - &mut self.pipe - } - /// State getter. 
pub fn state(&self) -> &Arc { &self.net diff --git a/node/actors/network/src/tests.rs b/node/components/network/src/tests.rs similarity index 100% rename from node/actors/network/src/tests.rs rename to node/components/network/src/tests.rs diff --git a/node/actors/network/src/watch.rs b/node/components/network/src/watch.rs similarity index 100% rename from node/actors/network/src/watch.rs rename to node/components/network/src/watch.rs diff --git a/node/deny.toml b/node/deny.toml index b4c9a367..6e7e62ad 100644 --- a/node/deny.toml +++ b/node/deny.toml @@ -5,6 +5,11 @@ targets = [ { triple = "x86_64-unknown-linux-musl" }, ] +[advisories] +ignore = [ + { id = "RUSTSEC-2024-0384", reason = "caused by kube, which hasn't updated their dependencies yet" }, +] + [licenses] # We want to set a high confidence threshold for license detection. confidence-threshold = 1.0 @@ -17,8 +22,8 @@ allow = [ "ISC", "MIT", "OpenSSL", - "Unicode-DFS-2016", "Unicode-3.0", + "Unicode-DFS-2016", # Weak copyleft licenses "MPL-2.0", ] @@ -35,33 +40,6 @@ name = "ring" expression = "MIT" license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] -[bans] -# Lint level for when multiple versions of the same crate are detected -multiple-versions = "deny" -# Certain crates/versions that will be skipped when doing duplicate detection. -skip = [ - # Old versions required by tempfile and prost-build. - { name = "bitflags", version = "1.3.2" }, - - # Old version required by tracing-subscriber. - { name = "regex-automata", version = "0.1.10" }, - { name = "regex-syntax", version = "0.6.29" }, - - # Old version required by kube-runtime. - { name = "syn", version = "1.0" }, - - # Old version required by protox 0.5.1. - { name = "base64", version = "0.21.7" }, - - # Old versions required by vise-exporter. - { name = "http", version = "0.2.12" }, - { name = "http-body", version = "0.4.6" }, - { name = "hyper", version = "0.14.28" }, - - { name = "rustls-native-certs", version = "0.7.3" }, - { name = "hashbrown", version = "0.14.5" }, -] - [sources] unknown-git = "deny" unknown-registry = "deny" diff --git a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs index ba8f0bd6..4676082b 100644 --- a/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs +++ b/node/libs/concurrency/src/sync/prunable_mpsc/mod.rs @@ -56,6 +56,11 @@ pub fn channel( (send, recv) } +/// Creates a channel without any pruning or filtering, and returns the [`Sender`] and [`Receiver`] handles. +pub fn unpruned_channel() -> (Sender, Receiver) { + channel(|_| true, |_, _| SelectionFunctionResult::Keep) +} + struct Shared { send: watch::Sender>, } diff --git a/node/libs/roles/src/validator/messages/genesis.rs b/node/libs/roles/src/validator/messages/genesis.rs index 9f126409..19d6daeb 100644 --- a/node/libs/roles/src/validator/messages/genesis.rs +++ b/node/libs/roles/src/validator/messages/genesis.rs @@ -140,7 +140,7 @@ impl TryFrom for ProtocolVersion { fn try_from(value: u32) -> Result { // Currently, consensus doesn't define restrictions on the possible version. Unsupported - // versions are filtered out on the BFT actor level instead. + // versions are filtered out on the BFT component level instead. 
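The unpruned_channel helper added above wires the existing prunable_mpsc constructor with a pass-through predicate and a selection function that always keeps pending entries. As a rough, self-contained model of what those two hooks might do on send, assuming the semantics suggested by their names (the enum variants, struct, and method below are stand-ins, not the crate's API): the predicate gates whether a new message is enqueued at all, and the selection function lets a new message prune queued ones it supersedes.

use std::collections::VecDeque;

enum Selection {
    KeepOld,
    PruneOld,
}

struct PrunableQueue<T> {
    queue: VecDeque<T>,
    filter: fn(&T) -> bool,
    select: fn(&T, &T) -> Selection,
}

impl<T> PrunableQueue<T> {
    fn send(&mut self, new: T) {
        // The filter predicate decides whether the message is enqueued at all.
        if !(self.filter)(&new) {
            return;
        }
        let select = self.select; // fn pointers are Copy
        // The selection function can prune queued messages that `new` supersedes.
        self.queue
            .retain(|old| matches!(select(old, &new), Selection::KeepOld));
        self.queue.push_back(new);
    }
}

fn main() {
    // Keep only the newest view number: a higher view prunes a lower one.
    let mut q: PrunableQueue<u64> = PrunableQueue {
        queue: VecDeque::new(),
        filter: |_| true,
        select: |old, new| if new > old { Selection::PruneOld } else { Selection::KeepOld },
    };
    q.send(1);
    q.send(2);
    assert_eq!(q.queue, VecDeque::from(vec![2]));
}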
Ok(Self(value)) } } diff --git a/node/libs/utils/src/lib.rs b/node/libs/utils/src/lib.rs index 71fc6af1..33d4ecc5 100644 --- a/node/libs/utils/src/lib.rs +++ b/node/libs/utils/src/lib.rs @@ -2,6 +2,5 @@ mod encode; pub mod enum_util; -pub mod pipe; pub use encode::*; diff --git a/node/libs/utils/src/pipe.rs b/node/libs/utils/src/pipe.rs deleted file mode 100644 index d1b3a0d7..00000000 --- a/node/libs/utils/src/pipe.rs +++ /dev/null @@ -1,76 +0,0 @@ -//! This is a wrapper around channels to make it simpler and less error-prone to connect actors and the dispatcher. -//! A Pipe is a basically a bi-directional unbounded channel. - -use std::future::Future; -use zksync_concurrency::ctx::{self, channel, Ctx}; - -/// This is the end of the Pipe that should be held by the actor. -/// -/// The actor can receive `In` and send back `Out` messages. -pub type ActorPipe = Pipe; - -/// This is the end of the Pipe that should be held by the dispatcher. -/// -/// The dispatcher can send `In` messages and receive `Out` back. -pub type DispatcherPipe = Pipe; - -/// This is a generic Pipe end. -/// -/// It is used to receive `In` and send `Out` messages. -/// -/// When viewed from the perspective of [new]: -/// * messages of type `In` are going right-to-left -/// * messages of type `Out` are going left-to-right -/// -/// ```text -/// In <- -------- <- In -/// | Pipe | -/// Out -> -------- -> Out -/// ^ ^ -/// Actor Dispatcher -/// ``` -#[derive(Debug)] -pub struct Pipe { - /// This is the channel that receives messages. - pub recv: channel::UnboundedReceiver, - /// This is the channel that sends messages. - pub send: channel::UnboundedSender, -} - -impl Pipe { - /// Sends a message to the pipe. - pub fn send(&self, msg: Out) { - self.send.send(msg) - } - - /// Awaits a message from the pipe. - pub fn recv<'a>( - &'a mut self, - ctx: &'a Ctx, - ) -> ctx::CtxAware>> { - self.recv.recv(ctx) - } - - /// Tries to get a message from the pipe. Will return None if the pipe is empty. - pub fn try_recv(&mut self) -> Option { - self.recv.try_recv() - } -} - -/// This function creates a new Pipe. It returns the two ends of the pipe, for the actor and the dispatcher. -pub fn new() -> (ActorPipe, DispatcherPipe) { - let (input_sender, input_receiver) = channel::unbounded(); - let (output_sender, output_receiver) = channel::unbounded(); - - let pipe_actor = Pipe { - recv: input_receiver, - send: output_sender, - }; - - let pipe_dispatcher = Pipe { - recv: output_receiver, - send: input_sender, - }; - - (pipe_actor, pipe_dispatcher) -} diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index bf79a9bd..a80bb6b9 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -1,5 +1,5 @@ //! Main binary for the consensus node. It reads the configuration, initializes all parts of the node and -//! manages communication between the actors. It is the main executable in this workspace. +//! manages communication between the components. It is the main executable in this workspace. use anyhow::Context as _; use clap::Parser; use std::{fs, fs::Permissions, io::IsTerminal as _, os::unix::fs::PermissionsExt, path::PathBuf};
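A closing note on the deleted pipe.rs above: a Pipe pair was just two unbounded channels with their ends crossed, which is why the refactor can hand each component the specific sender and receiver it needs instead of a bidirectional bundle. A self-contained sketch of that equivalence, with std::sync::mpsc standing in for the crate's unbounded channels and all names illustrative:

use std::sync::mpsc::{channel, Receiver, Sender};

struct Pipe<In, Out> {
    recv: Receiver<In>,
    send: Sender<Out>,
}

fn new_pipe<In, Out>() -> (Pipe<In, Out>, Pipe<Out, In>) {
    let (input_sender, input_receiver) = channel();
    let (output_sender, output_receiver) = channel();
    (
        // Component end: receives In, sends Out.
        Pipe { recv: input_receiver, send: output_sender },
        // Dispatcher end: receives Out, sends In.
        Pipe { recv: output_receiver, send: input_sender },
    )
}

fn main() {
    let (component, dispatcher) = new_pipe::<&str, &str>();
    dispatcher.send.send("to component").unwrap();
    assert_eq!(component.recv.recv().unwrap(), "to component");
    component.send.send("to dispatcher").unwrap();
    assert_eq!(dispatcher.recv.recv().unwrap(), "to dispatcher");
}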