Skip to content

Commit

Permalink
Introduce API to subscribe to peer connection events (#625)
Browse files Browse the repository at this point in the history
* Introduce API to subscribe to peer connection events

* Add entry to CHANGELOG.md
  • Loading branch information
adzialocha authored Jun 26, 2024
1 parent 04b12ae commit abee607
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Node API to subscribe to peer connection events [#625](https://github.com/p2panda/aquadoggo/pull/625)
- Support private network secured by pre-shared key [#635](https://github.com/p2panda/aquadoggo/pull/635)

### Changed
Expand Down
29 changes: 29 additions & 0 deletions aquadoggo/src/api/api.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{bail, Result};
use tokio::sync::mpsc::Receiver;

use crate::api::{migrate, LockFile};
use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;

#[derive(Debug, Clone)]
pub enum NodeEvent {
PeerConnected,
PeerDisconnected,
}

/// Interface to interact with the node in a programmatic, "low-level" way.
#[derive(Debug)]
pub struct NodeInterface {
Expand Down Expand Up @@ -42,4 +49,26 @@ impl NodeInterface {

Ok(did_migration_happen)
}

pub async fn subscribe(&self) -> Receiver<NodeEvent> {
let mut rx = self.tx.subscribe();
let (events_tx, events_rx) = tokio::sync::mpsc::channel::<NodeEvent>(256);

tokio::task::spawn(async move {
loop {
match rx.recv().await {
Ok(ServiceMessage::PeerConnected(_)) => {
let _ = events_tx.send(NodeEvent::PeerConnected).await;
}
Ok(ServiceMessage::PeerDisconnected(_)) => {
let _ = events_tx.send(NodeEvent::PeerDisconnected).await;
}
Ok(_) => continue,
Err(_) => break,
}
}
});

events_rx
}
}
2 changes: 1 addition & 1 deletion aquadoggo/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod config_file;
mod lock_file;
mod migration;

pub use api::NodeInterface;
pub use api::{NodeEvent, NodeInterface};
pub use config_file::ConfigFile;
pub use lock_file::LockFile;
pub use migration::migrate;
9 changes: 8 additions & 1 deletion aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use anyhow::Result;
use p2panda_rs::identity::KeyPair;
use tokio::sync::mpsc::Receiver;

use crate::api::NodeInterface;
use crate::api::{NodeEvent, NodeInterface};
use crate::bus::ServiceMessage;
use crate::config::Configuration;
use crate::context::Context;
Expand Down Expand Up @@ -131,4 +132,10 @@ impl Node {
pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
self.api.migrate(lock_file).await
}

/// Subscribe to channel reporting on significant node events which can be interesting for
/// clients, for example when peers connect or disconnect.
pub async fn subscribe(&self) -> Receiver<NodeEvent> {
self.api.subscribe().await
}
}

0 comments on commit abee607

Please sign in to comment.