From a12252dc394921c428cab3d47d0b5f5d665d74a5 Mon Sep 17 00:00:00 2001 From: semenodm Date: Sat, 23 May 2020 18:23:26 -0400 Subject: [PATCH] Kpl fire and forget (#333) * add fire and forget for objects writes to kinesis, bypassing original objects downstream * ode review and formatting Co-authored-by: d.semenov --- .../scala/fs2/aws/kinesis/publisher.scala | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/fs2-aws/src/main/scala/fs2/aws/kinesis/publisher.scala b/fs2-aws/src/main/scala/fs2/aws/kinesis/publisher.scala index f3c573eb..b7ee037b 100644 --- a/fs2-aws/src/main/scala/fs2/aws/kinesis/publisher.scala +++ b/fs2-aws/src/main/scala/fs2/aws/kinesis/publisher.scala @@ -25,6 +25,16 @@ object publisher { Stream.eval(producer.putData(streamName, partitionKey, byteArray)) } + def writeObjectAndBypass[F[_]: Sync, I]( + streamName: String, + producer: KinesisProducerClient[F], + encoder: I => ByteBuffer + ): Pipe[F, (String, I), (I, ListenableFuture[UserRecordResult])] = + _.evalMap { + case (partitionKey, entity) => + producer.putData(streamName, partitionKey, encoder(entity)).map(entity -> _) + } + // Register the returned future, returning the UserRecordResult /** Writes the (partitionKey, ByteBuffer) to a Kinesis stream via a Pipe @@ -103,6 +113,26 @@ object publisher { .map(i -> _) } + /** Writes the (partitionKey, payload) to a Kinesis stream via a Pipe + * + * @tparam F effect type of the stream + * @tparam I type of payload + * @param streamName the name of the Kinesis stream to write to + * @param parallelism number of concurrent writes to race simultaneously + * @param producer kinesis producer client to use + * @param encoder implicit I => ByteBuffer encoder + * @return a Pipe that accepts a tuple consisting of the partition key string and an entity and returns original entity + */ + def writeAndForgetObjectToKinesis[F[_]: Concurrent, I]( + streamName: String, + parallelism: Int = 10, + producer: KinesisProducerClient[F] = new KinesisProducerClientImpl[F] + )( + implicit ec: ExecutionContext, + encoder: I => ByteBuffer + ): Pipe[F, (String, I), I] = + _.through(writeObjectAndBypass(streamName, producer, encoder)).map { case (evt, _) => evt } + /** Writes the bytestream to a Kinesis stream via a Sink * * @tparam F effect type of the stream