Merge pull request #73 from tansu-io/72-move-all-s3-support-into-dynostore

Move all S3 support into DynoStore; add an in-memory store for testing.
shortishly authored Oct 15, 2024
2 parents 2499ff9 + 618fd47 commit 8cee628
Showing 3 changed files with 23 additions and 1,135 deletions.
35 changes: 23 additions & 12 deletions tansu-server/src/main.rs
@@ -16,9 +16,12 @@
 use std::{path::PathBuf, str::FromStr, time::Duration};
 
 use clap::Parser;
-use object_store::aws::{AmazonS3Builder, S3ConditionalPut};
+use object_store::{
+    aws::{AmazonS3Builder, S3ConditionalPut},
+    memory::InMemory,
+};
 use tansu_server::{broker::Broker, coordinator::group::administrator::Controller, Error, Result};
-use tansu_storage::{pg::Postgres, s3::S3, StorageContainer};
+use tansu_storage::{dynostore::DynoStore, pg::Postgres, StorageContainer};
 use tokio::task::JoinSet;
 use tracing::debug;
 use tracing_subscriber::{fmt::format::FmtSpan, prelude::*, EnvFilter};
@@ -125,19 +128,27 @@ async fn main() -> Result<()> {
         "s3" => {
             let bucket_name = args.storage_engine.value.host_str().unwrap_or("tansu");
 
-            let object_store_builder = AmazonS3Builder::from_env()
+            AmazonS3Builder::from_env()
                 .with_bucket_name(bucket_name)
-                .with_conditional_put(S3ConditionalPut::ETagMatch);
-
-            S3::new(
-                args.kafka_cluster_id.as_str(),
-                args.kafka_node_id,
-                object_store_builder,
-            )
-            .map(StorageContainer::S3)
-            .map_err(Into::into)
+                .with_conditional_put(S3ConditionalPut::ETagMatch)
+                .build()
+                .map(|object_store| {
+                    DynoStore::new(
+                        args.kafka_cluster_id.as_str(),
+                        args.kafka_node_id,
+                        object_store,
+                    )
+                })
+                .map(StorageContainer::DynoStore)
+                .map_err(Into::into)
         }
 
+        "memory" => Ok(StorageContainer::DynoStore(DynoStore::new(
+            args.kafka_cluster_id.as_str(),
+            args.kafka_node_id,
+            InMemory::new(),
+        ))),
+
         _unsupported => Err(Error::UnsupportedStorageUrl(args.storage_engine.value)),
     }?;

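After this change both arms construct the same DynoStore type: the "s3" arm feeds it a built AmazonS3 object store, while the new "memory" arm feeds it InMemory. That means tests can stand up storage with no bucket, credentials, or network. A minimal sketch, assuming DynoStore::new accepts any object_store backend by value (as the two call sites above suggest) and using a placeholder cluster name and node id:

    use object_store::memory::InMemory;
    use tansu_storage::dynostore::DynoStore;

    fn test_storage() -> DynoStore {
        // Entirely in-process: no S3 bucket or credentials required,
        // which is the point of the in-memory backend for testing.
        DynoStore::new("test-cluster", 111, InMemory::new())
    }
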
17 changes: 0 additions & 17 deletions tansu-storage/src/lib.rs
@@ -19,7 +19,6 @@ use dynostore::DynoStore;
 use glob::{GlobError, PatternError};
 use pg::Postgres;
 use regex::Regex;
-use s3::S3;
 use serde::{Deserialize, Serialize};
 use std::{
     array::TryFromSliceError,
@@ -56,7 +55,6 @@ pub mod dynostore;
 pub mod index;
 pub mod os;
 pub mod pg;
-pub mod s3;
 pub mod segment;
 
 const NULL_TOPIC_ID: [u8; 16] = [0; 16];
@@ -593,7 +591,6 @@ pub enum UpdateError<T> {
 #[derive(Clone, Debug)]
 pub enum StorageContainer {
     Postgres(Postgres),
-    S3(S3),
     DynoStore(DynoStore),
 }
 
@@ -602,15 +599,13 @@ impl Storage for StorageContainer {
     async fn register_broker(&self, broker_registration: BrokerRegistationRequest) -> Result<()> {
         match self {
             Self::Postgres(pg) => pg.register_broker(broker_registration).await,
-            Self::S3(s3) => s3.register_broker(broker_registration).await,
             Self::DynoStore(dyn_store) => dyn_store.register_broker(broker_registration).await,
         }
     }
 
     async fn create_topic(&self, topic: CreatableTopic, validate_only: bool) -> Result<Uuid> {
         match self {
             Self::Postgres(pg) => pg.create_topic(topic, validate_only).await,
-            Self::S3(s3) => s3.create_topic(topic, validate_only).await,
             Self::DynoStore(dyn_store) => dyn_store.create_topic(topic, validate_only).await,
         }
     }
@@ -621,31 +616,27 @@
     ) -> Result<Vec<DeleteRecordsTopicResult>> {
         match self {
             Self::Postgres(pg) => pg.delete_records(topics).await,
-            Self::S3(s3) => s3.delete_records(topics).await,
             Self::DynoStore(dyn_store) => dyn_store.delete_records(topics).await,
         }
     }
 
     async fn delete_topic(&self, name: &str) -> Result<u64> {
         match self {
             Self::Postgres(pg) => pg.delete_topic(name).await,
-            Self::S3(s3) => s3.delete_topic(name).await,
             Self::DynoStore(dyn_store) => dyn_store.delete_topic(name).await,
         }
     }
 
     async fn brokers(&self) -> Result<Vec<DescribeClusterBroker>> {
         match self {
             Self::Postgres(pg) => pg.brokers().await,
-            Self::S3(s3) => s3.brokers().await,
             Self::DynoStore(dyn_store) => dyn_store.brokers().await,
         }
     }
 
     async fn produce(&self, topition: &Topition, batch: deflated::Batch) -> Result<i64> {
         match self {
             Self::Postgres(pg) => pg.produce(topition, batch).await,
-            Self::S3(s3) => s3.produce(topition, batch).await,
             Self::DynoStore(dyn_store) => dyn_store.produce(topition, batch).await,
         }
     }
@@ -659,7 +650,6 @@
     ) -> Result<deflated::Batch> {
         match self {
             Self::Postgres(pg) => pg.fetch(topition, offset, min_bytes, max_bytes).await,
-            Self::S3(s3) => s3.fetch(topition, offset, min_bytes, max_bytes).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .fetch(topition, offset, min_bytes, max_bytes)
@@ -671,7 +661,6 @@
     async fn offset_stage(&self, topition: &Topition) -> Result<OffsetStage> {
         match self {
             Self::Postgres(pg) => pg.offset_stage(topition).await,
-            Self::S3(s3) => s3.offset_stage(topition).await,
             Self::DynoStore(dyn_store) => dyn_store.offset_stage(topition).await,
         }
     }
@@ -682,7 +671,6 @@
     ) -> Result<Vec<(Topition, ListOffsetResponse)>> {
         match self {
             Self::Postgres(pg) => pg.list_offsets(offsets).await,
-            Self::S3(s3) => s3.list_offsets(offsets).await,
             Self::DynoStore(dyn_store) => dyn_store.list_offsets(offsets).await,
         }
     }
@@ -695,7 +683,6 @@
     ) -> Result<Vec<(Topition, ErrorCode)>> {
         match self {
             Self::Postgres(pg) => pg.offset_commit(group_id, retention_time_ms, offsets).await,
-            Self::S3(s3) => s3.offset_commit(group_id, retention_time_ms, offsets).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .offset_commit(group_id, retention_time_ms, offsets)
@@ -712,7 +699,6 @@
     ) -> Result<BTreeMap<Topition, i64>> {
         match self {
             Self::Postgres(pg) => pg.offset_fetch(group_id, topics, require_stable).await,
-            Self::S3(s3) => s3.offset_fetch(group_id, topics, require_stable).await,
             Self::DynoStore(dyn_store) => {
                 dyn_store
                     .offset_fetch(group_id, topics, require_stable)
@@ -724,7 +710,6 @@
     async fn metadata(&self, topics: Option<&[TopicId]>) -> Result<MetadataResponse> {
         match self {
             Self::Postgres(pg) => pg.metadata(topics).await,
-            Self::S3(s3) => s3.metadata(topics).await,
             Self::DynoStore(dyn_store) => dyn_store.metadata(topics).await,
         }
     }
@@ -737,7 +722,6 @@
     ) -> Result<DescribeConfigsResult> {
         match self {
             Self::Postgres(pg) => pg.describe_config(name, resource, keys).await,
-            Self::S3(s3) => s3.describe_config(name, resource, keys).await,
             Self::DynoStore(dyn_store) => dyn_store.describe_config(name, resource, keys).await,
         }
     }
@@ -750,7 +734,6 @@
     ) -> Result<Version, UpdateError<GroupDetail>> {
         match self {
             Self::Postgres(pg) => pg.update_group(group_id, detail, version).await,
-            Self::S3(s3) => s3.update_group(group_id, detail, version).await,
             Self::DynoStore(dyn_store) => dyn_store.update_group(group_id, detail, version).await,
         }
     }
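
With the S3 variant gone, every Storage method on StorageContainer dispatches to just two backends, and callers never need to know which one they hold. A call-site sketch, assuming Storage, StorageContainer, Result, and MetadataResponse are all exported from the tansu_storage crate root as the signatures above suggest:

    use tansu_storage::{MetadataResponse, Result, Storage, StorageContainer};

    // Behaves identically whichever variant `storage` wraps, because
    // StorageContainer implements Storage by delegating each method.
    async fn cluster_metadata(storage: &StorageContainer) -> Result<MetadataResponse> {
        storage.metadata(None).await
    }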
1,106 changes: 0 additions & 1,106 deletions tansu-storage/src/s3.rs (diff not loaded)
