Skip to content

Commit

Permalink
Issue 16 : API for list scope and list stream (#20)
Browse files Browse the repository at this point in the history
API's for list scope and list stream
---------

Signed-off-by: anju_das <[email protected]>
Co-authored-by: anju_das <[email protected]>
  • Loading branch information
anju-c-das and anju_das authored Feb 6, 2024
1 parent f0446dd commit 3a825a5
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
78 changes: 77 additions & 1 deletion src/stream_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ cfg_if! {
use pravega_client::client_factory::ClientFactory;
use pravega_client_shared::*;
use pravega_client_config::{ClientConfig, ClientConfigBuilder};
use pravega_controller_client::paginator::*;
use pyo3::prelude::*;
use pyo3::PyResult;
use pyo3::{exceptions, PyObjectProtocol};
Expand All @@ -25,6 +26,7 @@ cfg_if! {
use tracing::info;
use pravega_client::event::reader_group::ReaderGroupConfigBuilder;
use crate::stream_reader_group::StreamReaderGroupConfig;
use futures::StreamExt;
}
}

Expand Down Expand Up @@ -229,6 +231,80 @@ impl StreamManager {
}
}

///
/// list scope
///
#[pyo3(text_signature = "($self)")]
pub fn list_scope<'p>(&self, _py: Python<'p>) -> PyResult<Vec<String>> {
let controller = self.cf.controller_client();
let scope_result = list_scopes(controller);
futures::pin_mut!(scope_result);
let mut scope_vector = Vec::new();
// Used tokio::runtime::Runtime to block on the async code
let result = tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Some(sc) = scope_result.next().await {
scope_vector.push(sc);
}
Ok::<_, PyErr>(())
});

// to check for errors in the async block
if let Err(e) = result {
return Err(e);
}
let mut scope_vector_act: Vec<String> = Vec::new();
for scope_val in scope_vector {
match scope_val {
Ok(scope) => {
scope_vector_act.push(scope.name)
},
Err(e) => {
return Err(exceptions::PyValueError::new_err(format!("{:?}", e)));
},
}
}
Ok(scope_vector_act)
}

///
/// list streams
///
#[pyo3(text_signature = "($self, scope_name)")]
pub fn list_stream<'p>(&self, scope_name: &str, _py: Python<'p>) -> PyResult<Vec<String>> {
let controller = self.cf.controller_client();
let stream_result = list_streams(
Scope {
name: scope_name.to_string(),
},
controller,
);
futures::pin_mut!(stream_result);
let mut stream_vector = Vec::new();
let result = tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Some(sc) = stream_result.next().await {
stream_vector.push(sc);
}
Ok::<_, PyErr>(())
});

// to check for errors in the async block
if let Err(e) = result {
return Err(e);
}
let mut stream_vector_act: Vec<String> = Vec::new();
for stream_val in stream_vector {
match stream_val {
Ok(scoped_stream) => {
stream_vector_act.push(scoped_stream.stream.name)
},
Err(e) => {
return Err(exceptions::PyValueError::new_err(format!("{:?}", e)));
},
}
}
Ok(stream_vector_act)
}

///
/// Create a Stream in Pravega.
///
Expand Down Expand Up @@ -646,4 +722,4 @@ impl PyObjectProtocol for StreamManager {
fn __repr__(&self) -> PyResult<String> {
Ok(format!("StreamManager({})", self.to_str()))
}
}
}
17 changes: 17 additions & 0 deletions tests/pravega_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,23 @@ def test_tags(self):
tags = stream_manager.get_stream_tags(scope, "testStream1")
self.assertEqual(["t4", "t5"], tags)

# test for list_scope API
scope_list = stream_manager.list_scope()
self.assertTrue(len(scope_list) > 0, "The scope list is empty")
# test for list_stream API
scope_name = str(scope_list[0])
self.assertTrue(scope_name, "The scope name is empty")
stream_list = stream_manager.list_stream(scope_name)
self.assertTrue(len(stream_list) > 0, "The stream list is empty")
# test with false scope name for list_stream API
false_scope = "falsescope"
stream_list1 = stream_manager.list_stream(false_scope)
self.assertEqual(stream_list1, [])
# test with empty scope name for list_stream API
empty_string = ""
stream_list2 = stream_manager.list_stream(empty_string)
self.assertEqual(stream_list2, [])

def test_writeEvent(self):
scope = ''.join(secrets.choice(string.ascii_lowercase + string.digits)
for i in range(10))
Expand Down

0 comments on commit 3a825a5

Please sign in to comment.