A simple Python AWS Kinesis Producer.
- Error handling and retrying with exponential backoff
- Automatic batching and flush callbacks
- Threaded execution
Inspired by the AWS blog post Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library.
You can use pip to install Kiner:
pip install kiner
To use Kiner, you'll need to have AWS authentication credentials configured as stated in the boto3 documentation.
from kiner.producer import KinesisProducer
p = KinesisProducer('stream-name', batch_size=500, max_retries=5, threads=10)
for i in range(10000):
    p.put_record(i)
p.close()
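The `max_retries` argument above relates to the retry-with-exponential-backoff feature. As a rough illustration of the general technique (not Kiner's actual implementation), a minimal sketch might look like the following. The helper name `retry_with_backoff` and the parameters `base_delay`, `max_delay`, and `sleep` are assumptions made for this example only.

```python
import random
import time


def retry_with_backoff(send, max_retries=5, base_delay=0.1, max_delay=5.0,
                       sleep=time.sleep):
    """Call send() until it succeeds or max_retries retries are used up.

    Hypothetical helper for illustration; not part of Kiner's API.
    """
    for attempt in range(max_retries + 1):
        try:
            return send()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            # Double the delay on each attempt, cap it, then add jitter
            # so that many producers do not retry in lockstep.
            delay = min(max_delay, base_delay * 2 ** attempt)
            sleep(delay * random.uniform(0.5, 1.0))
```

Passing `sleep` as a parameter is a design choice that keeps the helper easy to test without real waiting.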
To be notified when data is flushed to AWS Kinesis, provide a flush_callback:
from uuid import uuid4
from kiner.producer import KinesisProducer
def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
    print(f"""
        Flushed {count} messages at timestamp {last_flushed_at}
        Last message was {Metadata['id']} partitioned by {PartitionKey} ({len(Data)} bytes)
    """)
p = KinesisProducer('stream-name', flush_callback=on_flush)
for i in range(10000):
    p.put_record(i, metadata={'id': uuid4()}, partition_key=f"{i % 2}")
p.close()
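To make the relationship between batching and flush callbacks more concrete, here is a simplified sketch of the general pattern. It is not Kiner's internal code: the `Batcher` class is hypothetical, its callback signature is reduced to `(count, flushed_at, last_record)`, and nothing is actually sent to Kinesis.

```python
import time


class Batcher:
    """Hypothetical buffer that flushes once batch_size records accumulate."""

    def __init__(self, batch_size, flush_callback=None):
        self.batch_size = batch_size
        self.flush_callback = flush_callback
        self.buffer = []

    def put(self, record):
        self.buffer.append(record)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.buffer:
            return  # nothing to send, so no callback
        batch, self.buffer = self.buffer, []
        # A real producer would send `batch` to the stream here.
        if self.flush_callback:
            self.flush_callback(len(batch), time.time(), batch[-1])

    def close(self):
        # Flush whatever is left in the final, partially filled batch.
        self.flush()
```

With `batch_size=3` and seven records, this sketch would invoke the callback three times: twice for full batches and once on `close()` for the remaining record.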
- Logo design by @area55git