From bb58d292956906ae9012166868fbbba9d679a5d3 Mon Sep 17 00:00:00 2001 From: Elijah Wilson Date: Mon, 27 Nov 2017 11:54:15 -0800 Subject: [PATCH] Franz library init --- .gitignore | 107 ++++++++++++++++ LICENSE | 21 +++ Makefile | 20 +++ README.md | 79 ++++++++++++ conf.py | 169 +++++++++++++++++++++++++ examples/example_rabbitmq_consumer.py | 9 ++ examples/example_rabbitmq_producer.py | 16 +++ franz/__init__.py | 12 ++ franz/base.py | 40 ++++++ franz/event.py | 16 +++ franz/exceptions.py | 10 ++ franz/kafka/__init__.py | 3 + franz/kafka/consumer.py | 46 +++++++ franz/kafka/producer.py | 68 ++++++++++ franz/rabbitmq/__init__.py | 7 + franz/rabbitmq/consumer.py | 106 ++++++++++++++++ franz/rabbitmq/parameters.py | 33 +++++ franz/rabbitmq/producer.py | 70 ++++++++++ index.rst | 28 ++++ profiling/__init__.py | 0 profiling/kafka/__init__.py | 0 profiling/kafka/consume_messages.py | 30 +++++ profiling/kafka/produce_messages.py | 26 ++++ profiling/rabbitmq/__init__.py | 0 profiling/rabbitmq/consume_messages.py | 54 ++++++++ profiling/rabbitmq/produce_messages.py | 42 ++++++ profiling/utils.py | 35 +++++ publish.sh | 10 ++ requirements.txt | 6 + setup.py | 43 +++++++ tests/test_consumer.py | 37 ++++++ tests/test_producer.py | 48 +++++++ 32 files changed, 1191 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 conf.py create mode 100644 examples/example_rabbitmq_consumer.py create mode 100644 examples/example_rabbitmq_producer.py create mode 100644 franz/__init__.py create mode 100644 franz/base.py create mode 100644 franz/event.py create mode 100644 franz/exceptions.py create mode 100644 franz/kafka/__init__.py create mode 100644 franz/kafka/consumer.py create mode 100644 franz/kafka/producer.py create mode 100644 franz/rabbitmq/__init__.py create mode 100644 franz/rabbitmq/consumer.py create mode 100644 franz/rabbitmq/parameters.py create mode 100644 franz/rabbitmq/producer.py create mode 100644 index.rst create mode 100644 profiling/__init__.py create mode 100644 profiling/kafka/__init__.py create mode 100644 profiling/kafka/consume_messages.py create mode 100644 profiling/kafka/produce_messages.py create mode 100644 profiling/rabbitmq/__init__.py create mode 100644 profiling/rabbitmq/consume_messages.py create mode 100644 profiling/rabbitmq/produce_messages.py create mode 100644 profiling/utils.py create mode 100755 publish.sh create mode 100644 requirements.txt create mode 100644 setup.py create mode 100644 tests/test_consumer.py create mode 100644 tests/test_producer.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8ebb541 --- /dev/null +++ b/.gitignore @@ -0,0 +1,107 @@ +# Created by .ignore support plugin (hsz.mobi) +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+.static_storage/
+.media/
+local_settings.py
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..72dbf10
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,21 @@
+MIT License
+
+Copyright (c) 2017 Carta, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..6a0006e
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line.
+SPHINXOPTS    =
+SPHINXBUILD   = sphinx-build
+SPHINXPROJ    = franz
+SOURCEDIR     = .
+BUILDDIR      = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..c5b01aa
--- /dev/null
+++ b/README.md
@@ -0,0 +1,79 @@
+# Franz
+
+A lightweight wrapper around [Kafka](https://kafka.apache.org/) and [RabbitMQ](https://www.rabbitmq.com/).
+
+# Usage
+
+## Installation
+- `pip install -e git+ssh://git@github.com/eshares/franz.git@master#egg=franz`
+  - Change `@master` to a version or commit if required.
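+  - For example, assuming a hypothetical `v0.0.1` tag exists (substitute a tag or commit that is actually published):
+    `pip install -e git+ssh://git@github.com/eshares/franz.git@v0.0.1#egg=franz`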
+ +## RabbitMQ +### Sending a message +```python +import random +import time + +import franz + + +class FranzData(franz.FranzEvent): + def serialize(self): + return {'data': time.time()} + + +with franz.RabbitProducer(exchange='topic_link') as p: + while True: + key = random.choice(['hello.world', 'hello.bob']) + p.send_message(key, FranzData()) + time.sleep(1) +``` + +### Consuming messages +```python +import franz + +def callback(ch, method, properties, body): + print('[+] {} from {}'.format(body, method.routing_key)) + +with franz.RabbitConsumer('hello.*', exchange='topic_link') as c: + c.consume_messages(callback) +``` + + +## Kafka +### Sending a message +```python +import franz +from myapp.models import SomeModel # SomeModel must inherit `franz.FranzEvent` + +instance = SomeModel.objects.get(pk=1) +producer = franz.KafkaProducer() +producer.send_message('TopicA', instance) +``` + +### Consuming messages +```python +import franz + +consumer = franz.KafkaConsumer('TopicA') +for message in consumer: + print(message.value) +``` + +### Kafka/Docker Resources + +- [Docker image](https://github.com/spotify/docker-kafka) +- [Helpful article](https://howtoprogram.xyz/2016/07/21/using-apache-kafka-docker/) +- Create topic +``` +./kafka-topics.sh --create --topic test --replication-factor 1 --partitions 1 --zookeeper 0.0.0.0:2181 +``` +- Consuming +``` +./kafka-console-consumer.sh --topic test --from-beginning --zookeeper 0.0.0.0:2181 +``` +- Producing +``` +./kafka-console-producer.sh --topic test --broker-list 0.0.0.0:9092 +``` diff --git a/conf.py b/conf.py new file mode 100644 index 0000000..c8148af --- /dev/null +++ b/conf.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# franz documentation build configuration file, created by +# sphinx-quickstart on Mon Nov 6 15:43:02 2017. +# +# This file is execfile()d with the current directory set to its +# containing dir. +# +# Note that not all possible configuration values are present in this +# autogenerated file. +# +# All configuration values have a default; values that are commented out +# serve to show the default. + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +# import os +# import sys +# sys.path.insert(0, os.path.abspath('.')) + + +# -- General configuration ------------------------------------------------ + +# If your documentation needs a minimal Sphinx version, state it here. +# +# needs_sphinx = '1.0' + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = ['sphinx.ext.autodoc', 'sphinx.ext.autosummary', 'numpydoc'] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# The suffix(es) of source filenames. +# You can specify multiple suffix as a list of string: +# +# source_suffix = ['.rst', '.md'] +source_suffix = '.rst' + +# The master toctree document. +master_doc = 'index' + +# General information about the project. +project = 'franz' +copyright = '2017, Eli Wilson' +author = 'Eli Wilson' + +# The version info for the project you're documenting, acts as replacement for +# |version| and |release|, also used in various other places throughout the +# built documents. +# +# The short X.Y version. 
+version = '0.0.1' +# The full version, including alpha/beta/rc tags. +release = '0.0.1' + +# The language for content autogenerated by Sphinx. Refer to documentation +# for a list of supported languages. +# +# This is also used if you do content translation via gettext catalogs. +# Usually you set "language" from the command line for these cases. +language = None + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This patterns also effect to html_static_path and html_extra_path +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store'] + +# The name of the Pygments (syntax highlighting) style to use. +pygments_style = 'sphinx' + +# If true, `todo` and `todoList` produce output, else they produce nothing. +todo_include_todos = False + + +# -- Options for HTML output ---------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = 'alabaster' + +# Theme options are theme-specific and customize the look and feel of a theme +# further. For a list of options available for each theme, see the +# documentation. +# +# html_theme_options = {} + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] + +# Custom sidebar templates, must be a dictionary that maps document names +# to template names. +# +# This is required for the alabaster theme +# refs: http://alabaster.readthedocs.io/en/latest/installation.html#sidebars +html_sidebars = { + '**': [ + 'relations.html', # needs 'show_related': True theme option to display + 'searchbox.html', + ] +} + + +# -- Options for HTMLHelp output ------------------------------------------ + +# Output file base name for HTML help builder. +htmlhelp_basename = 'franzdoc' + + +# -- Options for LaTeX output --------------------------------------------- + +latex_elements = { + # The paper size ('letterpaper' or 'a4paper'). + # + # 'papersize': 'letterpaper', + + # The font size ('10pt', '11pt' or '12pt'). + # + # 'pointsize': '10pt', + + # Additional stuff for the LaTeX preamble. + # + # 'preamble': '', + + # Latex figure (float) alignment + # + # 'figure_align': 'htbp', +} + +# Grouping the document tree into LaTeX files. List of tuples +# (source start file, target name, title, +# author, documentclass [howto, manual, or own class]). +latex_documents = [ + (master_doc, 'franz.tex', 'franz Documentation', + 'Eli Wilson', 'manual'), +] + + +# -- Options for manual page output --------------------------------------- + +# One entry per manual page. List of tuples +# (source start file, name, description, authors, manual section). +man_pages = [ + (master_doc, 'franz', 'franz Documentation', + [author], 1) +] + + +# -- Options for Texinfo output ------------------------------------------- + +# Grouping the document tree into Texinfo files. 
List of tuples +# (source start file, target name, title, author, +# dir menu entry, description, category) +texinfo_documents = [ + (master_doc, 'franz', 'franz Documentation', + author, 'franz', 'One line description of project.', + 'Miscellaneous'), +] + +# -- Options for Numpydoc -------------------------------------------------- +numpydoc_show_class_members = False diff --git a/examples/example_rabbitmq_consumer.py b/examples/example_rabbitmq_consumer.py new file mode 100644 index 0000000..25fa913 --- /dev/null +++ b/examples/example_rabbitmq_consumer.py @@ -0,0 +1,9 @@ +import franz + + +def callback(ch, method, properties, body): + print('[+] {} from {}'.format(body, method.routing_key)) + + +with franz.RabbitConsumer('hello.*', exchange='topic_link') as c: + c.consume_messages(callback) diff --git a/examples/example_rabbitmq_producer.py b/examples/example_rabbitmq_producer.py new file mode 100644 index 0000000..9acbd5b --- /dev/null +++ b/examples/example_rabbitmq_producer.py @@ -0,0 +1,16 @@ +import random +import time + +import franz + + +class FranzData(franz.FranzEvent): + def serialize(self): + return {'data': time.time()} + + +with franz.RabbitProducer(exchange='topic_link') as p: + while True: + key = random.choice(['hello.world', 'hello.bob']) + p.send_message(key, FranzData()) + time.sleep(1) diff --git a/franz/__init__.py b/franz/__init__.py new file mode 100644 index 0000000..7f77a53 --- /dev/null +++ b/franz/__init__.py @@ -0,0 +1,12 @@ +# flake8: noqa +__version__ = "0.0.1" + +from .event import FranzEvent +from .exceptions import InvalidMessage, SerializationError +from .kafka import KafkaConsumer, KafkaProducer +from .rabbitmq import ( + RabbitConnectionParameters, + RabbitConsumer, + RabbitConsumerType, + RabbitProducer, +) diff --git a/franz/base.py b/franz/base.py new file mode 100644 index 0000000..8291722 --- /dev/null +++ b/franz/base.py @@ -0,0 +1,40 @@ +import bson + +from franz import FranzEvent, InvalidMessage + + +class BaseProducer: + def send_message(self, topic, message): + self._check_message_is_valid_event(message) + self._send_message_to_topic(topic, message) + + def _send_message_to_topic(self, topic, message): + raise NotImplementedError( + "_send_message_to_topic should be overridden in subclasses." + ) + + @staticmethod + def serialize_message(message): + """ + Serializes an object. It must subclass :class:`FranzEvent`. + + Parameters + ---------- + message : FranzEvent + The object to be serialized. + + Returns + ------- + bytes + """ + return bson.dumps(message.serialize()) + + @staticmethod + def _check_message_is_valid_event(message): + if not isinstance(message, FranzEvent): + raise InvalidMessage( + "{} is not a valid message. " + "Must subclass `franz.FranzEvent`.".format( + type(message), + ) + ) diff --git a/franz/event.py b/franz/event.py new file mode 100644 index 0000000..133d9d3 --- /dev/null +++ b/franz/event.py @@ -0,0 +1,16 @@ +class FranzEvent: + """ + Classes that want to be sent as messages to Kafka + should inherit from this class. + """ + + def serialize(self): + """ + Converts the current python object to one that bson can serialize. + Complex objects should define their own `.serialize()` method. 
+ + Returns + ------- + dict + """ + return self.__dict__.copy() diff --git a/franz/exceptions.py b/franz/exceptions.py new file mode 100644 index 0000000..b58bed9 --- /dev/null +++ b/franz/exceptions.py @@ -0,0 +1,10 @@ +class BaseFranzException(Exception): + pass + + +class InvalidMessage(BaseFranzException): + pass + + +class SerializationError(BaseFranzException): + pass diff --git a/franz/kafka/__init__.py b/franz/kafka/__init__.py new file mode 100644 index 0000000..9eebdf2 --- /dev/null +++ b/franz/kafka/__init__.py @@ -0,0 +1,3 @@ +# flake8: noqa +from .consumer import Consumer as KafkaConsumer +from .producer import Producer as KafkaProducer diff --git a/franz/kafka/consumer.py b/franz/kafka/consumer.py new file mode 100644 index 0000000..014d429 --- /dev/null +++ b/franz/kafka/consumer.py @@ -0,0 +1,46 @@ +import bson + +from kafka import KafkaConsumer + + +class Consumer: + """ + A simple wrapper around KafkaConsumer. + + Examples + -------- + >>> from franz import KafkaConsumer + >>> consumer = KafkaConsumer('TopicA', 'Topic-*') + >>> for message in consumer: + >>> print(message.value) + """ + def __init__(self, *topics, server='localhost'): + """ + Parameters + ---------- + topics : tuple + String arguments that are exact topic names like 'MyTopic' or + pattern names like 'MyTopic-*' + """ + self.topics = topics + self.consumer = KafkaConsumer( + value_deserializer=bson.loads, + bootstrap_servers=server, + ) + self.consumer.subscribe(pattern=self._make_topic_regex()) + + def __iter__(self): + return self.consumer.__iter__() + + def _make_topic_regex(self): + """ + Makes an OR regex for all topics passed in. KafkaConsumer doesn't allow + for patterns and regular topics by default so this allows support for + both. + + Returns + ------- + str + The OR'd regex for all topics. + """ + return '|'.join("({})".format(topic) for topic in self.topics) diff --git a/franz/kafka/producer.py b/franz/kafka/producer.py new file mode 100644 index 0000000..f10b543 --- /dev/null +++ b/franz/kafka/producer.py @@ -0,0 +1,68 @@ +from kafka import KafkaProducer +from kafka.producer.future import FutureRecordMetadata + +from franz import FranzEvent, base + + +class Producer(base.BaseProducer): + """ + A simple wrapper around kafka.KafkaProducer. + + Examples + -------- + >>> from franz import KafkaProducer + >>> from myapp.models import SomeModel + >>> instance = SomeModel.objects.get(pk=1) + >>> p = KafkaProducer() + >>> p.send_message('test', instance) + """ + + def __init__(self, host='localhost', port=None): + self.host = host + self.port = port + + self.producer = KafkaProducer( + bootstrap_servers=self.connection_string, + value_serializer=Producer.serialize_message, + ) + + @property + def connection_string(self): + return '{host}{port}'.format( + host=self.host, + port=':{}'.format(self.port) if self.port else '', + ) + + def _send_message_to_topic(self, topic, message): + """ + Send a message to a Kafka topic. + + Parameters + ---------- + topic : str + The kafka topic where the message should be sent to. + message : FranzEvent + The message to be sent. + + Raises + ------ + franz.InvalidMessage + """ + message_result = self.producer.send(topic, message) + self.check_for_message_exception(message_result) + return message_result + + @classmethod + def check_for_message_exception(cls, message_result): + """ + Makes sure there isn't an error when sending the message. + Kafka will silently catch exceptions and not bubble them up. 
+ + Parameters + ---------- + message_result : FutureRecordMetadata + """ + exception = message_result.exception + + if exception: + raise exception diff --git a/franz/rabbitmq/__init__.py b/franz/rabbitmq/__init__.py new file mode 100644 index 0000000..9c1cfdc --- /dev/null +++ b/franz/rabbitmq/__init__.py @@ -0,0 +1,7 @@ +# flake8: noqa +from .consumer import ( + Consumer as RabbitConsumer, + ConsumerType as RabbitConsumerType, +) +from .parameters import RabbitConnectionParameters +from .producer import Producer as RabbitProducer diff --git a/franz/rabbitmq/consumer.py b/franz/rabbitmq/consumer.py new file mode 100644 index 0000000..c3d9026 --- /dev/null +++ b/franz/rabbitmq/consumer.py @@ -0,0 +1,106 @@ +import bson +import pika + +from .parameters import RabbitConnectionParameters + + +class ConsumerType: + # User constants + SINGLE_PROCESS = 'single' # Single consumer + MULTIPLE_FAST_PROCESSES = 'multiple-fast' # Multiple fast consumers + MULTIPLE_SLOW_PROCESSES = 'multiple-slow' # Multiple slow consumers + + # Prefetch values + _SINGLE_FAST_CONSUMER = 0 + _MULTIPLE_FAST_CONSUMERS = 20 + _MULTIPLE_SLOW_CONSUMERS = 1 + + SPEED_TO_PREFETCH = { + SINGLE_PROCESS: _SINGLE_FAST_CONSUMER, + MULTIPLE_FAST_PROCESSES: _MULTIPLE_FAST_CONSUMERS, + MULTIPLE_SLOW_PROCESSES: _MULTIPLE_SLOW_CONSUMERS, + } + + +class Consumer: + """ + A simple consumer wrapper for pika/RabbitMQ. + + Examples + -------- + >>> from franz import RabbitConsumer + >>> def callback(ch, method, properties, body): + >>> print(body) + >>> with RabbitConsumer('myapp.logs.critical') as consumer: + >>> consumer.consume_messages(callback) + """ + + def __init__(self, *topics, exchange='', queue='', parameters=None, + consumer_type=ConsumerType.SINGLE_PROCESS): + """ + Parameters + ---------- + topics : tuple + The topics to consume from. + exchange : str + The RabbitMQ exchange to consume messages from. + queue : str + The RabbitMQ queue to consume messages from. + parameters : RabbitConnectionParameters + The parameters to connect to the RabbitMQ server. + consumer_type : str + The type of consumer. A single consumer, multiple fast consumers, + multiple slow consumers. See ConsumerType constants. 
+ """ + self._topics = topics + self._exchange = exchange + self._queue_name = queue + self._parameters = parameters + + self._connection = pika.BlockingConnection(self.parameters) + self._channel = self._connection.channel() + self._channel.basic_qos( + prefetch_count=ConsumerType.SPEED_TO_PREFETCH[consumer_type], + ) + + self._channel.exchange_declare( + exchange=self._exchange, + exchange_type='topic', + durable=True, + ) + + self._channel.queue_declare( + queue=self._queue_name, + durable=True, + ) + + for topic in self._topics: + self._channel.queue_bind( + exchange=self._exchange, + queue=self._queue_name, + routing_key=topic, + ) + + @property + def parameters(self): + if self._parameters is None: + self._parameters = RabbitConnectionParameters() + return self._parameters.connection_parameters + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._channel.stop_consuming() + self._connection.close() + + def consume_messages(self, consumer_callback): + def callback_wrapper(ch, method, properties, body): + consumer_callback(ch, method, properties, bson.loads(body)) + ch.basic_ack(delivery_tag=method.delivery_tag) + + self._channel.basic_consume( + callback_wrapper, + queue=self._queue_name, + ) + self._channel.start_consuming() diff --git a/franz/rabbitmq/parameters.py b/franz/rabbitmq/parameters.py new file mode 100644 index 0000000..c37d6a9 --- /dev/null +++ b/franz/rabbitmq/parameters.py @@ -0,0 +1,33 @@ +import pika + + +class RabbitConnectionParameters: + _DEFAULT = pika.ConnectionParameters._DEFAULT + + def __init__( + self, host='localhost', port=5672, virtual_host='/', + username=None, password=None, + ): + self.host = host + self.port = port + self.virtual_host = virtual_host + self.username = username + self.password = password + + @property + def connection_parameters(self): + return pika.ConnectionParameters( + host=self.host, + port=self.port, + credentials=self.credentials, + virtual_host=self.virtual_host, + ) + + @property + def credentials(self): + if self.username and self.password: + return pika.PlainCredentials( + username=self.username, + password=self.password, + ) + return self._DEFAULT diff --git a/franz/rabbitmq/producer.py b/franz/rabbitmq/producer.py new file mode 100644 index 0000000..ae98030 --- /dev/null +++ b/franz/rabbitmq/producer.py @@ -0,0 +1,70 @@ +import pika + +from franz import base +from .parameters import RabbitConnectionParameters + + +class Producer(base.BaseProducer): + """ + A simple producer wrapper for pika/RabbitMQ. 
+ + Examples + -------- + >>> from franz import RabbitProducer + >>> from myapp.models import SomeModel + >>> instance = SomeModel.objects.get(pk=1) + >>> with RabbitProducer(exchange='logs') as p: + >>> p.send_message('myapp.logs.critical', instance) + """ + + PERSISTENT_DELIVERY_MODE = 2 + + def __init__(self, parameters=None, exchange=''): + self._parameters = parameters + self._exchange = exchange + self._connection = pika.BlockingConnection(self.parameters) + + self._channel = self._connection.channel() + self._properties = pika.BasicProperties( + delivery_mode=self.PERSISTENT_DELIVERY_MODE, + ) + + self._channel.exchange_declare( + exchange=self._exchange, + exchange_type='topic', + durable=True, + ) + + @property + def parameters(self): + if self._parameters is None: + self._parameters = RabbitConnectionParameters() + return self._parameters.connection_parameters + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._connection.close() + + def _send_message_to_topic(self, topic, message): + """ + Send a message to RabbitMQ based on the routing key (topic). + + Parameters + ---------- + topic : str + The routing key (topic) where the message should be sent to. + message : FranzEvent + The message to be sent. + + Raises + ------ + franz.InvalidMessage + """ + self._channel.basic_publish( + self._exchange, + topic, + self.serialize_message(message), + properties=self._properties, + ) diff --git a/index.rst b/index.rst new file mode 100644 index 0000000..d815767 --- /dev/null +++ b/index.rst @@ -0,0 +1,28 @@ +.. franz documentation master file, created by + sphinx-quickstart on Mon Nov 6 15:43:02 2017. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +Welcome to franz's documentation! +================================= + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + +.. autoclass:: franz.Consumer + :members: + +.. autoclass:: franz.Producer + :members: + +.. 
autoclass:: franz.FranzEvent + :members: + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/profiling/__init__.py b/profiling/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/profiling/kafka/__init__.py b/profiling/kafka/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/profiling/kafka/consume_messages.py b/profiling/kafka/consume_messages.py new file mode 100644 index 0000000..c6022ed --- /dev/null +++ b/profiling/kafka/consume_messages.py @@ -0,0 +1,30 @@ +import argparse + +import franz + +from profiling.utils import timeit + +if __name__ == '__main__': + + @timeit + def consume_messages(): + consumer = franz.KafkaConsumer('test') + parser = argparse.ArgumentParser( + description='Consumes franz events from kafka stream' + ) + parser.add_argument( + '--messages', '-m', metavar='N', + type=int, default=1000 + ) + args = parser.parse_args() + count = 0 + try: + for messages in consumer: + print('recvd') + count += 1 + if count == args.messages: + raise Exception("Stopping to wait for more messages.") + except Exception: + pass + + consume_messages() diff --git a/profiling/kafka/produce_messages.py b/profiling/kafka/produce_messages.py new file mode 100644 index 0000000..e841d42 --- /dev/null +++ b/profiling/kafka/produce_messages.py @@ -0,0 +1,26 @@ +import argparse + +import franz +from profiling.utils import timeit, Message + + +if __name__ == '__main__': + producer = franz.KafkaProducer() + parser = argparse.ArgumentParser( + description='Generates franz events into kafka stream' + ) + parser.add_argument('--size', '-s', metavar='X', type=int, default=256) + parser.add_argument('--fields', '-f', metavar='N', type=int, default=5) + parser.add_argument('--messages', '-m', metavar='M', type=int, default=1000) + + args = parser.parse_args() + + message = Message(args.size, args.fields) + + @timeit + def make_messages(): + for _ in range(args.messages): + producer.send_message('test', message) + + print('sending {:,} messages.....'.format(args.messages)) + make_messages() diff --git a/profiling/rabbitmq/__init__.py b/profiling/rabbitmq/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/profiling/rabbitmq/consume_messages.py b/profiling/rabbitmq/consume_messages.py new file mode 100644 index 0000000..848c67c --- /dev/null +++ b/profiling/rabbitmq/consume_messages.py @@ -0,0 +1,54 @@ +import argparse + +import franz + +from profiling.utils import timeit + +if __name__ == '__main__': + count = 0 + + @timeit + def consume_messages(): + parser = argparse.ArgumentParser( + description='Consumes franz events from kafka stream' + ) + parser.add_argument( + '--messages', '-m', metavar='N', + type=int, default=500 + ) + parser.add_argument('--host', metavar='H', type=str, + default='localhost') + parser.add_argument('--username', '-u', metavar='U', type=str, + default=None) + parser.add_argument('--password', '-p', metavar='P', type=str, + default=None) + parser.add_argument('--vhost', '-v', metavar='P', type=str, + default='/') + parser.add_argument('--exchange', '-e', metavar='E', type=str, + default='') + args = parser.parse_args() + + params = franz.RabbitConnectionParameters( + host=args.host, + virtual_host=args.vhost, + username=args.username, + password=args.password, + ) + + def callback(ch, method, properties, body): + print('recvd message') + + global count + if args.messages == count: + raise Exception('done') + count += 1 + + with franz.RabbitConsumer( + 'test', + 
exchange=args.exchange, + parameters=params, + queue='slack', + ) as c: + c.consume_messages(callback) + + consume_messages() diff --git a/profiling/rabbitmq/produce_messages.py b/profiling/rabbitmq/produce_messages.py new file mode 100644 index 0000000..54beda0 --- /dev/null +++ b/profiling/rabbitmq/produce_messages.py @@ -0,0 +1,42 @@ +import argparse + +import franz +from profiling.utils import timeit, Message + + +if __name__ == '__main__': + parser = argparse.ArgumentParser( + description='Generates franz events into rabbitmq stream' + ) + parser.add_argument('--size', '-s', metavar='X', type=int, default=256) + parser.add_argument('--fields', '-f', metavar='N', type=int, default=5) + parser.add_argument('--messages', '-m', metavar='M', type=int, default=1000) + + parser.add_argument('--host', metavar='H', type=str, default='localhost') + parser.add_argument('--username', '-u', metavar='U', type=str, default=None) + parser.add_argument('--password', '-p', metavar='P', type=str, default=None) + parser.add_argument('--vhost', '-v', metavar='P', type=str, default='/') + parser.add_argument('--exchange', '-e', metavar='E', type=str, default='') + + args = parser.parse_args() + + params = franz.RabbitConnectionParameters( + host=args.host, + virtual_host=args.vhost, + username=args.username, + password=args.password, + ) + + message = Message(args.size, args.fields) + + @timeit + def make_messages(): + with franz.RabbitProducer( + parameters=params, + exchange=args.exchange + ) as producer: + for _ in range(args.messages): + producer.send_message('test', message) + + print('sending {:,} messages.....'.format(args.messages)) + make_messages() diff --git a/profiling/utils.py b/profiling/utils.py new file mode 100644 index 0000000..b58dacb --- /dev/null +++ b/profiling/utils.py @@ -0,0 +1,35 @@ +from datetime import datetime + +import faker + +import franz + + +def timeit(func): + def _wrap(*args, **kwargs): + start = datetime.now() + ret = func(*args, **kwargs) + elapsed = datetime.now() - start + print("Call {}({}, {}) took {}".format( + func.__name__, args, kwargs, elapsed + )) + return ret + return _wrap + + +class Message(franz.FranzEvent): + fake = faker.Faker() + + def __init__(self, size=256, fields=1): + self.size = size or 1 + self.fields = fields or 0 + self.data = { + self.fake.first_name(): self.fake.name() + for _ in range(self.fields) + } + self.data.update({ + 'bin': self.fake.binary(length=self.size), + }) + + def serialize(self): + return self.data diff --git a/publish.sh b/publish.sh new file mode 100755 index 0000000..cd78856 --- /dev/null +++ b/publish.sh @@ -0,0 +1,10 @@ +#!/usr/bin/env bash +set -euf -o pipefail + +echo "==================== UPLOADING TO PYPITEST (https://testpypi.python.org/pypi) ====================" +python setup.py sdist upload -r pypitest + + echo "==================== UPLOADING TO PYPI (https://pypi.python.org/pypi) ============================" + python setup.py sdist upload -r pypi + +echo "~ * ~ * ~ D O N E ~ * ~ * ~" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..92923c6 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,6 @@ +kafka-python==1.3.5 +pytest==3.2.1 +bson==0.5.0 +Sphinx==1.6.5 +Faker==0.8.6 +pika==0.11.0 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..53b0351 --- /dev/null +++ b/setup.py @@ -0,0 +1,43 @@ +import os +from setuptools import setup + +from franz import __version__ + + +def read(fname): + return open(os.path.join(os.path.dirname(__file__), fname)).read() + + 
+setup(
+    name="franz",
+    version=__version__,
+    url="https://github.com/eshares/franz",
+    download_url="https://github.com/eshares/franz/tarball/{version}".format(
+        version=__version__,
+    ),
+    author="Carta, Inc.",
+    author_email="engineering@carta.com",
+    description=(
+        "Event broker built on top of Kafka and RabbitMQ; used to handle"
+        " microservice message exchange."
+    ),
+    long_description=read('README.md'),
+    license="MIT",
+    keywords="microservices broker event kafka rabbitmq",
+    install_requires=[
+        "kafka-python==1.3.5",
+        "bson==0.5.0",
+        "pika==0.11.0",
+    ],
+    packages=[
+        "franz", "franz.kafka", "franz.rabbitmq",
+    ],
+    classifiers=[
+        'Intended Audience :: Developers',
+        'Topic :: Software Development :: Libraries',
+        'License :: OSI Approved :: MIT License',
+        'Programming Language :: Python :: 3',
+        'Programming Language :: Python :: 3.3',
+        'Programming Language :: Python :: 3.4',
+    ],
+)
diff --git a/tests/test_consumer.py b/tests/test_consumer.py
new file mode 100644
index 0000000..8b1f67c
--- /dev/null
+++ b/tests/test_consumer.py
@@ -0,0 +1,37 @@
+import re
+from unittest import TestCase
+from kafka import KafkaConsumer
+
+from franz import KafkaConsumer as Consumer
+
+
+class TestConsumer(TestCase):
+
+    def setUp(self):
+        self.topics = ['ResponseEvent', 'ModelChange-Certificate']
+        self.consumer = Consumer(*self.topics)
+
+    def test_topic_subscription(self):
+        inner_consumer = self.consumer.consumer
+
+        self.assertIsNotNone(inner_consumer)
+        self.assertIsInstance(inner_consumer, KafkaConsumer)
+
+    def test_make_topic_regex(self):
+        assert self.consumer._make_topic_regex() == r'(ResponseEvent)|(ModelChange-Certificate)'
+
+    def test_complicated_topic_regex(self):
+        consumer = Consumer('ModelChange-*', 'ResponseEvent')
+        assert consumer._make_topic_regex() == r'(ModelChange-*)|(ResponseEvent)'
+
+    def test_complicated_topic_regex_matches(self):
+        consumer = Consumer('ModelChange-*')
+        pattern = re.compile(consumer._make_topic_regex())
+        assert pattern.match('ModelChange-Certificate') is not None
+        assert pattern.match('ModelChange-OptionGrant') is not None
+
+    def test_regex_matches_literals(self):
+        pattern = re.compile(self.consumer._make_topic_regex())
+        assert pattern.match('ResponseEvent') is not None
+        assert pattern.match('ModelChange-Certificate') is not None
+        assert pattern.match('ModelChange-OptionGrant') is None
diff --git a/tests/test_producer.py b/tests/test_producer.py
new file mode 100644
index 0000000..6fb5588
--- /dev/null
+++ b/tests/test_producer.py
@@ -0,0 +1,49 @@
+from unittest import TestCase
+from unittest.mock import patch
+
+from kafka import KafkaProducer
+
+from franz import FranzEvent, InvalidMessage, KafkaProducer as Producer, SerializationError
+
+
+class TestProducer(TestCase):
+
+    def setUp(self):
+        self.topic = 'test'
+        self.producer = Producer()
+
+    def test_producer_creation(self):
+        producer = self.producer.producer
+        self.assertIsInstance(producer, KafkaProducer)
+
+    def test_send_message(self):
+        class FranzBytes(FranzEvent, bytes):
+            pass
+
+        message = FranzBytes(b'testing message')
+
+        with patch.object(self.producer.producer, 'send') as mock:
+            mock.return_value.exception = None
+            self.producer.send_message(
+                self.topic,
+                message
+            )
+            mock.assert_called_with(self.topic, message)
+
+    def test_send_message_without_subclassing_franz_event_raises_exception(self):
+        with self.assertRaises(InvalidMessage):
+            self.producer.send_message(self.topic, b'hi there')
+
+    def test_send_message_with_valid_subclass_but_unserializable_value(self):
+        class Test:
+            pass
+
+        class Unserializable(FranzEvent):
+            def serialize(self):
+                return {
+                    'a': 'b',
+                    'c': Test(),
+                }
+
+        with self.assertRaises(SerializationError):
+            self.producer.send_message(self.topic, Unserializable())
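The last test expects `franz.SerializationError`, but nothing in the patch raises it: `BaseProducer.serialize_message` passes the result of `message.serialize()` straight to `bson.dumps`, so an unserializable value surfaces as whatever exception bson raises. A minimal sketch of how the wrapping could look — assuming `bson.dumps` raises on values it cannot encode; this helper is not part of the patch:

```python
import bson

from franz import SerializationError


def serialize_message(message):
    """Possible replacement body for BaseProducer.serialize_message."""
    try:
        return bson.dumps(message.serialize())
    except Exception as exc:
        # Assumption: bson.dumps raises for values it cannot encode,
        # such as the bare Test() instance used in the test above.
        # Re-raise as the library's own exception so callers only
        # need to catch franz.SerializationError.
        raise SerializationError(
            'Could not serialize {!r}: {}'.format(message, exc)
        )
```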