Skip to content

Commit

Permalink
feat: add peer recon ring (#616)
Browse files Browse the repository at this point in the history
* feat: add peer recon ring

With this change there is now a Recon ring for sharing information about
peers. Peers will synchronized all known peers with connected peers.
Each peer will self-publish into the ring periodically.

Fixes: #604
Fixes: #607

* update for offset/limit changes

* nit, rename interest -> peer
  • Loading branch information
nathanielc authored Dec 5, 2024
1 parent 9e118c6 commit c1e7545
Show file tree
Hide file tree
Showing 36 changed files with 1,832 additions and 259 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"metrics",
"one",
"p2p",
"peer-svc",
"pipeline",
"recon",
"sql",
Expand Down Expand Up @@ -76,6 +77,7 @@ ceramic-metadata = { path = "./metadata" }
ceramic-metrics = { path = "./metrics" }
ceramic-one = { path = "./one" }
ceramic-p2p = { path = "./p2p" }
ceramic-peer-svc = { path = "./peer-svc" }
ceramic-pipeline = { path = "./pipeline" }
ceramic-sql = { path = "./sql" }
ceramic-validation = { path = "./validation" }
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub mod interest;
mod jwk;
mod network;
mod node_id;
mod peer;
pub mod peer;
mod range;
mod serialize_ext;
pub mod signer;
Expand Down
52 changes: 32 additions & 20 deletions core/src/node_id.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::fmt::Display;
use std::{fs, path::PathBuf, str::FromStr};
use std::{fmt::Display, path::Path, str::FromStr};

use anyhow::{anyhow, Context, Ok, Result};
use anyhow::{anyhow, Context, Result};
use cid::{multihash::Multihash, Cid};
use libp2p_identity::PeerId;
use rand::Rng;
Expand Down Expand Up @@ -159,14 +158,10 @@ impl Display for NodeId {
}

/// A [`NodeId`] with its private key.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct NodeKey {
id: NodeId,
// It would be preferable to not store the private_key_bytes directly and instead use only the
// key_pair. However to use JWK we need to keep the private_key_bytes around.
// Maybe in future versions of ssi_jwk we can change this.
private_key_bytes: [u8; 32],
key_pair: Ed25519KeyPair,
did: DidDocument,
}

Expand All @@ -179,11 +174,10 @@ impl Eq for NodeKey {}

impl NodeKey {
/// Construct a new key with both private and public keys.
fn new(id: NodeId, private_key_bytes: [u8; 32], key_pair: Ed25519KeyPair) -> Self {
fn new(id: NodeId, private_key_bytes: [u8; 32]) -> Self {
Self {
id,
private_key_bytes,
key_pair,
did: id.did(),
}
}
Expand All @@ -204,12 +198,24 @@ impl NodeKey {
self.id
}

/// Read an Ed25519 key from a directory
pub fn try_from_dir(key_dir: PathBuf) -> Result<NodeKey> {
let key_path = key_dir.join("id_ed25519_0");
let content = fs::read_to_string(key_path)?;
let seed = ssh_key::private::PrivateKey::from_str(&content)
.map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))?
/// Read an Ed25519 key from a directory or create a new key if not found in the directory.
pub async fn try_from_dir(key_dir: impl AsRef<Path>) -> Result<NodeKey> {
let key_path = key_dir.as_ref().join("id_ed25519_0");
let private_key = match tokio::fs::read_to_string(&key_path).await {
Ok(content) => ssh_key::private::PrivateKey::from_str(&content)
.map_err(|e| anyhow::anyhow!("failed to parse private key: {}", e))?,
Err(_) => {
let key = ssh_key::private::PrivateKey::random(
&mut rand::rngs::OsRng,
ssh_key::Algorithm::Ed25519,
)?;
// Write out contents to file for next time
let content = key.to_openssh(ssh_key::LineEnding::default())?;
tokio::fs::write(&key_path, content).await?;
key
}
};
let seed = private_key
.key_data()
.ed25519()
.map_or(Err(anyhow::anyhow!("failed to parse ed25519 key")), |key| {
Expand All @@ -223,7 +229,6 @@ impl NodeKey {
public_ed25519_key_bytes,
},
seed,
key_pair,
))
}
/// Create an Ed25519 key pair from a secret. The secret can be formatted in two ways:
Expand Down Expand Up @@ -279,7 +284,7 @@ impl NodeKey {
let id = NodeId {
public_ed25519_key_bytes,
};
Ok(NodeKey::new(id, secret, key_pair))
Ok(NodeKey::new(id, secret))
}
/// Create a NodeId using a random Ed25519 key pair
///
Expand All @@ -299,12 +304,19 @@ impl NodeKey {
public_ed25519_key_bytes,
},
random_secret,
key_pair,
)
}
/// Sign data with this key
pub fn sign(&self, data: &[u8]) -> Signature {
self.key_pair.sign(data)
let key_pair = Ed25519KeyPair::from_seed_unchecked(&self.private_key_bytes)
.expect("private key bytes should already be validated");
key_pair.sign(data)
}

