Skip to content

Commit

Permalink
chore: refactor for better code modularity and docs generation
Browse files Browse the repository at this point in the history
  • Loading branch information
digikata committed Nov 12, 2024
1 parent b465ac3 commit dfe23b7
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 236 deletions.
290 changes: 54 additions & 236 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import typing
from enum import Enum

from ._fluvio_python import (
Fluvio as _Fluvio,
FluvioConfig as _FluvioConfig,
Expand Down Expand Up @@ -32,62 +35,65 @@
MetaUpdateSmartModuleSpec as _MetaUpdateSmartModuleSpec,
MetadataPartitionSpec as _MetadataPartitionSpec,
)
from enum import Enum
from ._fluvio_python import Error as FluviorError # noqa: F401
import typing

# typing experiment
from ._fluvio_python import fluvio_consumer_with_config
from ._fluvio_python import Error as FluviorError # noqa: F401

class Record:
"""The individual record for a given stream."""

_inner: _Record

def __init__(self, inner: _Record):
self._inner = inner

def offset(self) -> int:
"""The offset from the initial offset for a given stream."""
return self._inner.offset()

def value(self) -> typing.List[int]:
"""Returns the contents of this Record's value"""
return self._inner.value()

def value_string(self) -> str:
"""The UTF-8 decoded value for this record."""
return self._inner.value_string()

def key(self) -> typing.List[int]:
"""Returns the contents of this Record's key, if it exists"""
return self._inner.key()

def key_string(self) -> str:
"""The UTF-8 decoded key for this record."""
return self._inner.key_string()

def timestamp(self) -> int:
"""Timestamp of this record."""
return self._inner.timestamp()

from .record import Record, RecordMetadata
from .specs import (
CommonCreateRequest,

class RecordMetadata:
"""Metadata of a record send to a topic."""
SmartModuleSpec,
TopicSpec,

_inner: _RecordMetadata
PartitionMap,

def __init__(self, inner: _RecordMetadata):
self._inner = inner
MessageMetadataTopicSpec,

def offset(self) -> int:
"""Return the offset of the sent record in the topic/partition."""
return self._inner.offset()
MetadataPartitionSpec,
MetadataSmartModuleSpec,
MetadataTopicSpec,

def partition_id(self) -> int:
"""Return the partition index the record was sent to."""
return self._inner.partition_id()
MetaUpdateSmartModuleSpec,
MetaUpdateTopicSpec,
)

# this structures the module a bit and allows pydoc to generate better docs
# with better ordering of types and functions
__all__ = [
# top level types
"Fluvio",
"FluvioConfig",
"FluvioAdmin",
"Record",
"RecordMetadata",
"Offset",

# producer
"TopicProducer",
"ProduceOutput",

# consumer
"ConsumerConfigExt",
"ConsumerConfigExtBuilder",
"ConsumerConfig",
"PartitionConsumer",
"MultiplePartitionConsumer",

# specs
"CommonCreateRequest",
"SmartModuleSpec",
"TopicSpec",
"PartitionMap",

"MessageMetadataTopicSpec",
"MetadataPartitionSpec",
"MetadataTopicSpec",
"MetaUpdateTopicSpec",

# Misc
"PartitionSelectionStrategy",
"SmartModuleKind",
]

class ProduceOutput:
"""Returned by of `TopicProducer.send` call allowing access to sent record metadata."""
Expand Down Expand Up @@ -117,45 +123,6 @@ async def async_wait(self) -> typing.Optional[RecordMetadata]:
"""
return await self._inner.async_wait()


# class Offset:
# """Describes the location of an event stored in a Fluvio partition."""

# _inner: _Offset

# @classmethod
# def absolute(cls, index: int):
# """Creates an absolute offset with the given index"""
# return cls(_Offset.absolute(index))

# @classmethod
# def beginning(cls):
# """Creates a relative offset starting at the beginning of the saved log"""
# return cls(_Offset.beginning())

# @classmethod
# def end(cls):
# """Creates a relative offset pointing to the newest log entry"""
# return cls(_Offset.end())

# @classmethod
# def from_beginning(cls, offset: int):
# """Creates a relative offset a fixed distance after the oldest log
# entry
# """
# return cls(_Offset.from_beginning(offset))

# @classmethod
# def from_end(cls, offset: int):
# """Creates a relative offset a fixed distance before the newest log
# entry
# """
# return cls(_Offset.from_end(offset))

# def __init__(self, inner: _Offset):
# self._inner = inner


