Skip to content

Commit

Permalink
Count down latch (#17)
Browse files Browse the repository at this point in the history
* implement CountDownLatch

* core test case

* finish core tests for latch

* lettuce and jedis countdown latch backend

* refactor try catches
  • Loading branch information
himadieievsv authored Jan 3, 2024
1 parent 58296a7 commit a8e02f6
Show file tree
Hide file tree
Showing 37 changed files with 1,499 additions and 88 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ allprojects {
apply(plugin = "idea")

group = "io.redpulsar"
version = "0.4.1"
version = "0.1.1"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
package io.redpulsar.core.locks

import io.redpulsar.core.locks.abstracts.backends.CountDownLatchBackend
import io.redpulsar.core.locks.api.CallResult
import io.redpulsar.core.locks.api.CountDownLatch
import io.redpulsar.core.locks.excecutors.executeWithRetry
import io.redpulsar.core.locks.excecutors.waitAnyJobs
import io.redpulsar.core.utils.failsafe
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import java.util.UUID
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.cancellation.CancellationException
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes

class ListeningCountDownLatch(
private val name: String,
private val count: Int,
private val backends: List<CountDownLatchBackend>,
private val maxDuration: Duration = 5.minutes,
private val retryCount: Int = 3,
private val retryDelay: Duration = 100.milliseconds,
) : CountDownLatch {
private val scope: CoroutineScope = CoroutineScope(CoroutineName("listeningCountDownLatch") + Dispatchers.IO)
private val clientId: String = UUID.randomUUID().toString()
private val keySpace = "countdownlatch"
private val channelSpace = "channels"
private val currentCounter = AtomicInteger(count)
private val minimalMaxDuration = 100.milliseconds

init {
require(backends.isNotEmpty()) { "Backend instances must not be empty" }
require(count > 0) { "Count must be positive" }
require(name.isNotBlank()) { "Name must not be blank" }
require(maxDuration > minimalMaxDuration) { "Max duration must be greater that 0.1 second" }
require(retryDelay > 0.milliseconds) { "Retry delay must be positive" }
require(retryCount > 0) { "Retry count must be positive" }
}

override fun countDown(): CallResult {
// Skip if internal counter is already 0
if (currentCounter.get() <= 0) return CallResult.SUCCESS
val result = count()
return if (result.isEmpty()) {
undoCount()
CallResult.FAILED
} else {
currentCounter.decrementAndGet()
CallResult.SUCCESS
}
}

override fun await(): CallResult {
return await(maxDuration)
}

override fun await(timeout: Duration): CallResult {
require(timeout > minimalMaxDuration) { "Timeout must be greater that [minimalMaxDuration]" }
val job =
scope.async {
withTimeout(timeout.inWholeMilliseconds) {
val globalCount = getCount(this)
if (globalCount == Int.MIN_VALUE) return@withTimeout CallResult.FAILED
// Open latch if internal counter or global one is already 0 or less
if (currentCounter.get() <= 0 || globalCount <= 0) return@withTimeout CallResult.SUCCESS
val result = listen(timeout, this)
return@withTimeout if (result.isEmpty()) {
CallResult.FAILED
} else {
CallResult.SUCCESS
}
}
}
var result: CallResult
runBlocking {
result =
try {
job.await()
} catch (e: CancellationException) {
CallResult.FAILED
}
}
return result
}

/**
* Returns the current count. It will return range from 'count' to 0 or below.
* If some instance are not available and quorum wasn't reached, returns [Int.MIN_VALUE].
*/
override fun getCount(): Int {
return getCount(scope)
}

private fun getCount(scope: CoroutineScope): Int {
val result = checkCount(scope)
return if (result.isEmpty()) {
Int.MIN_VALUE
} else {
val maxValue =
result.map { it }
.groupBy { it }
.mapValues { it.value.size }
.maxBy { it.value }.key?.toInt()
if (maxValue != null) {
count - maxValue
} else {
Int.MIN_VALUE
}
}
}

private fun count(): List<String?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.count(
latchKeyName = buildKey(name),
channelName = buildKey(channelSpace, name),
clientId = clientId,
count = currentCounter.get(),
initialCount = count,
ttl = maxDuration * 2,
)
}
}

private fun undoCount() {
backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.undoCount(
latchKeyName = buildKey(name),
clientId = clientId,
count = currentCounter.get(),
)
}
}

private fun checkCount(scope: CoroutineScope): List<Long?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = maxDuration * 2,
retryCount = retryCount,
retryDelay = retryDelay,
) { backend ->
backend.checkCount(latchKeyName = buildKey(name))
}
}

