Skip to content

Commit

Permalink
v0.7.8 - improved kafka support
Browse files Browse the repository at this point in the history
  • Loading branch information
davidohana committed Sep 3, 2020
1 parent 5e117f8 commit 91ffa68
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 12 deletions.
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ Drain3 currently supports 3 persistence modes:

- **Kafka** - The snapshot is saved in a dedicated topic used only for snapshots - the last message in this topic
is the last snapshot that will be loaded after restart.
For Kafka persistence, you need to provide: `topic_name` and `server_name`.
For Kafka persistence, you need to provide: `server_list` and `topic_name`.

- **Redis** - The snapshot is saved to a key in Redis database (contributed by @matabares).

Expand All @@ -160,7 +160,7 @@ Note: If you decide to use Kafka or Redis persistence, you should install releva
explicitly, since it is declared as an extra (optional) dependency, by either:

```
pip3 install kafka
pip3 install kafka-python
```

```
Expand Down Expand Up @@ -190,7 +190,20 @@ Our project welcomes external contributions. Please refer to [CONTRIBUTING.md](C

## Change Log

* **0.7.7** - Corrected default Drain config values.
* **0.7.6** - Improvement in config file handling (Note: new sections were added instead of `DEFAULT` section)
* **0.7.5** - Made Kafka and Redis optional requirements
##### v0.7.8

* Using `kafka-python` package instead of `kafka` (newer).
* Added support for specifying additional configuration as `kwargs` in Kafka persistence handler.

##### v0.7.7

* Corrected default Drain config values.

##### v0.7.6

* Improvement in config file handling (Note: new sections were added instead of `DEFAULT` section)

##### v0.7.5

* Made Kafka and Redis optional requirements

10 changes: 6 additions & 4 deletions drain3/kafka_persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@


class KafkaPersistence(PersistenceHandler):
def __init__(self, server_list, topic, snapshot_poll_timeout_sec=60):
self.server_list = server_list
def __init__(self, bootstrap_servers, topic, snapshot_poll_timeout_sec=60, **kafka_client_options):
if bootstrap_servers:
kafka_client_options["bootstrap_servers"] = bootstrap_servers
self.topic = topic
self.producer = kafka.KafkaProducer(bootstrap_servers=server_list)
self.kafka_client_options = kafka_client_options
self.producer = kafka.KafkaProducer(**self.kafka_client_options)
self.snapshot_poll_timeout_sec = snapshot_poll_timeout_sec

def save_state(self, state):
self.producer.send(self.topic, value=state)

def load_state(self):
consumer = kafka.KafkaConsumer(bootstrap_servers=self.server_list)
consumer = kafka.KafkaConsumer(**self.kafka_client_options)
partition = kafka.TopicPartition(self.topic, 0)
consumer.assign([partition])
end_offsets = consumer.end_offsets([partition])
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
jsonpickle==1.4.1
kafka==1.3.5
kafka-python==2.0.1
redis==3.5.3
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
setup(
name='drain3',
packages=['drain3'],
version="0.7.7",
version="0.7.8",
license='MIT',
description="Persistent & streaming log template miner",
long_description=long_description,
Expand All @@ -23,7 +23,7 @@
'jsonpickle==1.4.1',
],
extras_require={
"kafka": ['kafka==1.3.5'],
"kafka": ['kafka-python==2.0.1'],
"redis": ['redis==3.5.3'],
},
classifiers=[
Expand Down

0 comments on commit 91ffa68

Please sign in to comment.