/// Construct a [`libp2p_identity::Keypair`] from this node key.
pub fn p2p_keypair(&self) -> libp2p_identity::Keypair {
libp2p_identity::Keypair::ed25519_from_bytes(self.private_key_bytes)
.expect("private key bytes should already be validated")
}
}

Expand Down
156 changes: 147 additions & 9 deletions core/src/peer.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,31 @@
//! Peer structures for managing known peers the network.
//! [`PeerEntry`] is be signed by the peer such that [`PeerEntry`] structs can be gossipped around
//! the network safely.
use anyhow::{anyhow, bail};
use multiaddr::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use ssi::jws::DecodedJWS;

use crate::{node_id::NodeKey, signer::Signer, DeserializeExt as _, NodeId, SerializeExt as _};

const MIN_EXPIRATION: u64 = 0;
// 11 9s is the maximum value we can encode into the string representation of a PeerKey.
const MAX_EXPIRATION: u64 = 99_999_999_999;

/// Peer entry that is signed and can be shared.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PeerEntry {
id: NodeId,
// Number of seconds after UNIX epoch when this entry is no longer valid.
expiration: u32,
expiration: u64,
addresses: Vec<Multiaddr>,
}

impl PeerEntry {
/// Construct an entry about a peer with address that is no longer valid after expiration seconds after the
/// UNIX epoch.
pub fn new(local_id: NodeId, expiration: u32, addresses: Vec<Multiaddr>) -> Self {
pub fn new(local_id: NodeId, expiration: u64, addresses: Vec<Multiaddr>) -> Self {
let peer_id = local_id.peer_id();
Self {
id: local_id,
Expand Down Expand Up @@ -59,7 +67,7 @@ impl PeerEntry {
}

/// Report the number of seconds after the UNIX epoch when this entry is no longer valid.
pub fn expiration(&self) -> u32 {
pub fn expiration(&self) -> u64 {
self.expiration
}

Expand All @@ -84,10 +92,38 @@ fn ensure_multiaddr_has_p2p(addr: Multiaddr, peer_id: PeerId) -> Multiaddr {
/// Encoded [`PeerEntry`] prefixed with its expiration.
/// The sort order matters as its used in a Recon ring.
/// The key is valid utf-8 of the form `<expiration>.<jws>`;
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Serialize, Deserialize)]
pub struct PeerKey(String);

impl std::fmt::Display for PeerKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}

impl TryFrom<Vec<u8>> for PeerKey {
type Error = anyhow::Error;

fn try_from(value: Vec<u8>) -> Result<Self, Self::Error> {
let key = Self(String::from_utf8(value)?);
let _ = key.to_entry()?;
Ok(key)
}
}

impl PeerKey {
/// Return a builder for constructing a PeerKey from its parts.
pub fn builder() -> Builder<Init> {
Builder { state: Init }
}
/// Return the raw bytes of the peer key.
pub fn as_slice(&self) -> &[u8] {
self.0.as_bytes()
}
/// Report if this key contains an jws section.
pub fn has_jws(&self) -> bool {
self.0.contains('.')
}
/// Construct a signed key from a [`PeerEntry`].
pub fn from_entry(entry: &PeerEntry, node_key: &NodeKey) -> anyhow::Result<Self> {
if entry.id() != node_key.id() {
Expand All @@ -102,11 +138,7 @@ impl PeerKey {
}
/// Decode and verify key as a [`PeerEntry`].
pub fn to_entry(&self) -> anyhow::Result<PeerEntry> {
let (expiration, jws) = self
.0
.split_once('.')
.ok_or_else(|| anyhow!("peer key must contain a '.'"))?;
let expiration: u32 = expiration.parse()?;
let (expiration, jws) = self.split_expiration()?;
let peer = PeerEntry::from_jws(jws)?;
if expiration != peer.expiration {
Err(anyhow!(
Expand All @@ -117,6 +149,112 @@ impl PeerKey {
Ok(peer)
}
}
fn split_expiration(&self) -> anyhow::Result<(u64, &str)> {
let (expiration, jws) = self
.0
.split_once('.')
.ok_or_else(|| anyhow!("peer key must contain a '.'"))?;
let expiration = expiration.parse()?;
Ok((expiration, jws))
}
}

/// Builder provides an ordered API for constructing a PeerKey
#[derive(Debug)]
pub struct Builder<S: BuilderState> {
state: S,
}
/// The state of the builder
pub trait BuilderState {}

/// Initial state of the builder.
#[derive(Debug)]
pub struct Init;
impl BuilderState for Init {}

/// Build state where the expiration is known.
pub struct WithExpiration {
expiration: u64,
}
impl BuilderState for WithExpiration {}

/// Build state where the peer id is known.
pub struct WithId<'a> {
node_key: &'a NodeKey,
expiration: u64,
}
impl<'a> BuilderState for WithId<'a> {}

/// Build state where the addresses are known.
pub struct WithAddresses<'a> {
node_key: &'a NodeKey,
expiration: u64,
addresses: Vec<Multiaddr>,
}
impl<'a> BuilderState for WithAddresses<'a> {}

impl Builder<Init> {
/// Set the expiration to earliest possible value.
pub fn with_min_expiration(self) -> Builder<WithExpiration> {
Builder {
state: WithExpiration {
expiration: MIN_EXPIRATION,
},
}
}
/// Set the expiration to the latest possible value.
pub fn with_max_expiration(self) -> Builder<WithExpiration> {
Builder {
state: WithExpiration {
expiration: MAX_EXPIRATION,
},
}
}
/// Set the expiration as the number of seconds since the UNIX epoch.
pub fn with_expiration(self, expiration: u64) -> Builder<WithExpiration> {
Builder {
state: WithExpiration { expiration },
}
}
}
impl Builder<WithExpiration> {
/// Finish the build producing a partial [`PeerKey`].
pub fn build_fencepost(self) -> PeerKey {
PeerKey(format!("{:0>11}", self.state.expiration))
}
/// Set the peer id. Note, a NodeKey is required so the [`PeerEntry`] can be signed.
pub fn with_id(self, id: &NodeKey) -> Builder<WithId> {
Builder {
state: WithId {
node_key: id,
expiration: self.state.expiration,
},
}
}
}
impl<'a> Builder<WithId<'a>> {
/// Set the addresses where the peer can be reached.
pub fn with_addresses(self, addresses: Vec<Multiaddr>) -> Builder<WithAddresses<'a>> {
Builder {
state: WithAddresses {
node_key: self.state.node_key,
expiration: self.state.expiration,
addresses,
},
}
}
}
impl<'a> Builder<WithAddresses<'a>> {
/// Finish the build producing a [`PeerKey`].
pub fn build(self) -> PeerKey {
let entry = PeerEntry::new(
self.state.node_key.id(),
self.state.expiration,
self.state.addresses,
);
PeerKey::from_entry(&entry, self.state.node_key)
.expect("builder should not build invalid peer key")
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit c1e7545

Please sign in to comment.