Skip to content

Commit

Permalink
Refactor connector cli options and processor.rs to handle batch events.
Browse files Browse the repository at this point in the history
  • Loading branch information
hippalus committed Jan 27, 2025
1 parent 63b9bc5 commit 13c23e9
Show file tree
Hide file tree
Showing 11 changed files with 197 additions and 219 deletions.
4 changes: 2 additions & 2 deletions docker-compose-distributed-test-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ services:
context: .
dockerfile: Dockerfile.kafka
platform: linux/amd64
command: [ "parseable", "s3-store", "connectors", "kafka-sink" ]
command: [ "parseable", "s3-store", ]
expose:
- "8000"
environment:
Expand Down Expand Up @@ -141,7 +141,7 @@ services:
context: .
dockerfile: Dockerfile.kafka
platform: linux/amd64
command: [ "parseable", "s3-store", "connectors", "kafka-sink" ]
command: [ "parseable", "s3-store", ]
expose:
- "8000"
environment:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose-test-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ services:
context: .
dockerfile: Dockerfile.kafka
platform: linux/amd64
command: [ "parseable", "s3-store", "connectors", "kafka-sink" ]
command: [ "parseable", "s3-store", ]
ports:
- "8000:8000"
environment:
Expand Down
31 changes: 22 additions & 9 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use url::Url;
feature = "rdkafka-ssl-vendored",
feature = "rdkafka-sasl"
))]
use crate::connectors::common::config::ConnectorConfig;
use crate::connectors::kafka::config::KafkaConfig;

use crate::{
oidc::{self, OpenidConfig},
Expand Down Expand Up @@ -91,6 +91,13 @@ pub struct LocalStoreArgs {
pub options: Options,
#[command(flatten)]
pub storage: FSConfig,
#[cfg(any(
feature = "rdkafka-ssl",
feature = "rdkafka-ssl-vendored",
feature = "rdkafka-sasl"
))]
#[command(flatten)]
pub kafka: KafkaConfig,
}

#[derive(Parser)]
Expand All @@ -99,6 +106,13 @@ pub struct S3StoreArgs {
pub options: Options,
#[command(flatten)]
pub storage: S3Config,
#[cfg(any(
feature = "rdkafka-ssl",
feature = "rdkafka-ssl-vendored",
feature = "rdkafka-sasl"
))]
#[command(flatten)]
pub kafka: KafkaConfig,
}

#[derive(Parser)]
Expand All @@ -107,6 +121,13 @@ pub struct BlobStoreArgs {
pub options: Options,
#[command(flatten)]
pub storage: AzureBlobConfig,
#[cfg(any(
feature = "rdkafka-ssl",
feature = "rdkafka-ssl-vendored",
feature = "rdkafka-sasl"
))]
#[command(flatten)]
pub kafka: KafkaConfig,
}

#[derive(Parser, Debug)]
Expand Down Expand Up @@ -309,14 +330,6 @@ pub struct Options {

#[arg(long, env = "P_MS_CLARITY_TAG", help = "Tag for MS Clarity")]
pub ms_clarity_tag: Option<String>,

#[cfg(any(
feature = "rdkafka-ssl",
feature = "rdkafka-ssl-vendored",
feature = "rdkafka-sasl"
))]
#[command(flatten)]
pub connector: Option<ConnectorConfig>,
}

#[derive(Parser, Debug)]
Expand Down
103 changes: 0 additions & 103 deletions src/connectors/common/config.rs

This file was deleted.

25 changes: 24 additions & 1 deletion src/connectors/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use clap::ValueEnum;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use std::str::FromStr;
use thiserror::Error;
use tokio::runtime;
use tokio::runtime::Builder;

pub mod config;
pub mod processor;
pub mod shutdown;

Expand Down Expand Up @@ -77,6 +79,27 @@ impl ConnectorError {
}
}

#[derive(ValueEnum, Default, Clone, Debug, PartialEq, Eq, Hash)]
pub enum BadData {
#[default]
Fail,
Drop,
Dlt, //TODO: Implement Dead Letter Topic support when needed
}

impl FromStr for BadData {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"drop" => Ok(BadData::Drop),
"fail" => Ok(BadData::Fail),
"dlt" => Ok(BadData::Dlt),
_ => Err(format!("Invalid bad data policy: {}", s)),
}
}
}

pub fn build_runtime(worker_threads: usize, thread_name: &str) -> anyhow::Result<runtime::Runtime> {
Builder::new_multi_thread()
.enable_all()
Expand Down
Loading

0 comments on commit 13c23e9

Please sign in to comment.