-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merged in DEV-11600-create-runner-for-pyconnect (pull request #4)
DEV-11600 create runner for pyconnect Approved-by: Felix Eggert <[email protected]>
- Loading branch information
Showing
27 changed files
with
779 additions
and
380 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,4 @@ | ||
#!/bin/bash | ||
|
||
# flake 8 invocation | ||
python3 commithooks/pre-commit-flake.py || exit | ||
./.venv/bin/python commithooks/pre-commit-flake.py || exit |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
FROM python:3.6 | ||
|
||
RUN ["pip", "install", "pyconnect"] | ||
|
||
COPY . /app/ | ||
|
||
WORKDIR /app | ||
|
||
ENV PYCONNECT_BOOTSTRAP_SERVERS = 'broker:9092' | ||
ENV PYCONNECT_SCHEMA_REGISTRY = 'schema-registry:8082' | ||
ENV PYCONNECT_TOPICS = 'testtopic-sink' | ||
ENV PYCONNECT_SINK_DIRECTORY = '/tmp/filesink/' | ||
ENV PYCONNECT_FILENAME = 'sinkfile.json' | ||
ENV PYCONNECT_GROUP_ID = 'file_sink_test' | ||
|
||
ENTRYPOINT ["python", "file_sink.py", "--config", "env"] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import json | ||
import logging | ||
import pathlib | ||
from typing import List, cast | ||
|
||
from confluent_kafka.cimpl import Message | ||
|
||
from pyconnect import PyConnectSink, SinkConfig | ||
from pyconnect.core import Status, message_repr | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class FileSinkConfig(SinkConfig): | ||
""" | ||
In addition to the fields from :class:`pyconnect.config.SinkConfig` this class provides the following fields: | ||
**sink_directory**: :class:`pathlib.Path` | ||
The directory where this sink shall put the file it writes all messages to. | ||
**sink_filename**: str | ||
The name of the file that this sink writes all messages to. | ||
""" | ||
__parsers = {'sink_directory': lambda p: pathlib.Path(p).absolute()} | ||
|
||
def __init__(self, conf_dict): | ||
conf_dict = conf_dict.copy() | ||
self['sink_directory'] = conf_dict.pop('sink_directory') | ||
self['sink_filename'] = conf_dict.pop('sink_filename') | ||
super().__init__(conf_dict) | ||
logger.debug(f'Configuration: {self!r}') | ||
|
||
|
||
class FileSink(PyConnectSink): | ||
""" | ||
A sink that writes all messages it receives to a single file. | ||
""" | ||
|
||
def __init__(self, config: FileSinkConfig): | ||
super().__init__(config) | ||
self._buffer: List[Message] = [] | ||
|
||
def on_message_received(self, msg: Message) -> None: | ||
logger.debug(f'Message Received: {message_repr(msg)}') | ||
self._buffer.append(msg) | ||
|
||
def on_startup(self): | ||
logger.debug(f'Creating parent directory: {self.config["sink_directory"]}') | ||
cast(pathlib.Path, self.config['sink_directory']).mkdir(parents=True, exist_ok=True) | ||
|
||
def on_flush(self) -> None: | ||
lines = [ | ||
json.dumps({'key': msg.key(), 'value': msg.value()}) + '\n' | ||
for msg in self._buffer | ||
] | ||
sinkfile = self.config['sink_directory'] / self.config['sink_filename'] | ||
logger.info(f'Writing {len(lines)} line(s) to {sinkfile}') | ||
with open(sinkfile, 'a') as outfile: | ||
outfile.writelines(lines) | ||
|
||
logger.debug('The following lines were written:') | ||
for line in lines: | ||
logger.debug(f'> {line!r}') | ||
|
||
self._buffer.clear() | ||
|
||
def on_no_message_received(self): | ||
# TODO the following should probably be two attributes like 'has_subscriptions' and 'all_partitions_at_eof' | ||
if len(self.eof_reached) > 0 and all(self.eof_reached.values()): | ||
logger.info('EOF reached, stopping.') | ||
return Status.STOPPED | ||
return None | ||
|
||
|
||
def main(): | ||
# TODO move to pyconnect.core.main(connector_cls, config_cls) | ||
import argparse | ||
|
||
parser = argparse.ArgumentParser() | ||
parser.add_argument('--config', choices=['env', 'yaml', 'json'], default='env', help='Defines where the config ' | ||
'is loaded from') | ||
parser.add_argument('--conf_file', default=None, help='When `conf` is yaml or json, then config is loaded' | ||
'from this file, default will be `./config.(yaml|json)` ' | ||
'depending on which kind of file you chose') | ||
parser.add_argument('--loglevel', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], | ||
help='Set log level to given value, if "NOTSET" (default) no logging is active.', | ||
default='NOTSET') | ||
|
||
args = parser.parse_args() | ||
config: FileSinkConfig = None | ||
|
||
if args.loglevel != 'NOTSET': | ||
base_logger = logging.getLogger() | ||
loglevel = getattr(logging, args.loglevel) | ||
|
||
formatter = logging.Formatter('%(levelname)-8s - %(name)-12s - %(message)s') | ||
|
||
stream_handler = logging.StreamHandler() | ||
stream_handler.setLevel(loglevel) | ||
stream_handler.setFormatter(formatter) | ||
|
||
base_logger.setLevel(loglevel) | ||
base_logger.addHandler(stream_handler) | ||
|
||
if args.config == 'env': | ||
config = FileSinkConfig.from_env_variables() | ||
elif args.config == 'yaml': | ||
config = FileSinkConfig.from_yaml_file(args.conf_file or ('./config.' + args.config)) | ||
elif args.config == 'json': | ||
config = FileSinkConfig.from_json_file(args.conf_file or ('./config.' + args.config)) | ||
else: | ||
print('Illegal Argument for --config!') | ||
parser.print_help() | ||
exit(1) | ||
|
||
sink = FileSink(config) | ||
sink.run() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
FROM python:3.6 | ||
|
||
RUN 'pip install pyconnect' | ||
|
||
COPY . /app/ | ||
|
||
WORKDIR /app | ||
|
||
ENV PYCONNECT_BOOTSTRAP_SERVERS = 'broker:9092' | ||
ENV PYCONNECT_SCHEMA_REGISTRY = 'schema-registry:8082' | ||
ENV PYCONNECT_TOPIC = 'testtopic-source' | ||
ENV PYCONNECT_OFFSET_TOPIC = 'testtopic-source-offset' | ||
ENV PYCONNECT_SOURCE_DIRECTORY = '/tmp/filesource/', | ||
ENV PYCONNECT_SOURCE_FILENAME = 'sourcefile.json' | ||
|
||
ENTRYPOINT ['python', 'file_source.py', '--config', 'env'] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
import json | ||
import logging | ||
import pathlib | ||
from typing import Any, Optional, TextIO, Tuple | ||
|
||
from pyconnect import PyConnectSource, SourceConfig | ||
from pyconnect.core import Status | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class FileSourceConfig(SourceConfig): | ||
""" | ||
In addition to the fields from :class:`pyconnect.config.SourceConfig` this class provides the following fields: | ||
**source_directory**: :class:`pathlib.Path` | ||
The directory where this source looks for the file it reads all messages from. | ||
**source_filename**: str | ||
The name of the file that this source reads messages from. | ||
The file should contain lines of json objects like `{'key': Any, 'value': Any}` | ||
""" | ||
|
||
__parsers = {'source_directory': lambda p: pathlib.Path(p).absolute()} | ||
|
||
def __init__(self, conf_dict): | ||
conf_dict = conf_dict.copy() | ||
self['source_directory'] = conf_dict.pop('source_directory') | ||
self['source_filename'] = conf_dict.pop('source_filename') | ||
super().__init__(conf_dict) | ||
logger.debug(f'Configuration: {self!r}') | ||
|
||
|
||
class FileSource(PyConnectSource): | ||
""" | ||
A source that reads and publishes json objects from a file. | ||
""" | ||
def __init__(self, config: FileSourceConfig): | ||
super().__init__(config) | ||
self._file: Optional[TextIO] = None | ||
|
||
def on_startup(self): | ||
source_path = self.config['source_directory'] / self.config['source_filename'] | ||
logger.info(f'Opening file "{source_path}" for reading.') | ||
self._file = open(source_path, 'r') | ||
|
||
def seek(self, index: int) -> None: | ||
logger.info(f'Seeking to position: {index!r}') | ||
self._file.seek(index) | ||
|
||
def read(self) -> Tuple[Any, Any]: | ||
line = next(self._file) | ||
logger.debug(f'Read line: {line!r}') | ||
record = json.loads(line) | ||
return record['key'], record['value'] | ||
|
||
def on_eof(self) -> Status: | ||
logger.info('EOF reached, stopping.') | ||
return Status.STOPPED | ||
|
||
def get_index(self) -> int: | ||
index = self._file.tell() | ||
logger.debug(f'File object is at position: {index!r}') | ||
return index | ||
|
||
def close(self): | ||
try: | ||
super().close() | ||
finally: | ||
logger.info('Closing file object.') | ||
self._file.close() | ||
|
||
|
||
def main(): | ||
import argparse | ||
|
||
parser = argparse.ArgumentParser() | ||
parser.add_argument('--config', choices=['env', 'yaml', 'json'], default='env', help='Defines where the config ' | ||
'is loaded from') | ||
parser.add_argument('--conf_file', default=None, help='When `conf` is yaml or json, then config is loaded' | ||
'from this file, default will be `./config.(yaml|json)` ' | ||
'depending on which kind of file you chose') | ||
parser.add_argument('--loglevel', choices=['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], | ||
help='Set log level to given value, if "NOTSET" (default) no logging is active.', | ||
default='NOTSET') | ||
|
||
args = parser.parse_args() | ||
config: FileSourceConfig = None | ||
|
||
if args.loglevel != 'NOTSET': | ||
base_logger = logging.getLogger() | ||
loglevel = getattr(logging, args.loglevel) | ||
|
||
formatter = logging.Formatter('%(levelname)-8s - %(name)-12s - %(message)s') | ||
|
||
stream_handler = logging.StreamHandler() | ||
stream_handler.setLevel(loglevel) | ||
stream_handler.setFormatter(formatter) | ||
|
||
base_logger.setLevel(loglevel) | ||
base_logger.addHandler(stream_handler) | ||
|
||
if args.config == 'env': | ||
config = FileSourceConfig.from_env_variables() | ||
elif args.config == 'yaml': | ||
config = FileSourceConfig.from_yaml_file(args.conf_file or ('./config.' + args.config)) | ||
elif args.config == 'json': | ||
config = FileSourceConfig.from_json_file(args.conf_file or ('./config.' + args.config)) | ||
else: | ||
print('Illegal Argument for --config!') | ||
parser.print_help() | ||
exit(1) | ||
|
||
source = FileSource(config) | ||
source.run() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.