Skip to content

Commit

Permalink
fix allowed schema ID logic; add test (#603)
Browse files Browse the repository at this point in the history
* fix allowed schema ID logic; add test

* Update CHANGELOG.md

* run cargo fmt; fix PR number
  • Loading branch information
jmanm authored Nov 29, 2023
1 parent 58ef15b commit 162c350
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix bug where known schemas are not replicated between nodes [#603](https://github.com/p2panda/aquadoggo/pull/603).

## [0.7.0]

### Added
Expand Down
30 changes: 29 additions & 1 deletion aquadoggo/src/replication/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl SyncIngest {
// If the node has been configured with an allow-list of supported schema ids, check that
// the sent operation follows one of our supported schema
if self.schema_provider.is_allow_list_active()
&& self
&& !self
.schema_provider
.supported_schema_ids()
.await
Expand Down Expand Up @@ -115,6 +115,7 @@ mod tests {
use crate::replication::errors::IngestError;
use crate::replication::SyncIngest;
use crate::test_utils::{test_runner_with_manager, TestNodeManager};
use crate::{AllowList, Configuration};

#[rstest]
fn reject_duplicate_entries(
Expand Down Expand Up @@ -142,4 +143,31 @@ mod tests {
assert!(matches!(result, Err(IngestError::DuplicateEntry(_))));
})
}

#[rstest]
fn allow_supported_schema_ids(
schema: Schema,
encoded_entry: EncodedEntry,
encoded_operation: EncodedOperation,
) {
test_runner_with_manager(move |manager: TestNodeManager| async move {
let config = Configuration {
allow_schema_ids: AllowList::Set(vec![schema.id().clone()]),
..Configuration::default()
};
let node = manager.create_with_config(config).await;

assert!(node.context.schema_provider.is_allow_list_active());

let _ = node.context.schema_provider.update(schema.clone()).await;
let (tx, _rx) = broadcast::channel(8);
let ingest = SyncIngest::new(node.context.schema_provider.clone(), tx.clone());

let result = ingest
.handle_entry(&node.context.store, &encoded_entry, &encoded_operation)
.await;

assert!(result.is_ok());
});
}
}
14 changes: 6 additions & 8 deletions aquadoggo/src/test_utils/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,20 @@ impl TestNodeManager {
}

pub async fn create(&self) -> TestNode {
self.create_with_config(Configuration::default()).await
}

pub async fn create_with_config(&self, config: Configuration) -> TestNode {
let (_config, pool) = initialize_sqlite_db().await;

// Initialise test store using pool.
let store = SqlStore::new(pool.clone());

// Construct node config supporting any schema.
let cfg = Configuration::default();
let schema_provider = SchemaProvider::new(vec![], config.allow_schema_ids.clone());

// Construct the actual test node
let test_node = TestNode {
context: Context::new(
store.clone(),
KeyPair::new(),
cfg,
SchemaProvider::default(),
),
context: Context::new(store.clone(), KeyPair::new(), config, schema_provider),
};

self.pools.lock().await.push(pool);
Expand Down

0 comments on commit 162c350

Please sign in to comment.