Skip to content

Commit

Permalink
Kpl fire and forget (#333)
Browse files Browse the repository at this point in the history
* add fire and forget for objects writes to kinesis, bypassing original objects downstream

* ode review and  formatting

Co-authored-by: d.semenov <[email protected]>
  • Loading branch information
semenodm and d.semenov authored May 23, 2020
1 parent b3ca628 commit a12252d
Showing 1 changed file with 30 additions and 0 deletions.
30 changes: 30 additions & 0 deletions fs2-aws/src/main/scala/fs2/aws/kinesis/publisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,16 @@ object publisher {
Stream.eval(producer.putData(streamName, partitionKey, byteArray))
}

def writeObjectAndBypass[F[_]: Sync, I](
streamName: String,
producer: KinesisProducerClient[F],
encoder: I => ByteBuffer
): Pipe[F, (String, I), (I, ListenableFuture[UserRecordResult])] =
_.evalMap {
case (partitionKey, entity) =>
producer.putData(streamName, partitionKey, encoder(entity)).map(entity -> _)
}

// Register the returned future, returning the UserRecordResult

/** Writes the (partitionKey, ByteBuffer) to a Kinesis stream via a Pipe
Expand Down Expand Up @@ -103,6 +113,26 @@ object publisher {
.map(i -> _)
}

/** Writes the (partitionKey, payload) to a Kinesis stream via a Pipe
*
* @tparam F effect type of the stream
* @tparam I type of payload
* @param streamName the name of the Kinesis stream to write to
* @param parallelism number of concurrent writes to race simultaneously
* @param producer kinesis producer client to use
* @param encoder implicit I => ByteBuffer encoder
* @return a Pipe that accepts a tuple consisting of the partition key string and an entity and returns original entity
*/
def writeAndForgetObjectToKinesis[F[_]: Concurrent, I](
streamName: String,
parallelism: Int = 10,
producer: KinesisProducerClient[F] = new KinesisProducerClientImpl[F]
)(
implicit ec: ExecutionContext,
encoder: I => ByteBuffer
): Pipe[F, (String, I), I] =
_.through(writeObjectAndBypass(streamName, producer, encoder)).map { case (evt, _) => evt }

/** Writes the bytestream to a Kinesis stream via a Sink
*
* @tparam F effect type of the stream
Expand Down

0 comments on commit a12252d

Please sign in to comment.