From 3836ecebae6f2fd88c1a558aac68d0788e398df3 Mon Sep 17 00:00:00 2001 From: Alexey Danilevich Date: Mon, 8 Nov 2021 20:28:40 +0300 Subject: [PATCH] 0.7.55 (#55) --- Cargo.toml | 86 +++++++++---------- src/client.rs | 24 +++--- src/common.rs | 226 +++++++++++++++++++++++++------------------------- src/node.rs | 188 +++++++++++++++++++++-------------------- src/server.rs | 14 ++-- 5 files changed, 269 insertions(+), 269 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 07ac17f..060e5f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,55 +1,51 @@ - [package] - -edition = '2018' -build = './common/build/build.rs' -name = 'adnl' -version = '0.7.53' -description = 'ADNL library' +edition = "2018" +build = "./common/build/build.rs" +name = "adnl" +version = "0.7.55" +description = "ADNL library" [workspace] [dependencies] - -aes-ctr = '0.6.0' -arrayref = '0.3.5' -async-trait = '0.1.22' -base64 = '0.11.0' -curve25519-dalek = '2.0.0' -ed25519 = '1.0.1' -ed25519-dalek = '1.0.0-pre.4' -external-ip = { version = '4.1.0', optional = true } -failure = '0.1.6' -futures = '0.3.1' -log = '0.4.11' -num_cpus = '1.13' -rand = '0.7.2' -serde = { version = '1.0.105', features = [ 'derive', 'rc' ] } -serde_json = '1.0.41' -sha2 = '0.8.0' -socket2 = '0.3.19' -stream-cancel = '0.8.0' -tokio = { version = '1.5.0', features = [ 'io-util', 'macros', 'net', 'rt-multi-thread' ] } -tokio-io-timeout = '1.1.1' -tokio-stream = { version = '0.1.4', features = [ 'net' ] } -x25519-dalek = '0.6.0' -zstd = { version = '0.8', optional = true, features = [ 'pkg-config' ] } -ton_api = { git = 'https://github.com/tonlabs/ton-labs-tl.git', package = 'ton_api', tag = '0.2.110' } -ton_types = { git = 'https://github.com/tonlabs/ton-labs-types.git', tag = '1.10.10' } -lockfree = { git = 'https://github.com/tonlabs/lockfree.git' } +aes-ctr = "0.6.0" +async-trait = "0.1.22" +base64 = "0.13" +curve25519-dalek = "3.2" +ed25519 = "1.0" +ed25519-dalek = "1.0" +external-ip = { version = "4.1.0", optional = true } +failure = "0.1.6" +futures = "0.3.1" +log = "0.4" +num_cpus = "1.13" +rand = "0.7" +serde = { version = "1.0.105", features = ["derive", "rc"] } +serde_json = "1.0.41" +sha2 = "0.9" +socket2 = "0.3.19" +stream-cancel = "0.8.0" +tokio = { version = "1.5.0", features = ["io-util", "macros", "net", "rt-multi-thread"] } +tokio-io-timeout = "1.1.1" +tokio-stream = { version = "0.1.4", features = ["net"] } +x25519-dalek = "1.2" +zstd = { version = "0.8", optional = true, features = ["pkg-config"] } + +ton_api = { git = 'https://github.com/tonlabs/ton-labs-tl.git', package = 'ton_api', tag = '0.2.111' } +ton_types = { git = 'https://github.com/tonlabs/ton-labs-types.git', tag = '1.10.11' } +lockfree = { git = "https://github.com/tonlabs/lockfree.git" } [dev-dependencies] - -external-ip = '4.1.0' -hex = '^0' -log4rs = '0.8.3' +external-ip = "4.1" +hex = "0.4" +log4rs = "1.0" [features] +default = ["static_workers"] +client = [] +server = [] +node = ["external-ip"] +compression = ["zstd"] +static_workers = [] +telemetry = [] -default = [ 'static_workers' ] -client = [ ] -server = [ ] -node = [ 'external-ip' ] -compression = [ 'zstd' ] -static_workers = [ ] -telemetry = [ ] diff --git a/src/client.rs b/src/client.rs index 8109911..734d6e3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,21 +1,18 @@ use crate::{ - dump, common::{ AdnlHandshake, AdnlStream, AdnlStreamCrypto, deserialize, get256, - KeyOption, KeyOptionJson, Query, serialize, TaggedTlObject, TARGET, Timeouts + KeyOption, KeyOptionJson, Query, serialize, TaggedTlObject, Timeouts } }; use rand::Rng; -use std::{net::SocketAddr, time::{Duration, SystemTime}}; -use ton_api::{ - ton::{ - TLObject, adnl::{Message as AdnlMessage, Pong as AdnlPongBoxed}, - rpc::adnl::Ping as AdnlPing - } +use std::{convert::TryInto, net::SocketAddr, time::{Duration, SystemTime}}; +use ton_api::ton::{ + TLObject, adnl::{Message as AdnlMessage, Pong as AdnlPongBoxed}, + rpc::adnl::Ping as AdnlPing }; #[cfg(feature = "telemetry")] use ton_api::{BoxedSerialize, ConstructorNumber}; -use ton_types::{fail, Result}; +use ton_types::{error, fail, Result}; #[derive(serde::Deserialize, serde::Serialize)] pub struct AdnlClientConfigJson { @@ -113,7 +110,7 @@ impl AdnlClient { //socket.bind(&"0.0.0.0:0".parse::()?.into())?; socket.connect_timeout( &config.server_address.into(), - config.timeouts.write().unwrap() + config.timeouts.write() )?; let mut stream = AdnlStream::from_stream_with_timeouts( @@ -171,7 +168,7 @@ impl AdnlClient { } let answer = deserialize(&buf[..])? .downcast::() - .map_err(|answer| failure::format_err!("Unsupported ADNL message {:?}", answer))?; + .map_err(|answer| error!("Unsupported ADNL message {:?}", answer))?; match answer { AdnlMessage::Adnl_Message_Answer(answer) => if &query_id == get256(&answer.query_id) { deserialize(&answer.answer) @@ -188,15 +185,14 @@ impl AdnlClient { ) -> Result { let mut rng = rand::thread_rng(); let mut buf: Vec = (0..160).map(|_| rng.gen()).collect(); - let nonce = arrayref::array_ref!(buf, 0, 160); - dump!(trace, TARGET, "Nonce", nonce); + let nonce = buf.as_slice().try_into()?; let ret = AdnlStreamCrypto::with_nonce_as_client(nonce); if let Some(client_key) = &config.client_key { AdnlHandshake::build_packet(&mut buf, client_key, &config.server_key)? } else { AdnlHandshake::build_packet( &mut buf, - &KeyOption::from_ed25519_secret_key(ed25519_dalek::SecretKey::generate(&mut rng)), + &KeyOption::from_ed25519_secret_key(ed25519_dalek::SecretKey::generate(&mut rng))?, &config.server_key )? } diff --git a/src/common.rs b/src/common.rs index 6865e12..2ee8a1e 100644 --- a/src/common.rs +++ b/src/common.rs @@ -4,7 +4,7 @@ use ed25519::signature::{Signature, Verifier}; use rand::Rng; use sha2::Digest; use std::{ - fmt::{self, Debug, Display, Formatter}, hash::Hash, + convert::TryInto, fmt::{self, Debug, Display, Formatter}, hash::Hash, sync::{Arc, atomic::{AtomicU64, AtomicUsize, Ordering}}, time::{Duration, Instant, SystemTime, UNIX_EPOCH} }; @@ -28,7 +28,7 @@ use ton_api::{ }; use ton_types::{fail, Result}; -#[cfg(any(feature = "client", feature = "node", feature = "server"))] +#[cfg(any(feature = "node", feature = "server"))] pub(crate) const TARGET: &str = "adnl"; #[macro_export] @@ -60,35 +60,6 @@ macro_rules! dump { } } -#[macro_export] -macro_rules! from_slice { - ($x:ident, 32) => { - [ - $x[ 0], $x[ 1], $x[ 2], $x[ 3], $x[ 4], $x[ 5], $x[ 6], $x[ 7], $x[ 8], $x[ 9], - $x[10], $x[11], $x[12], $x[13], $x[14], $x[15], $x[16], $x[17], $x[18], $x[19], - $x[20], $x[21], $x[22], $x[23], $x[24], $x[25], $x[26], $x[27], $x[28], $x[29], - $x[30], $x[31] - ] - }; - ($x:ident, $ix:expr, 16, $y: ident, $iy:expr, 16) => { - [ - $x[$ix + 0], $x[$ix + 1], $x[$ix + 2], $x[$ix + 3], $x[$ix + 4], $x[$ix + 5], - $x[$ix + 6], $x[$ix + 7], $x[$ix + 8], $x[$ix + 9], $x[$ix + 10], $x[$ix + 11], - $x[$ix + 12], $x[$ix + 13], $x[$ix + 14], $x[$ix + 15], - $y[$iy + 0], $y[$iy + 1], $y[$iy + 2], $y[$iy + 3], $y[$iy + 4], $y[$iy + 5], - $y[$iy + 6], $y[$iy + 7], $y[$iy + 8], $y[$iy + 9], $y[$iy + 10], $y[$iy + 11], - $y[$iy + 12], $y[$iy + 13], $y[$iy + 14], $y[$iy + 15], - ] - }; - ($x:ident, $ix:expr, 4, $y: ident, $iy:expr, 12) => { - [ - $x[$ix + 0], $x[$ix + 1], $x[$ix + 2], $x[$ix + 3], - $y[$iy + 0], $y[$iy + 1], $y[$iy + 2], $y[$iy + 3], $y[$iy + 4], $y[$iy + 5], - $y[$iy + 6], $y[$iy + 7], $y[$iy + 8], $y[$iy + 9], $y[$iy + 10], $y[$iy + 11], - ] - } -} - #[macro_export] macro_rules! trace { ($target:expr, $func:expr) => { @@ -113,12 +84,35 @@ pub struct AdnlCryptoUtils; impl AdnlCryptoUtils { /// Build AES-based cipher with clearing key data + pub fn build_cipher_secure(secret: &[u8; 32], digest: &[u8; 32]) -> aes_ctr::Aes256Ctr { + let x = secret; + let y = digest; + // let mut key = from_slice!(x, 0, 16, y, 16, 16); + let mut key = [ + x[ 0], x[ 1], x[ 2], x[ 3], x[ 4], x[ 5], x[ 6], x[ 7], + x[ 8], x[ 9], x[10], x[11], x[12], x[13], x[14], x[15], + y[16], y[17], y[18], y[19], y[20], y[21], y[22], y[23], + y[24], y[25], y[26], y[27], y[28], y[29], y[30], y[31] + ]; + // let mut ctr = from_slice!(y, 0, 4, x, 20, 12); + let mut ctr = [ + y[ 0], y[ 1], y[ 2], y[ 3], x[20], x[21], x[22], x[23], + x[24], x[25], x[26], x[27], x[28], x[29], x[30], x[31] + ]; + let ret = Self::build_cipher_internal(&key, &ctr); + key.iter_mut().for_each(|a| *a = 0); + ctr.iter_mut().for_each(|a| *a = 0); + ret + } + +/* pub fn build_cipher_secure(key: &mut [u8], ctr: &mut [u8]) -> aes_ctr::Aes256Ctr { let ret = Self::build_cipher_internal(key, ctr); key.iter_mut().for_each(|a| *a = 0); ctr.iter_mut().for_each(|a| *a = 0); ret } +*/ /// Build AES-based cipher without clearing key data pub fn build_cipher_unsecure( @@ -160,28 +154,22 @@ impl AdnlHandshake { local: &KeyOption, other: &KeyOption ) -> Result<()> { - - let checksum = { - let checksum = sha2::Sha256::digest(&buf[..]); - let checksum = checksum.as_slice(); - from_slice!(checksum, 32) - }; - + let checksum = sha2::Sha256::digest(buf); let len = buf.len(); buf.resize(len + 96, 0); buf[..].copy_within(..len, 96); buf[..32].copy_from_slice(other.id().data()); buf[32..64].copy_from_slice(local.pub_key()?); buf[64..96].copy_from_slice(&checksum); - let mut shared_secret = AdnlCryptoUtils::calc_shared_secret( local.pvt_key()?, other.pub_key()? ); - Self::build_packet_cipher(&mut shared_secret, &checksum) - .apply_keystream(&mut buf[96..]); + Self::build_packet_cipher( + &mut shared_secret, + checksum.as_slice().try_into()? + ).apply_keystream(&mut buf[96..]); Ok(()) - } /// Parse handshake packet @@ -198,15 +186,17 @@ impl AdnlHandshake { if key.val().id().data().eq(&buf[0..32]) { let mut shared_secret = AdnlCryptoUtils::calc_shared_secret( key.val().pvt_key()?, - arrayref::array_ref!(buf, 32, 32) + buf[32..64].try_into()? ); let range = if let Some(len) = len { 96..96 + len } else { 96..buf.len() }; - Self::build_packet_cipher(&mut shared_secret, arrayref::array_ref!(buf, 64, 32)) - .apply_keystream(&mut buf[range]); + Self::build_packet_cipher( + &mut shared_secret, + buf[64..96].try_into()? + ).apply_keystream(&mut buf[range]); if !sha2::Sha256::digest(&buf[96..]).as_slice().eq(&buf[64..96]) { fail!("Bad handshake packet checksum"); } @@ -217,17 +207,30 @@ impl AdnlHandshake { Ok(None) } - #[cfg(any(feature = "client", feature = "server", feature = "node"))] + #[cfg(any(feature = "client", feature = "node", feature = "server"))] fn build_packet_cipher( shared_secret: &mut [u8; 32], checksum: &[u8; 32] - ) -> aes_ctr::Aes256Ctr { - let x = &shared_secret[..]; - let y = &checksum[..]; - let mut aes_key_bytes = from_slice!(x, 0, 16, y, 16, 16); - let mut aes_ctr_bytes = from_slice!(y, 0, 4, x, 20, 12); + ) -> aes_ctr::Aes256Ctr { +/* + let x = shared_secret; + let y = checksum; + //let mut aes_key_bytes = from_slice!(x, 0, 16, y, 16, 16); + let mut aes_key_bytes = [ + x[ 0], x[ 1], x[ 2], x[ 3], x[ 4], x[ 5], x[ 6], x[ 7], + x[ 8], x[ 9], x[10], x[11], x[12], x[13], x[14], x[15], + y[16], y[17], y[18], y[19], y[20], y[21], y[22], y[23], + y[24], y[25], y[26], y[27], y[28], y[29], y[30], y[31] + ]; + //let mut aes_ctr_bytes = from_slice!(y, 0, 4, x, 20, 12); + let mut aes_ctr_bytes = [ + y[ 0], y[ 1], y[ 2], y[ 3], x[20], x[21], x[22], x[23], + x[24], x[25], x[26], x[27], x[28], x[29], x[30], x[31] + ]; +*/ + let ret = AdnlCryptoUtils::build_cipher_secure(shared_secret, checksum); shared_secret.iter_mut().for_each(|a| *a = 0); - AdnlCryptoUtils::build_cipher_secure(&mut aes_key_bytes, &mut aes_ctr_bytes) + ret } } @@ -293,8 +296,8 @@ impl AdnlStream { /// Constructor pub fn from_stream_with_timeouts(stream: tokio::net::TcpStream, timeouts: &Timeouts) -> Self { let mut stream = tokio_io_timeout::TimeoutStream::new(stream); - stream.set_write_timeout(timeouts.write()); - stream.set_read_timeout(timeouts.read()); + stream.set_write_timeout(Some(timeouts.write())); + stream.set_read_timeout(Some(timeouts.read())); Self(stream) } /// Read from stream @@ -357,7 +360,7 @@ impl AdnlStreamCrypto { buf.resize(len + 36, 0); buf[..].copy_within(..len, 36); buf[..4].copy_from_slice(&((len + 64) as u32).to_le_bytes()); - buf[4..36].copy_from_slice(&nonce); + buf[4..36].copy_from_slice(&nonce); buf.extend_from_slice(sha2::Sha256::digest(&buf[4..]).as_slice()); self.cipher_send.apply_keystream(&mut buf[..]); stream.write(buf).await?; @@ -477,22 +480,21 @@ impl KeyOption { pub const KEY_ED25519: i32 = 1209251014; /// Create from Ed25519 expanded secret key - pub fn from_ed25519_expanded_secret_key(exp_key: ed25519_dalek::ExpandedSecretKey) -> Self { + pub fn from_ed25519_expanded_secret_key(exp_key: ed25519_dalek::ExpandedSecretKey) -> Result { let pub_key = ed25519_dalek::PublicKey::from(&exp_key).to_bytes(); - let exp_key = &exp_key.to_bytes(); - let pvt_key = &exp_key[..32]; - let pvt_key = from_slice!(pvt_key, 32); - let exp_key = &exp_key[32..]; - let exp_key = from_slice!(exp_key, 32); - Self { + let exp_key = exp_key.to_bytes(); + let pvt_key = exp_key[..32].try_into()?; + let exp_key = exp_key[32..64].try_into()?; + let ret = Self { id: Self::calc_id(Self::KEY_ED25519, &pub_key), keys: [Some(pub_key), Some(pvt_key), Some(exp_key)], type_id: Self::KEY_ED25519 - } + }; + Ok(ret) } /// Create from Ed25519 secret key - pub fn from_ed25519_secret_key(key: ed25519_dalek::SecretKey) -> Self { + pub fn from_ed25519_secret_key(key: ed25519_dalek::SecretKey) -> Result { Self::from_ed25519_expanded_secret_key(ed25519_dalek::ExpandedSecretKey::from(&key)) } @@ -501,19 +503,21 @@ impl KeyOption { if src.pub_key.is_some() { fail!("No public key expected"); }; - let key = if let Some(key) = &src.pvt_key { - base64::decode(key)? - } else { - fail!("No private key"); - }; - if key.len() != 32 { - fail!("Bad private key"); - } - if src.type_id == Self::KEY_ED25519 { - let sec_key = ed25519_dalek::SecretKey::from_bytes(&key[..32])?; - Ok(Self::from_ed25519_secret_key(sec_key)) - } else { - fail!("Type-id {} is not supported for private key", src.type_id); + match src.type_id { + Self::KEY_ED25519 => match &src.pvt_key { + Some(key) => { + let key = base64::decode(key)?; + if key.len() != 32 { + fail!("Bad private key"); + } + let sec_key = ed25519_dalek::SecretKey::from_bytes( + key.as_slice().try_into()? + )?; + Self::from_ed25519_secret_key(sec_key) + } + None => fail!("No private key") + } + _ => fail!("Type-id {} is not supported for private key", src.type_id) } } @@ -522,23 +526,22 @@ impl KeyOption { if src.pvt_key.is_some() { fail!("No private key expected"); }; - let key = if let Some(key) = &src.pub_key { - base64::decode(key)? - } else { - fail!("No public key"); - }; - if key.len() != 32 { - fail!("Bad public key"); - } - let key = &key[..32]; - let pub_key = from_slice!(key, 32); - Ok( - Self { - id: Self::calc_id(src.type_id, &pub_key), - keys: [Some(pub_key), None, None], - type_id: src.type_id + match &src.pub_key { + Some(key) => { + let key = base64::decode(key)?; + if key.len() != 32 { + fail!("Bad public key"); + } + let pub_key: [u8; 32] = key.as_slice().try_into()?; + let ret = Self { + id: Self::calc_id(src.type_id, &pub_key), + keys: [Some(pub_key), None, None], + type_id: src.type_id + }; + Ok(ret) } - ) + None => fail!("No public key") + } } /// Create from TL object with public key @@ -552,10 +555,10 @@ impl KeyOption { /// Create from TL serialized public key pub fn from_tl_serialized_public_key(src: &[u8]) -> Result { - let pub_key = deserialize(src)? - .downcast::() - .map_err(|key| failure::format_err!("Unsupported PublicKey data {:?}", key))?; - Self::from_tl_public_key(&pub_key) + match deserialize(src)?.downcast::() { + Ok(pub_key) => Self::from_tl_public_key(&pub_key), + Err(key) => fail!("Unsupported PublicKey data {:?}", key) + } } /// Create from type and private key @@ -572,7 +575,7 @@ impl KeyOption { pub_key: None, pvt_key: Some(base64::encode(pvt_key)) }; - Ok((json, Self::from_ed25519_secret_key(sec_key))) + Ok((json, Self::from_ed25519_secret_key(sec_key)?)) } /// Create from type and public key @@ -595,7 +598,7 @@ impl KeyOption { pub_key: None, pvt_key: Some(base64::encode(&sec_key.to_bytes())) }; - Ok((json, Self::from_ed25519_secret_key(sec_key))) + Ok((json, Self::from_ed25519_secret_key(sec_key)?)) } /// Get key id @@ -675,11 +678,9 @@ impl KeyOption { /// Calculate key ID fn calc_id(type_id: i32, pub_key: &[u8; 32]) -> Arc { let mut sha = sha2::Sha256::new(); - sha.input(&type_id.to_le_bytes()); - sha.input(pub_key); - let buf = sha.result_reset(); - let src = buf.as_slice(); - KeyId::from_data(from_slice!(src, 32)) + sha.update(&type_id.to_le_bytes()); + sha.update(pub_key); + KeyId::from_data(sha.finalize().into()) } } @@ -938,6 +939,7 @@ pub struct TaggedObject { #[cfg(feature = "telemetry")] pub tag: u32 } + pub type TaggedAdnlMessage = TaggedObject; pub type TaggedByteSlice<'a> = TaggedObject<&'a[u8]>; pub type TaggedByteVec = TaggedObject>; @@ -954,12 +956,12 @@ pub struct Timeouts { impl Timeouts { pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(20); /// Read timeout - pub fn read(&self) -> Option { - Some(self.read) + pub fn read(&self) -> Duration { + self.read } /// Write timeout - pub fn write(&self) -> Option { - Some(self.write) + pub fn write(&self) -> Duration { + self.write } } @@ -1173,9 +1175,7 @@ pub fn hash(object: T) -> Result<[u8; 32]> { /// Calculate hash of TL object, boxed option pub fn hash_boxed(object: &T) -> Result<[u8; 32]> { let data = serialize(object)?; - let buf = sha2::Sha256::digest(&data[..]); - let hash = buf.as_slice(); - Ok(from_slice!(hash, 32)) + Ok(sha2::Sha256::digest(&data).into()) } /// Serialize TL object into bytes @@ -1198,14 +1198,14 @@ pub fn serialize_inplace(buf: &mut Vec, object: &T) -> Re } /// Serialize TL object into bytes -pub fn serialize_boxed(object: &T) -> Result> { +pub fn serialize_unboxed(object: &T) -> Result> { let mut buf = Vec::new(); Serializer::new(&mut buf).write_into_boxed(object)?; Ok(buf) } /// Serialize TL object into bytes in-place -pub fn serialize_boxed_inplace(buf: &mut Vec, object: &T) -> Result<()> { +pub fn serialize_unboxed_inplace(buf: &mut Vec, object: &T) -> Result<()> { buf.truncate(0); Serializer::new(buf).write_into_boxed(object) } diff --git a/src/node.rs b/src/node.rs index 00ae1e4..e75cf95 100644 --- a/src/node.rs +++ b/src/node.rs @@ -1,8 +1,8 @@ use crate::{ - declare_counted, from_slice, + declare_counted, common::{ add_counted_object_to_map, add_counted_object_to_map_with_update, - add_unbound_object_to_map, add_unbound_object_to_map_with_update, + add_unbound_object_to_map, add_unbound_object_to_map_with_update, AdnlCryptoUtils, AdnlHandshake, AdnlPeers, AdnlPingSubscriber, deserialize, get256, hash, CountedObject, Counter, KeyId, KeyOption, KeyOptionJson, Query, QueryCache, QueryId, serialize, Subscriber, TARGET, TaggedAdnlMessage, TaggedByteSlice, @@ -20,7 +20,7 @@ use sha2::Digest; use socket2::{Domain, SockAddr, Socket, Type}; use std::{ cmp::{max, min, Ordering}, collections::VecDeque, fmt::{self, Debug, Display, Formatter}, - io::ErrorKind, net::{IpAddr, Ipv4Addr, SocketAddr}, + convert::TryInto, io::ErrorKind, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{Arc, Condvar, Mutex, atomic::{self, AtomicI32, AtomicU32, AtomicU64, AtomicUsize}}, time::{Duration, Instant}, thread }; @@ -396,7 +396,7 @@ impl AdnlChannel { if buf.len() < 64 { fail!("Channel message is too short: {}", buf.len()) } - Self::process_data(buf, &side.secret); + Self::process_data(buf, &side.secret)?; if !sha2::Sha256::digest(&buf[64..]).as_slice().eq(&buf[32..64]) { fail!("Bad channel message checksum"); } @@ -413,18 +413,13 @@ impl AdnlChannel { } fn encrypt(buf: &mut Vec, side: &SubchannelSide) -> Result<()> { - let checksum = { - let checksum = sha2::Sha256::digest(&buf[..]); - let checksum = checksum.as_slice(); - from_slice!(checksum, 32) - }; + let checksum = sha2::Sha256::digest(buf); let len = buf.len(); buf.resize(len + 64, 0); - buf[..].copy_within(..len, 64); + buf[..].copy_within(..len, 64); buf[..32].copy_from_slice(&side.id); - buf[32..64].copy_from_slice(&checksum[..]); - Self::process_data(buf, &side.secret); - Ok(()) + buf[32..64].copy_from_slice(&checksum); + Self::process_data(buf, &side.secret) } fn encrypt_ordinary(&self, buf: &mut Vec) -> Result<()> { @@ -447,14 +442,14 @@ impl AdnlChannel { &self.recv.priority.id } - fn process_data(buf: &mut Vec, secret: &[u8; 32]) { - let digest = &buf[32..64]; - let mut key = from_slice!(secret, 0, 16, digest, 16, 16); - let mut ctr = from_slice!(digest, 0, 4, secret, 20, 12); - AdnlCryptoUtils::build_cipher_secure(&mut key[..], &mut ctr[..]) - .apply_keystream(&mut buf[64..]); + fn process_data(buf: &mut Vec, secret: &[u8; 32]) -> Result<()> { + AdnlCryptoUtils::build_cipher_secure( + secret, + buf[32..64].try_into()? + ).apply_keystream(&mut buf[64..]); + Ok(()) } - + } struct AdnlNodeAddress { @@ -1269,6 +1264,25 @@ impl PeerState { } +struct Peers { + channels_send: Arc, + channels_wait: Arc, + map_of: lockfree::map::Map, Peer> +} + +impl Peers { + fn with_incinerator( + incinerator: &lockfree::map::SharedIncin, Arc> + ) -> Arc { + let ret = Peers { + map_of: lockfree::map::Map::new(), + channels_send: Arc::new(lockfree::map::Map::with_incin(incinerator.clone())), + channels_wait: Arc::new(lockfree::map::Map::with_incin(incinerator.clone())) + }; + Arc::new(ret) + } +} + struct Queue { queue: lockfree::queue::Queue, #[cfg(feature = "telemetry")] @@ -1631,7 +1645,6 @@ declare_counted!( type ChannelId = [u8; 32]; type ChannelsRecv = lockfree::map::Map; type ChannelsSend = lockfree::map::Map, Arc>; -type Peers = lockfree::map::Map, Peer>; type TransferId = [u8; 32]; #[cfg(feature = "telemetry")] @@ -1833,9 +1846,8 @@ struct AdnlAlloc { /// ADNL node pub struct AdnlNode { + channels_incinerator: lockfree::map::SharedIncin, Arc>, channels_recv: Arc, - channels_send: Arc, - channels_wait: Arc, config: AdnlNodeConfig, peers: lockfree::map::Map, Arc>, queries: Arc, @@ -1867,10 +1879,11 @@ impl AdnlNode { /// Constructor pub async fn with_config(mut config: AdnlNodeConfig) -> Result> { + let incinerator = lockfree::map::SharedIncin::new(); let peers = lockfree::map::Map::new(); let mut added = false; for key in config.keys.iter() { - peers.insert(key.val().id().clone(), Arc::new(lockfree::map::Map::new())); + peers.insert(key.val().id().clone(), Peers::with_incinerator(&incinerator)); added = true } if !added { @@ -1889,7 +1902,6 @@ impl AdnlNode { } let (queue_send_loopback_sender, queue_send_loopback_reader) = tokio::sync::mpsc::unbounded_channel(); - let incinerator = lockfree::map::SharedIncin::new(); #[cfg(feature = "telemetry")] let telemetry = { let ordinary = TelemetryByStage::with_priority(false); @@ -1949,9 +1961,8 @@ impl AdnlNode { transfers: Arc::new(AtomicU64::new(0)) }; let ret = Self { + channels_incinerator: incinerator, channels_recv: Arc::new(lockfree::map::Map::new()), - channels_send: Arc::new(lockfree::map::Map::with_incin(incinerator.clone())), - channels_wait: Arc::new(lockfree::map::Map::with_incin(incinerator)), config, peers, queries: Arc::new(lockfree::map::Map::new()), @@ -2333,7 +2344,7 @@ impl AdnlNode { add_unbound_object_to_map( &self.peers, ret.clone(), - || Ok(Arc::new(lockfree::map::Map::new())) + || Ok(Peers::with_incinerator(&self.channels_incinerator)) )?; Ok(ret) } @@ -2359,7 +2370,7 @@ impl AdnlNode { let IpAddress(peer_ip_address) = peer_ip_address; let mut error = None; let mut ret = peer_key.id().clone(); - let result = self.peers(local_key)?.insert_with( + let result = self.peers(local_key)?.map_of.insert_with( ret.clone(), |key, inserted, found| if let Some((_, found)) = found { ret = key.clone(); @@ -2459,7 +2470,7 @@ impl AdnlNode { let peers = self.peers.get(local_key).ok_or_else( || error!("Try to remove peer {} from unknown local key {}", peer_key, local_key) )?; - Ok(peers.val().remove(peer_key).is_some()) + Ok(peers.val().map_of.remove(peer_key).is_some()) } /// Node IP address @@ -2647,20 +2658,22 @@ impl AdnlNode { } /// Reset peers - pub fn reset_peers(&self, peers: &AdnlPeers) -> Result<()> { - let peer_list = self.peers(peers.local())?; - let peer = peer_list.get(peers.other()).ok_or_else( - || error!("Try to reset unknown peer pair {} -> {}", peers.local(), peers.other()) + pub fn reset_peers(&self, to_reset: &AdnlPeers) -> Result<()> { + let local_key = to_reset.local(); + let other_key = to_reset.other(); + let peers = self.peers(local_key)?; + let peer = peers.map_of.get(other_key).ok_or_else( + || error!("Try to reset unknown peer pair {} -> {}", local_key, other_key) )?; - log::warn!(target: TARGET, "Resetting peer pair {} -> {}", peers.local(), peers.other()); + log::warn!(target: TARGET, "Resetting peer pair {} -> {}", local_key, other_key); let peer = peer.val(); let address = AdnlNodeAddress::from_ip_address_and_key( IpAddress(peer.address.ip_address.load(atomic::Ordering::Relaxed)), peer.address.key.clone() )?; - self.channels_wait - .remove(peers.other()) - .or_else(|| self.channels_send.remove(peers.other())) + peers.channels_wait + .remove(other_key) + .or_else(|| peers.channels_send.remove(other_key)) .and_then( |removed| { let peer = Peer { @@ -2670,13 +2683,13 @@ impl AdnlNode { #[cfg(feature = "telemetry")] self, #[cfg(feature = "telemetry")] - peers + to_reset ), send_state: PeerState::for_send( #[cfg(feature = "telemetry")] self, #[cfg(feature = "telemetry")] - peers + to_reset ), counter: self.allocated.peers.clone().into() }; @@ -2684,7 +2697,7 @@ impl AdnlNode { self.telemetry.allocated.peers.update( self.allocated.peers.load(atomic::Ordering::Relaxed) ); - peer_list.insert(peers.other().clone(), peer); + peers.map_of.insert(other_key.clone(), peer); self.drop_receive_subchannels(removed.val()) } ); @@ -2704,22 +2717,23 @@ impl AdnlNode { #[cfg(feature = "telemetry")] tag: data.tag }; - match self.send_message_to_peer(msg, &peers, false)? { + let (_, repeat) = self.send_message_to_peer(msg, &peers, false)?; + match repeat { MessageRepeat::Unapplicable => Ok(()), x => fail!("INTERNAL ERROR: bad repeat {:?} in ADNL custom message", x) } } async fn add_subchannels(&self, channel: Arc, wait: bool) -> Result<()> { - let peer = self.peers(&channel.local_key)?; - let peer = peer.get(&channel.other_key).ok_or_else( + let peers = self.peers(&channel.local_key)?; + let peer = peers.map_of.get(&channel.other_key).ok_or_else( || error!("Cannot add subchannels to unknown peer {}", channel.other_key) )?; let peer = peer.val(); let added = if wait { let mut prev = None; let added = add_counted_object_to_map_with_update( - &self.channels_wait, + &peers.channels_wait, channel.other_key.clone(), |found| { prev = if let Some(found) = found { @@ -2735,7 +2749,7 @@ impl AdnlNode { )?; if added { prev.or_else( - || if let Some(removed) = self.channels_send.remove(&channel .other_key) { + || if let Some(removed) = peers.channels_send.remove(&channel.other_key) { Some(removed.val().clone()) } else { None @@ -2747,7 +2761,7 @@ impl AdnlNode { added } else { add_counted_object_to_map_with_update( - &self.channels_send, + &peers.channels_send, channel.other_key.clone(), |found| { if let Some(found) = found { @@ -2761,9 +2775,9 @@ impl AdnlNode { }; if !added { let ch = if wait { - self.channels_wait.get(&channel.other_key) + peers.channels_wait.get(&channel.other_key) } else { - self.channels_send.get(&channel.other_key) + peers.channels_send.get(&channel.other_key) }; if let Some(ch) = ch { let ch = ch.val(); @@ -2830,15 +2844,15 @@ impl AdnlNode { if dst_reinit_date.is_some() != reinit_date.is_some() { fail!("Destination and source reinit dates mismatch") } - let peer = self.peers(&local_key)?; + let peers = self.peers(&local_key)?; let peer = if other_key.is_some() { - if let Some(channel) = self.channels_send.get(&ret) { - peer.get(&channel.val().other_key) + if let Some(channel) = peers.channels_send.get(&ret) { + peers.map_of.get(&channel.val().other_key) } else { fail!("Unknown channel, ID {:x?}", ret) } } else { - peer.get(&ret) + peers.map_of.get(&ret) }; let peer = if let Some(peer) = peer { peer @@ -2932,7 +2946,7 @@ impl AdnlNode { let local_key = peers.local(); let other_key = peers.other(); let peer = self.peers(local_key)?; - let peer = if let Some(peer) = peer.get(other_key) { + let peer = if let Some(peer) = peer.map_of.get(other_key) { peer } else { fail!("Channel {} with unknown peer {} -> {}", context, local_key, other_key) @@ -2985,8 +2999,9 @@ impl AdnlNode { } // Ensure both sides of channel established if channel.flags.load(atomic::Ordering::Relaxed) & AdnlChannel::ESTABLISHED == 0 { - if let Some(removed) = self.channels_wait.remove(&channel.other_key) { - let result = self.channels_send.reinsert(removed); + let peers = self.peers(&channel.local_key)?; + if let Some(removed) = peers.channels_wait.remove(&channel.other_key) { + let result = peers.channels_send.reinsert(removed); if let lockfree::map::Insertion::Failed(_) = result { fail!("Internal error when register send channel"); } @@ -3166,7 +3181,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); _ => fail!("Unsupported ADNL message {:?}", msg) }; if let Some(msg) = msg { - let repeat = self.send_message_to_peer(msg, peers, priority)?; + let (_, repeat) = self.send_message_to_peer(msg, peers, priority)?; if priority { if let MessageRepeat::NotNeeded = &repeat { return Ok(()) @@ -3292,7 +3307,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); }; let pkt = deserialize(&packet.buf[..])? .downcast::() - .map_err(|pkt| failure::format_err!("Unsupported ADNL packet format {:?}", pkt))? + .map_err(|pkt| error!("Unsupported ADNL packet format {:?}", pkt))? .only(); let other_key = if let Some(key) = self.check_packet( &pkt, @@ -3312,7 +3327,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); }; let peers = Arc::new(AdnlPeers::with_keys(local_key, other_key)); #[cfg(feature = "telemetry")] - if let Some(peer) = self.peers(peers.local())?.get(peers.other()) { + if let Some(peer) = self.peers(peers.local())?.map_of.get(peers.other()) { peer.val().update_recv_stats(received_len as u64, peers.local()) } if let Some(msg) = pkt.message { @@ -3380,8 +3395,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); self.queue_send_loopback_packets.send((msg.object, peers.local().clone()))?; (None, MessageRepeat::Unapplicable) } else { - let channel = self.channels_send.get(peers.other()).map(|guard| guard.val().clone()); - (channel, self.send_message_to_peer(msg, &peers, priority)?) + self.send_message_to_peer(msg, &peers, priority)? }; self.queue_monitor_queries.push( (timeout.unwrap_or(Self::TIMEOUT_QUERY_MAX_MS), query_id) @@ -3400,7 +3414,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); msg: TaggedAdnlMessage, peers: &AdnlPeers, priority: bool - ) -> Result { + ) -> Result<(Option>, MessageRepeat)> { const SIZE_ANSWER_MSG: usize = 44; const SIZE_CONFIRM_CHANNEL_MSG: usize = 72; @@ -3430,17 +3444,18 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); log::trace!(target: TARGET, "Send message {:?}", msg.object); - let peer = self.peers(peers.local())?; - let peer = if let Some(peer) = peer.get(peers.other()) { + let src = self.key_by_id(peers.local())?; + let dst = peers.other(); + let peers = self.peers(peers.local())?; + let peer = if let Some(peer) = peers.map_of.get(dst) { peer } else { - fail!("Unknown peer {}", peers.other()) + fail!("Unknown peer {}", dst) }; let peer = peer.val(); - let src = self.key_by_id(peers.local())?; - let dst = peers.other(); - let channel = self.channels_send.get(dst); - let create_channel_msg = if channel.is_none() && self.channels_wait.get(dst).is_none() { + let channel = peers.channels_send.get(dst).map(|guard| guard.val().clone()); + + let create_channel_msg = if channel.is_none() && peers.channels_wait.get(dst).is_none() { log::debug!(target: TARGET, "Create channel {} -> {}", src.id(), dst); Some( CreateChannel { @@ -3464,53 +3479,48 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); AdnlMessage::Adnl_Message_Query(query) => query.query.len() + SIZE_QUERY_MSG, _ => fail!("Unexpected message to send {:?}", msg.object) }; - let channel = if let Some(ref channel) = channel { - Some(channel.val()) - } else { - None - }; - if size <= Self::MAX_ADNL_MESSAGE { + let repeat = if size <= Self::MAX_ADNL_MESSAGE { if let Some(create_channel_msg) = create_channel_msg { log::trace!(target: TARGET, "Send with message {:?}", create_channel_msg); self.send_packet( peer, &src, - channel, + channel.as_ref(), None, Some(vec![create_channel_msg, msg.object]), priority, #[cfg(feature = "telemetry")] msg.tag - ) + )? } else { self.send_packet( peer, &src, - channel, + channel.as_ref(), Some(msg.object), None, priority, #[cfg(feature = "telemetry")] msg.tag - ) + )? } } else { let data = serialize(&msg.object)?; let hash = sha2::Sha256::digest(&data); - let hash = arrayref::array_ref!(hash.as_slice(), 0, 32); + let hash: &[u8; 32] = hash.as_slice().try_into()?; let mut offset = 0; let mut repeat = if let Some(create_channel_msg) = create_channel_msg { let part_msg = build_part_message( - &data[..], - hash, + &data[..], + hash, &mut offset, Self::MAX_ADNL_MESSAGE - SIZE_CREATE_CHANNEL_MSG ); self.send_packet( peer, &src, - channel, + channel.as_ref(), None, Some(vec![create_channel_msg, part_msg]), priority, @@ -3523,14 +3533,14 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); while offset < data.len() { let part_msg = build_part_message( &data[..], - hash, + hash, &mut offset, Self::MAX_ADNL_MESSAGE ); let upd = self.send_packet( peer, &src, - channel, + channel.as_ref(), Some(part_msg), None, priority, @@ -3543,8 +3553,9 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); fail!("INTERNAL ERROR: bad repeat in ADNL message part") } }; - Ok(repeat) - } + repeat + }; + Ok((channel, repeat)) } @@ -3750,8 +3761,7 @@ log::warn!(target: TARGET, "On recv create channel in {}", channel.local_key); fail!("Invalid ADNL part transfer: parts mismatch") } } - let hash = sha2::Sha256::digest(&buf); - if arrayref::array_ref!(hash.as_slice(), 0, 32) != transfer_id { + if !sha2::Sha256::digest(&buf).as_slice().eq(transfer_id) { fail!("Bad hash of ADNL transfer {}", base64::encode(transfer_id)) } let msg = deserialize(&buf)? diff --git a/src/server.rs b/src/server.rs index 0eae16f..c884b50 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,11 +1,11 @@ use crate::{ - dump, from_slice, + dump, common::{ AdnlHandshake, AdnlPeers, AdnlPingSubscriber, AdnlStream, AdnlStreamCrypto, deserialize, KeyId, KeyOption, KeyOptionJson, Query, serialize_inplace, Subscriber, TARGET, Timeouts } }; -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{convert::TryInto, net::SocketAddr, sync::Arc, time::Duration}; use stream_cancel::StreamExt; use futures::prelude::*; use ton_api::ton::adnl::Message as AdnlMessage; @@ -145,7 +145,7 @@ impl AdnlServerThread { crypto.receive(&mut buf, &mut stream).await?; let msg = deserialize(&buf[..])? .downcast::() - .map_err(|msg| failure::format_err!("Unsupported ADNL message {:?}", msg))?; + .map_err(|msg| error!("Unsupported ADNL message {:?}", msg))?; let (consumed, reply) = match &msg { AdnlMessage::Adnl_Message_Query(query) => Query::process_adnl(&subscribers, &query, &peers).await?, @@ -166,8 +166,7 @@ impl AdnlServerThread { key: &lockfree::map::Map, Arc>, buf: &mut Vec ) -> Result<(AdnlStreamCrypto, AdnlPeers)> { - let other_key = &buf[32..64]; - let other_key = from_slice!(other_key, 32); + let other_key = buf[32..64].try_into()?; let local_key = AdnlHandshake::parse_packet(key, buf, Some(160))?.ok_or_else( || error!("Unknown ADNL server key, cannot decrypt") )?; @@ -176,9 +175,8 @@ impl AdnlServerThread { &other_key ).id().clone(); dump!(trace, TARGET, "Nonce", &buf[..160]); - let ret = AdnlStreamCrypto::with_nonce_as_server( - arrayref::array_mut_ref!(buf, 0, 160) - ); + let nonce: &mut [u8; 160] = buf.as_mut_slice().try_into()?; + let ret = AdnlStreamCrypto::with_nonce_as_server(nonce); buf.drain(0..160); Ok((ret, AdnlPeers::with_keys(local_key, other_key))) }