Skip to content

Commit

Permalink
update docks (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
himadieievsv authored Jan 13, 2024
1 parent a48beda commit 011f3a7
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ abstract class AbstractLock : Lock {

/**
* Locks the resource on the given Redis instance.
* @param backend [LocksBackend] the Redis instance to lock the resource on.
* @param resourceName [String] the name of the resource for which lock key was created.
* @param ttl [Duration] the time to live of the lock.
*/
protected open fun lockInstance(
backend: LocksBackend,
Expand All @@ -26,6 +29,8 @@ abstract class AbstractLock : Lock {

/**
* Unlocks the resource on the given Redis instance.
* @param backend [LocksBackend] the Redis instance to unlock the resource on.
* @param resourceName [String] the name of the resource for which lock key was created.
*/
protected open fun unlockInstance(
backend: LocksBackend,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ abstract class CountDownLatchBackend : Backend() {
* e.g. calling this method with the same arguments multiple times should not total counts more than once.
* Also, this method responsible for publishing message to channel if count is reached.
* Message body that is published to channel should be "open".
* @param latchKeyName [String] the name of the latch.
* @param channelName [String] the name of the channel.
* @param clientId [String] the id of the client.
* @param count [Int] the number of current counter state.
* @param initialCount [Int] the number of times [countDown] must be invoked before threads can pass through [await].
* @param ttl [Duration] the maximum time to wait.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun count(
latchKeyName: String,
Expand All @@ -22,14 +29,30 @@ abstract class CountDownLatchBackend : Backend() {
ttl: Duration,
): String?

/**
* Undo count() operation.
* @param latchKeyName [String] the name of the latch.
* @param clientId [String] the id of the client.
* @param count [Int] the number of current counter state.
* @return [Long] the result of operation. If result is null, then operation failed.
*/
abstract fun undoCount(
latchKeyName: String,
clientId: String,
count: Int,
): Long?

/**
* Check current count of the latch.
* @param latchKeyName [String] the name of the latch.
* @return [Long] the result of operation. If result is null, then operation failed.
*/
abstract fun checkCount(latchKeyName: String): Long?

/** Receive notification about count down latch is opened now. */
/**
* Receive notification about count down latch is opened now.
* @param channelName [String] the name of the channel.
* @return [String] the message body. If result is null, then operation failed.
*/
abstract suspend fun listen(channelName: String): String?
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,39 @@ import java.time.Duration
* An abstraction for underlying storage for distributed locks.
*/
abstract class LocksBackend : Backend() {
/**
* Set lock on the resource with given name.
* @param resourceName [String] the name of the resource.
* @param clientId [String] the id of the client.
* @param ttl [Duration] the maximum time to wait.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun setLock(
resourceName: String,
clientId: String,
ttl: Duration,
): String?

/**
* Remove lock on the resource with given name.
* @param resourceName [String] the name of the resource.
* @param clientId [String] the id of the client.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun removeLock(
resourceName: String,
clientId: String,
): String?

/**
* Set semaphore lock on the resource with given name.
* @param leasersKey [String] the name of the leasers key.
* @param leaserValidityKey [String] the name of the leaser validity key.
* @param clientId [String] the id of the client.
* @param maxLeases [Int] the maximum number of leases.
* @param ttl [Duration] the maximum time to wait.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun setSemaphoreLock(
leasersKey: String,
leaserValidityKey: String,
Expand All @@ -26,12 +48,25 @@ abstract class LocksBackend : Backend() {
ttl: Duration,
): String?

/**
* Remove semaphore lock on the resource with given name.
* @param leasersKey [String] the name of the leasers key.
* @param leaserValidityKey [String] the name of the leaser validity key.
* @param clientId [String] the id of the client.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun removeSemaphoreLock(
leasersKey: String,
leaserValidityKey: String,
clientId: String,
): String?

/**
* Clean up expired locks.
* @param leasersKey [String] the name of the leasers key.
* @param leaserValidityKeyPrefix [String] the name of the leaser validity key prefix.
* @return [String] the result of operation. If result is null, then operation failed.
*/
abstract fun cleanUpExpiredSemaphoreLocks(
leasersKey: String,
leaserValidityKeyPrefix: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,22 @@ import kotlin.system.measureTimeMillis
* [waitAllJobs] and [waitAnyJobs].
* Also, it checks whether the result is successful on majority (depends on waiting strategy) of instances and time
* spend for getting results is not exceeding some reasonable time difference using [timeout] and
* clok drift.
* clock drift.
* It returns list of results from each instance or empty list if either time validity wasn't met or operation was
* failing on majority of instances.
*
* Coroutine used by callee must be cooperative coroutine (not blocking).
* In order to cancel jobs forcefully, use [withTimeoutInThread] instead.
*
* @param backends [List] of [Backend] instances.
* @param scope [CoroutineScope] the scope to run coroutine in.
* @param timeout [Duration] the maximum time to wait.
* @param defaultDrift [Duration] the default clock drift.
* @param cleanUp [Function] the function to clean up resources on each backend.
* @param waiter [Function] the function to wait for results.
* @param callee [Function] the function to call on each backend.
*/
inline fun <T : Backend, R> multyInstanceExecute(
inline fun <T : Backend, R> multiInstanceExecute(
backends: List<T>,
scope: CoroutineScope,
timeout: Duration,
Expand Down Expand Up @@ -75,7 +83,7 @@ inline fun <T : Backend, R> multyInstanceExecuteWithRetry(
crossinline callee: suspend (backend: T) -> R,
): List<R> {
return withRetry(retryCount = retryCount, retryDelay = retryDelay) {
return@withRetry multyInstanceExecute(
return@withRetry multiInstanceExecute(
backends = backends,
scope = scope,
timeout = timeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class MultyInstanceExecutorTest {
backends.forEach { backend -> every { backend.test() } returns "OK" }

val result =
multyInstanceExecute(
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
Expand All @@ -58,7 +58,7 @@ class MultyInstanceExecutorTest {
}

val result =
multyInstanceExecute(
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
Expand All @@ -82,7 +82,7 @@ class MultyInstanceExecutorTest {
}

val result =
multyInstanceExecute(
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
Expand All @@ -102,7 +102,7 @@ class MultyInstanceExecutorTest {
backends.forEach { backend -> every { backend.test() } returns "OK" }

val result =
multyInstanceExecute(
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
Expand All @@ -121,7 +121,7 @@ class MultyInstanceExecutorTest {
every { backends[Random.nextInt(0, number)].test() } returns "OK"

val result =
multyInstanceExecute(
multiInstanceExecute(
backends = backends,
scope = scope,
timeout = Duration.ofSeconds(1),
Expand Down

0 comments on commit 011f3a7

Please sign in to comment.