Skip to content

Commit

Permalink
chore: refactor for better code modules & docs
Browse files Browse the repository at this point in the history
- lint and format code
- cleanup docs generation, pickup lib.rs module code by using __any__
  • Loading branch information
digikata committed Nov 12, 2024
1 parent b465ac3 commit 8cb0eb2
Show file tree
Hide file tree
Showing 13 changed files with 3,977 additions and 4,229 deletions.
7,277 changes: 3,373 additions & 3,904 deletions docs/fluvio.html

Large diffs are not rendered by default.

305 changes: 298 additions & 7 deletions docs/fluvio/_fluvio_python.html

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions docs/fluvio/cloud.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/search.js

Large diffs are not rendered by default.

326 changes: 64 additions & 262 deletions fluvio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,83 @@
import typing
from enum import Enum

from ._fluvio_python import (
Fluvio as _Fluvio,
FluvioConfig as _FluvioConfig,
Offset,
FluvioAdmin as _FluvioAdmin,
# consumer types
ConsumerConfig as _ConsumerConfig,
ConsumerConfigExt,
ConsumerConfigExtBuilder,
OffsetManagementStrategy,
PartitionConsumer as _PartitionConsumer,
MultiplePartitionConsumer as _MultiplePartitionConsumer,
PartitionSelectionStrategy as _PartitionSelectionStrategy,
PartitionConsumerStream as _PartitionConsumerStream,
AsyncPartitionConsumerStream as _AsyncPartitionConsumerStream,
ProduceOutput as _ProduceOutput,
RecordMetadata as _RecordMetadata,
# producer types
TopicProducer as _TopicProducer,
ProduceOutput as _ProduceOutput,
ProducerBatchRecord as _ProducerBatchRecord,
# admin and misc types
SmartModuleKind as _SmartModuleKind,
Record as _Record,
Offset,
FluvioAdmin as _FluvioAdmin,
TopicSpec as _TopicSpec,
PartitionMap as _PartitionMap,
CommonCreateRequest as _CommonCreateRequest,
MetadataTopicSpec as _MetadataTopicSpec,
WatchTopicStream as _WatchTopicStream,
MetaUpdateTopicSpec as _MetaUpdateTopicSpec,
MessageMetadataTopicSpec as _MessageMetadataTopicSpec,
SmartModuleSpec as _SmartModuleSpec,
MetadataSmartModuleSpec as _MetadataSmartModuleSpec,
WatchSmartModuleStream as _WatchSmartModuleStream,
MessageMetadataSmartModuleSpec as _MessageMetadataSmartModuleSpec,
MetaUpdateSmartModuleSpec as _MetaUpdateSmartModuleSpec,
MetadataPartitionSpec as _MetadataPartitionSpec,
)
from enum import Enum
from ._fluvio_python import Error as FluviorError # noqa: F401
import typing

# typing experiment
from ._fluvio_python import fluvio_consumer_with_config

class Record:
"""The individual record for a given stream."""

_inner: _Record

def __init__(self, inner: _Record):
self._inner = inner

def offset(self) -> int:
"""The offset from the initial offset for a given stream."""
return self._inner.offset()

def value(self) -> typing.List[int]:
"""Returns the contents of this Record's value"""
return self._inner.value()

def value_string(self) -> str:
"""The UTF-8 decoded value for this record."""
return self._inner.value_string()

def key(self) -> typing.List[int]:
"""Returns the contents of this Record's key, if it exists"""
return self._inner.key()

def key_string(self) -> str:
"""The UTF-8 decoded key for this record."""
return self._inner.key_string()

def timestamp(self) -> int:
"""Timestamp of this record."""
return self._inner.timestamp()


class RecordMetadata:
"""Metadata of a record send to a topic."""

_inner: _RecordMetadata

def __init__(self, inner: _RecordMetadata):
self._inner = inner
from ._fluvio_python import Error as FluviorError # noqa: F401