class SmartModuleKind(Enum):
"""
Use of this is to explicitly set the kind of a smartmodule. Not required
Expand Down Expand Up @@ -723,155 +690,6 @@ def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record
yield Record(item)
item = stream.next()

class PartitionMap:
_inner: _PartitionMap

def __init__(self, inner: _PartitionMap):
self._inner = inner

@classmethod
def new(cls, partition: int, replicas: typing.List[int]):
return cls(_PartitionMap.new(partition, replicas))


class TopicSpec:
_inner: _TopicSpec

def __init__(self, inner: _TopicSpec):
self._inner = inner

@classmethod
def new_assigned(cls, partition_maps: typing.List[PartitionMap]):
partition_maps = [x._inner for x in partition_maps]
return cls(_TopicSpec.new_computed(partition_maps))

@classmethod
def new_computed(cls, partitions: int, replication: int, ignore: bool):
return cls(_TopicSpec.new_computed(partitions, replication, ignore))


class CommonCreateRequest:
_inner: _CommonCreateRequest

def __init__(self, inner: _CommonCreateRequest):
self._inner = inner

@classmethod
def new(cls, name: str, dry_run: bool, timeout: int):
return cls(_CommonCreateRequest.new(name, dry_run, timeout))


class MetadataTopicSpec:
_inner: _MetadataTopicSpec

def __init__(self, inner: _MetadataTopicSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class MessageMetadataTopicSpec:
_inner: _MessageMetadataTopicSpec

def __init__(self, inner: _MessageMetadataTopicSpec):
self._inner = inner

def is_update(self) -> bool:
return self._inner.is_update()

def is_delete(self) -> bool:
return self._inner.is_delete()

def metadata_topic_spec(self) -> MetadataTopicSpec:
return MetadataTopicSpec(self._inner.metadata_topic_spec())


class MetaUpdateTopicSpec:
_inner: _MetaUpdateTopicSpec

def __init__(self, inner: _MetaUpdateTopicSpec):
self._inner = inner

def all(self) -> typing.List[MetadataTopicSpec]:
inners = self._inner.all()
return [MetadataTopicSpec(i) for i in inners]

def changes(self) -> typing.List[MessageMetadataTopicSpec]:
inners = self._inner.changes()
return [MessageMetadataTopicSpec(i) for i in inners]

def epoch(self) -> int:
return self._inner.epoch()


class SmartModuleSpec:
_inner: _SmartModuleSpec

def __init__(self, inner: _SmartModuleSpec):
self._inner = inner

@classmethod
def new(cls, path: str):
f = open(path, mode="rb")
data = f.read()
f.close()
return cls(_SmartModuleSpec.with_binary(data))


class MetadataSmartModuleSpec:
_inner: _MetadataSmartModuleSpec

def __init__(self, inner: _MetadataSmartModuleSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class MessageMetadataSmartModuleSpec:
_inner: _MessageMetadataSmartModuleSpec

def __init__(self, inner: _MessageMetadataSmartModuleSpec):
self._inner = inner

def is_update(self) -> bool:
return self._inner.is_update()

def is_delete(self) -> bool:
return self._inner.is_delete()

def metadata_smart_module_spec(self) -> MetadataSmartModuleSpec:
return MetadataSmartModuleSpec(self._inner.metadata_smart_module_spec())


class MetaUpdateSmartModuleSpec:
_inner: _MetaUpdateSmartModuleSpec

def __init__(self, inner: _MetaUpdateSmartModuleSpec):
self._inner = inner

def all(self) -> typing.List[MetadataSmartModuleSpec]:
inners = self._inner.all()
return [MetadataSmartModuleSpec(i) for i in inners]

def changes(self) -> typing.List[MessageMetadataSmartModuleSpec]:
inners = self._inner.changes()
return [MessageMetadataSmartModuleSpec(i) for i in inners]

def epoch(self) -> int:
return self._inner.epoch()


class MetadataPartitionSpec:
_inner: _MetadataPartitionSpec

def __init__(self, inner: _MetadataPartitionSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class FluvioAdmin:
_inner: _FluvioAdmin
Expand Down
55 changes: 55 additions & 0 deletions fluvio/record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from ._fluvio_python import (
Record as _Record,
RecordMetadata as _RecordMetadata,
ProduceOutput as _ProduceOutput,
)
import typing

class Record:
"""The individual record for a given stream."""

_inner: _Record

def __init__(self, inner: _Record):
self._inner = inner

def offset(self) -> int:
"""The offset from the initial offset for a given stream."""
return self._inner.offset()

def value(self) -> typing.List[int]:
"""Returns the contents of this Record's value"""
return self._inner.value()

def value_string(self) -> str:
"""The UTF-8 decoded value for this record."""
return self._inner.value_string()

def key(self) -> typing.List[int]:
"""Returns the contents of this Record's key, if it exists"""
return self._inner.key()

def key_string(self) -> str:
"""The UTF-8 decoded key for this record."""
return self._inner.key_string()

def timestamp(self) -> int:
"""Timestamp of this record."""
return self._inner.timestamp()


class RecordMetadata:
"""Metadata of a record send to a topic."""

_inner: _RecordMetadata

def __init__(self, inner: _RecordMetadata):
self._inner = inner

def offset(self) -> int:
"""Return the offset of the sent record in the topic/partition."""
return self._inner.offset()

def partition_id(self) -> int:
"""Return the partition index the record was sent to."""
return self._inner.partition_id()
Loading

0 comments on commit dfe23b7

Please sign in to comment.