Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Fluvio::connect_with_config, add FluvioConfig #300

Merged
merged 3 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ._fluvio_python import (
Fluvio as _Fluvio,
FluvioConfig as _FluvioConfig,
ConsumerConfig as _ConsumerConfig,
PartitionConsumer as _PartitionConsumer,
PartitionConsumerStream as _PartitionConsumerStream,
Expand Down Expand Up @@ -275,6 +276,59 @@ def send_all(self, records: typing.List[typing.Tuple[bytes, bytes]]):
return self._inner.send_all(records_inner)


class FluvioConfig:
"""Configuration for Fluvio client"""

_inner: _FluvioConfig

def __init__(self, inner: _FluvioConfig):
self._inner = inner

@classmethod
def load(cls):
"""get current cluster config from default profile"""
return cls(_FluvioConfig.load())

@classmethod
def new(cls, addr: str):
"""Create a new cluster configuration with no TLS."""
return cls(_FluvioConfig.new(addr))

def set_endpoint(self, endpoint: str):
"""set endpoint"""
self._inner.set_endpoint(endpoint)

def set_use_spu_local_address(self, val: bool):
"""set wheather to use spu local address"""
self._inner.set_use_spu_local_address(val)

def disable_tls(self):
"""disable tls for this config"""
self._inner.disable_tls()

def set_anonymous_tls(self):
"""set the config to use anonymous tls"""
self._inner.set_anonymous_tls()

def set_inline_tls(self, domain: str, key: str, cert: str, ca_cert: str):
"""specify inline tls parameters"""
self._inner.set_inline_tls(domain, key, cert, ca_cert)

def set_tls_file_paths(
self, domain: str, key_path: str, cert_path: str, ca_cert_path: str
):
"""specify paths to tls files"""
self._inner.set_tls_file_paths(domain, key_path, cert_path, ca_cert_path)

def set_client_id(self, client_id: str):
"""set client id"""
self._inner.set_client_id(client_id)

def unset_client_id(self):
"""remove the configured client id from config"""
self._inner.unset_client_id()


class Fluvio:
"""An interface for interacting with Fluvio streaming."""

Expand All @@ -295,6 +349,11 @@ def connect(cls):
"""
return cls(_Fluvio.connect())

@classmethod
def connect_with_config(cls, config: FluvioConfig):
"""Creates a new Fluvio client using the given configuration"""
return cls(_Fluvio.connect_with_config(config._inner))

def partition_consumer(self, topic: str, partition: int) -> PartitionConsumer:
"""Creates a new `PartitionConsumer` for the given topic and partition

Expand Down
6 changes: 5 additions & 1 deletion integration-tests/test_fluvio_python.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from string import ascii_lowercase
from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind
from fluvio import Fluvio, Offset, ConsumerConfig, SmartModuleKind, FluvioConfig
import unittest
import uuid
import os
Expand Down Expand Up @@ -373,6 +373,10 @@ def test_connect(self):
# A very simple test
Fluvio.connect()

def test_connect_with_config(self):
config = FluvioConfig.load()
Fluvio.connect_with_config(config)

def test_produce(self):
fluvio = Fluvio.connect()

Expand Down
16 changes: 16 additions & 0 deletions src/glue.rs.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ foreign_class!(class Fluvio {
self_type Fluvio;
private constructor = empty;
fn _Fluvio::connect() -> Result<Fluvio, FluvioError>;
fn _Fluvio::connect_with_config(config: &FluvioConfig) -> Result<Fluvio, FluvioError>;
fn _Fluvio::partition_consumer(
&self,
_: String,
Expand All @@ -13,6 +14,21 @@ foreign_class!(class Fluvio {
) -> Result<TopicProducer, FluvioError>;
});

foreign_class!(class FluvioConfig {
self_type FluvioConfig;
private constructor = empty;
fn FluvioConfig::load() -> Result<FluvioConfig, FluvioError>;
fn FluvioConfig::new(addr: &str) -> FluvioConfig;
fn FluvioConfig::set_endpoint(&mut self, endpoint: &str);
fn FluvioConfig::set_use_spu_local_address(&mut self, val: bool);
fn FluvioConfig::disable_tls(&mut self);
fn FluvioConfig::set_anonymous_tls(&mut self);
fn FluvioConfig::set_inline_tls(&mut self, domain: &str, key: &str, cert: &str, ca_cert: &str);
fn FluvioConfig::set_tls_file_paths(&mut self, domain: &str, key_path: &str, cert_path: &str, ca_cert_path: &str);
fn FluvioConfig::set_client_id(&mut self, id: &str);
fn FluvioConfig::unset_client_id(&mut self);
});

foreign_class!(
#[derive(Clone)]
class ProducerBatchRecord {
Expand Down
78 changes: 77 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#![allow(non_snake_case, unused)]
use fluvio::config::{TlsCerts, TlsConfig, TlsPaths, TlsPolicy};
use fluvio::consumer::{
ConsumerConfig as NativeConsumerConfig, ConsumerConfigBuilder,
SmartModuleContextData as NativeSmartModuleContextData, SmartModuleExtraParams,
SmartModuleInvocation, SmartModuleInvocationWasm, SmartModuleKind as NativeSmartModuleKind,
};
use fluvio::dataplane::link::ErrorCode;
use fluvio::FluvioConfig as NativeFluvioConfig;
use fluvio::{consumer::Record, Fluvio, Offset, PartitionConsumer, TopicProducer};
use fluvio_future::{
io::{Stream, StreamExt},
Expand All @@ -20,23 +22,97 @@ use cloud::{CloudClient, CloudLoginError};
mod error;

mod _Fluvio {

use super::*;

pub fn connect() -> Result<Fluvio, FluvioError> {
Ok(run_block_on(Fluvio::connect())?)
}

pub fn connect_with_config(config: &FluvioConfig) -> Result<Fluvio, FluvioError> {
Ok(run_block_on(Fluvio::connect_with_config(&config.inner))?)
}

pub fn partition_consumer(
fluvio: &Fluvio,
topic: String,
partition: u32,
) -> Result<PartitionConsumer, FluvioError> {
Ok(run_block_on(fluvio.partition_consumer(topic, partition))?)
}

pub fn topic_producer(fluvio: &Fluvio, topic: String) -> Result<TopicProducer, FluvioError> {
Ok(run_block_on(fluvio.topic_producer(topic))?)
}
}

pub struct FluvioConfig {
inner: NativeFluvioConfig,
}

impl FluvioConfig {
/// Load config file from default config dir
pub fn load() -> Result<FluvioConfig, FluvioError> {
let inner = NativeFluvioConfig::load()?;

Ok(FluvioConfig { inner })
}

/// Create without tls
pub fn new(addr: &str) -> FluvioConfig {
let inner = NativeFluvioConfig::new(addr);

FluvioConfig { inner }
}

pub fn set_endpoint(&mut self, endpoint: &str) {
self.inner.endpoint = endpoint.to_owned();
}

pub fn set_use_spu_local_address(&mut self, val: bool) {
self.inner.use_spu_local_address = val;
}

pub fn disable_tls(&mut self) {
self.inner.tls = TlsPolicy::Disabled;
}

pub fn set_anonymous_tls(&mut self) {
self.inner.tls = TlsPolicy::Anonymous;
}

pub fn set_inline_tls(&mut self, domain: &str, key: &str, cert: &str, ca_cert: &str) {
self.inner.tls = TlsPolicy::Verified(TlsConfig::Inline(TlsCerts {
domain: domain.to_owned(),
key: key.to_owned(),
cert: cert.to_owned(),
ca_cert: ca_cert.to_owned(),
}));
}

pub fn set_tls_file_paths(
&mut self,
domain: &str,
key_path: &str,
cert_path: &str,
ca_cert_path: &str,
) {
self.inner.tls = TlsPolicy::Verified(TlsConfig::Files(TlsPaths {
domain: domain.to_owned(),
key: key_path.into(),
cert: cert_path.into(),
ca_cert: ca_cert_path.into(),
}));
}

pub fn set_client_id(&mut self, id: &str) {
self.inner.client_id = Some(id.to_owned());
}

pub fn unset_client_id(&mut self) {
self.inner.client_id = None;
}
}

pub struct ConsumerConfig {
pub builder: ConsumerConfigBuilder,
pub smartmodules: Vec<SmartModuleInvocation>,
Expand Down
Loading