def offset(self) -> int:
"""Return the offset of the sent record in the topic/partition."""
return self._inner.offset()
from .record import Record, RecordMetadata
from .specs import (
# support types
CommonCreateRequest,
PartitionMap,
# specs
SmartModuleSpec,
TopicSpec,
MessageMetadataTopicSpec,
MetadataPartitionSpec,
MetadataSmartModuleSpec,
MetadataTopicSpec,
MetaUpdateSmartModuleSpec,
MetaUpdateTopicSpec,
)

def partition_id(self) -> int:
"""Return the partition index the record was sent to."""
return self._inner.partition_id()
# this structures the module a bit and allows pydoc to generate better docs
# with better ordering of types and functions
# inclusion in __all__, will pull the PyO3 rust inline docs into
# the pdoc generated documentation
__all__ = [
# top level types
"Fluvio",
"FluvioConfig",
"FluvioAdmin",
"Record",
"RecordMetadata",
"Offset",
# producer
"TopicProducer",
"ProduceOutput",
# consumer
"ConsumerConfigExt",
"ConsumerConfigExtBuilder",
"ConsumerConfig",
"PartitionConsumer",
"MultiplePartitionConsumer",
# specs
"CommonCreateRequest",
"SmartModuleSpec",
"TopicSpec",
"PartitionMap",
"MessageMetadataTopicSpec",
"MetadataPartitionSpec",
"MetadataTopicSpec",
"MetaUpdateTopicSpec",
# Misc
"PartitionSelectionStrategy",
"SmartModuleKind",
]


class ProduceOutput:
Expand Down Expand Up @@ -118,44 +109,6 @@ async def async_wait(self) -> typing.Optional[RecordMetadata]:
return await self._inner.async_wait()


# class Offset:
# """Describes the location of an event stored in a Fluvio partition."""

# _inner: _Offset

# @classmethod
# def absolute(cls, index: int):
# """Creates an absolute offset with the given index"""
# return cls(_Offset.absolute(index))

# @classmethod
# def beginning(cls):
# """Creates a relative offset starting at the beginning of the saved log"""
# return cls(_Offset.beginning())

# @classmethod
# def end(cls):
# """Creates a relative offset pointing to the newest log entry"""
# return cls(_Offset.end())

# @classmethod
# def from_beginning(cls, offset: int):
# """Creates a relative offset a fixed distance after the oldest log
# entry
# """
# return cls(_Offset.from_beginning(offset))

# @classmethod
# def from_end(cls, offset: int):
# """Creates a relative offset a fixed distance before the newest log
# entry
# """
# return cls(_Offset.from_end(offset))

# def __init__(self, inner: _Offset):
# self._inner = inner


class SmartModuleKind(Enum):
"""
Use of this is to explicitly set the kind of a smartmodule. Not required
Expand Down Expand Up @@ -313,9 +266,7 @@ def stream_with_config(
`Iterator[Record]`
"""
return self._generator(
self._inner.stream_with_config(offset, config._inner)
)
return self._generator(self._inner.stream_with_config(offset, config._inner))