private suspend fun listen(
timeout: Duration,
scope: CoroutineScope,
): List<String?> {
return backends.executeWithRetry(
scope = scope,
releaseTime = timeout,
retryCount = retryCount,
retryDelay = retryDelay,
// Allow non-quorum polling here. That might need to be changed as it could lead to unexpected behavior
// if multiple instances goes down or encounter network issue.
waiter = ::waitAnyJobs,
) { backend ->
failsafe(null) {
val flow = backend.listen(channelName = buildKey(channelSpace, name))
flow.first()
}
}
}

@Suppress("NOTHING_TO_INLINE")
private inline fun buildKey(vararg parts: String) = keySpace + ":" + parts.joinToString(":")
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.redpulsar.core.locks

import io.redpulsar.core.locks.abstracts.AbstractMultyInstanceLock
import io.redpulsar.core.locks.abstracts.LocksBackend
import io.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlin.time.Duration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.redpulsar.core.locks

import io.redpulsar.core.locks.abstracts.AbstractMultyInstanceLock
import io.redpulsar.core.locks.abstracts.LocksBackend
import io.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlin.time.Duration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.redpulsar.core.locks

import io.redpulsar.core.locks.abstracts.AbstractLock
import io.redpulsar.core.locks.abstracts.LocksBackend
import io.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.time.Duration
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.redpulsar.core.locks.abstracts

import io.redpulsar.core.locks.abstracts.backends.LocksBackend
import io.redpulsar.core.locks.api.Lock
import mu.KotlinLogging
import java.util.UUID
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.redpulsar.core.locks.abstracts

import io.redpulsar.core.locks.abstracts.backends.LocksBackend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.redpulsar.core.locks.abstracts

/**
* Utility abstraction for backends.
*/
abstract class Backend {
protected fun convertToString(result: Any?): String? =
when (result) {
is String -> result
is Any -> result.toString()
else -> (null)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package io.redpulsar.core.locks.abstracts.backends

import io.redpulsar.core.locks.abstracts.Backend
import kotlinx.coroutines.flow.Flow
import kotlin.time.Duration

/**
* An abstraction for underlying storage for distributed count down latch.
*/
abstract class CountDownLatchBackend : Backend() {
/**
* Ensures that count is idempotent.
* e.g. calling this method with the same arguments multiple times should not total counts more than once.
* Also, this method responsible for publishing message to channel if count is reached.
* Message body that is published to channel should be "open".
*/
abstract fun count(
latchKeyName: String,
channelName: String,
clientId: String,
count: Int,
initialCount: Int,
ttl: Duration,
): String?

abstract fun undoCount(
latchKeyName: String,
clientId: String,
count: Int,
): Long?

abstract fun checkCount(latchKeyName: String): Long?

/** Receive notification about count down latch is opened now. This is supposed to be a blocking call*/
abstract fun listen(channelName: String): Flow<String>
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.redpulsar.core.locks.abstracts
package io.redpulsar.core.locks.abstracts.backends

import io.redpulsar.core.locks.abstracts.Backend
import kotlin.time.Duration

/**
* An abstraction for underlying storage for distributed locks.
*/
abstract class LocksBackend {
abstract class LocksBackend : Backend() {
abstract fun setLock(
resourceName: String,
clientId: String,
Expand Down Expand Up @@ -35,11 +36,4 @@ abstract class LocksBackend {
leasersKey: String,
leaserValidityKeyPrefix: String,
): String?

protected fun convertToString(result: Any?): String? =
when (result) {
is String -> result
is Any -> result.toString()
else -> (null)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.redpulsar.core.locks.api

import kotlin.time.Duration

interface CountDownLatch {
fun countDown(): CallResult

fun await(): CallResult

fun await(timeout: Duration): CallResult

fun getCount(): Int
}

enum class CallResult {
SUCCESS,
FAILED,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.redpulsar.core.locks.excecutors

import kotlinx.coroutines.Job
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.selects.select

suspend inline fun <T> waitAllJobs(
jobs: List<Job>,
@Suppress("UNUSED_PARAMETER") results: MutableList<T>,
) {
jobs.joinAll()
}

suspend inline fun <T> waitAnyJobs(
jobs: List<Job>,
results: MutableList<T>,
) {
select { jobs.forEach { job -> job.onJoin { } } }
jobs.forEach { job -> job.cancel() }
// enough one success result to consider latch opened
if (results.isNotEmpty()) {
repeat(jobs.size - 1) { results.add(results.first()) }
}
}
Loading

0 comments on commit a8e02f6

Please sign in to comment.