async def async_stream_with_config(
self, offset: Offset, config: ConsumerConfig
Expand Down Expand Up @@ -664,14 +615,14 @@ def connect_with_config(cls, config: FluvioConfig):
"""Creates a new Fluvio client using the given configuration"""
return cls(_Fluvio.connect_with_config(config._inner))

def consumer_with_config(self, config: ConsumerConfigExt) -> typing.Iterator[Record]:
def consumer_with_config(
self, config: ConsumerConfigExt
) -> typing.Iterator[Record]:
"""Creates consumer with settings defined in config
This is the recommended way to create a consume records.
"""
return self._generator(
self._inner.consumer_with_config(config)
)
return self._generator(self._inner.consumer_with_config(config))

def topic_producer(self, topic: str) -> TopicProducer:
"""
Expand Down Expand Up @@ -723,155 +674,6 @@ def _generator(self, stream: _PartitionConsumerStream) -> typing.Iterator[Record
yield Record(item)
item = stream.next()

class PartitionMap:
_inner: _PartitionMap

def __init__(self, inner: _PartitionMap):
self._inner = inner

@classmethod
def new(cls, partition: int, replicas: typing.List[int]):
return cls(_PartitionMap.new(partition, replicas))


class TopicSpec:
_inner: _TopicSpec

def __init__(self, inner: _TopicSpec):
self._inner = inner

@classmethod
def new_assigned(cls, partition_maps: typing.List[PartitionMap]):
partition_maps = [x._inner for x in partition_maps]
return cls(_TopicSpec.new_computed(partition_maps))

@classmethod
def new_computed(cls, partitions: int, replication: int, ignore: bool):
return cls(_TopicSpec.new_computed(partitions, replication, ignore))


class CommonCreateRequest:
_inner: _CommonCreateRequest

def __init__(self, inner: _CommonCreateRequest):
self._inner = inner

@classmethod
def new(cls, name: str, dry_run: bool, timeout: int):
return cls(_CommonCreateRequest.new(name, dry_run, timeout))


class MetadataTopicSpec:
_inner: _MetadataTopicSpec

def __init__(self, inner: _MetadataTopicSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class MessageMetadataTopicSpec:
_inner: _MessageMetadataTopicSpec

def __init__(self, inner: _MessageMetadataTopicSpec):
self._inner = inner

def is_update(self) -> bool:
return self._inner.is_update()

def is_delete(self) -> bool:
return self._inner.is_delete()

def metadata_topic_spec(self) -> MetadataTopicSpec:
return MetadataTopicSpec(self._inner.metadata_topic_spec())


class MetaUpdateTopicSpec:
_inner: _MetaUpdateTopicSpec

def __init__(self, inner: _MetaUpdateTopicSpec):
self._inner = inner

def all(self) -> typing.List[MetadataTopicSpec]:
inners = self._inner.all()
return [MetadataTopicSpec(i) for i in inners]

def changes(self) -> typing.List[MessageMetadataTopicSpec]:
inners = self._inner.changes()
return [MessageMetadataTopicSpec(i) for i in inners]

def epoch(self) -> int:
return self._inner.epoch()


class SmartModuleSpec:
_inner: _SmartModuleSpec

def __init__(self, inner: _SmartModuleSpec):
self._inner = inner

@classmethod
def new(cls, path: str):
f = open(path, mode="rb")
data = f.read()
f.close()
return cls(_SmartModuleSpec.with_binary(data))


class MetadataSmartModuleSpec:
_inner: _MetadataSmartModuleSpec

def __init__(self, inner: _MetadataSmartModuleSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class MessageMetadataSmartModuleSpec:
_inner: _MessageMetadataSmartModuleSpec

def __init__(self, inner: _MessageMetadataSmartModuleSpec):
self._inner = inner

def is_update(self) -> bool:
return self._inner.is_update()

def is_delete(self) -> bool:
return self._inner.is_delete()

def metadata_smart_module_spec(self) -> MetadataSmartModuleSpec:
return MetadataSmartModuleSpec(self._inner.metadata_smart_module_spec())


class MetaUpdateSmartModuleSpec:
_inner: _MetaUpdateSmartModuleSpec

def __init__(self, inner: _MetaUpdateSmartModuleSpec):
self._inner = inner

def all(self) -> typing.List[MetadataSmartModuleSpec]:
inners = self._inner.all()
return [MetadataSmartModuleSpec(i) for i in inners]

def changes(self) -> typing.List[MessageMetadataSmartModuleSpec]:
inners = self._inner.changes()
return [MessageMetadataSmartModuleSpec(i) for i in inners]

def epoch(self) -> int:
return self._inner.epoch()


class MetadataPartitionSpec:
_inner: _MetadataPartitionSpec

def __init__(self, inner: _MetadataPartitionSpec):
self._inner = inner

def name(self) -> str:
return self._inner.name()


class FluvioAdmin:
_inner: _FluvioAdmin
Expand Down
Loading

0 comments on commit 8cb0eb2

Please sign in to comment.