diff --git a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt index 2d8708c..477691d 100644 --- a/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt +++ b/sample/src/main/java/com/trendyol/transmission/components/features/input/InputTransformer.kt @@ -1,7 +1,11 @@ package com.trendyol.transmission.components.features.input +import androidx.compose.ui.graphics.Color import com.trendyol.transmission.DefaultDispatcher +import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.components.features.InputUiState import com.trendyol.transmission.components.features.colorpicker.ColorPickerEffect +import com.trendyol.transmission.components.features.multioutput.multiOutputTransformerIdentity import com.trendyol.transmission.transformer.Transformer import com.trendyol.transmission.transformer.dataholder.dataHolder import com.trendyol.transmission.transformer.handler.Handlers @@ -9,13 +13,13 @@ import com.trendyol.transmission.transformer.handler.createHandlers import com.trendyol.transmission.transformer.handler.onEffect import com.trendyol.transmission.transformer.handler.onSignal import com.trendyol.transmission.transformer.request.Contracts +import com.trendyol.transmission.transformer.request.checkpointWithArgs import com.trendyol.transmission.transformer.request.computation import com.trendyol.transmission.transformer.request.computation.Computations import com.trendyol.transmission.transformer.request.computation.createComputations import com.trendyol.transmission.transformer.request.computation.register import com.trendyol.transmission.transformer.request.computationWithArgs import com.trendyol.transmission.transformer.request.dataHolder -import com.trendyol.transmission.components.features.InputUiState import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.delay import javax.inject.Inject @@ -37,20 +41,29 @@ class InputTransformer @Inject constructor( } } + @OptIn(ExperimentalTransmissionApi::class) override val handlers: Handlers = createHandlers { onSignal { signal -> holder.update { it.copy(writtenText = signal.value) } + val color = pauseOn(colorCheckpoint) + send( + effect = ColorPickerEffect.SelectedColorUpdate(color), + identity = multiOutputTransformerIdentity + ) publish(effect = InputEffect.InputUpdate(signal.value)) } onEffect { effect -> + validate(colorCheckpoint, effect.color) holder.update { it.copy(backgroundColor = effect.color) } } } + @OptIn(ExperimentalTransmissionApi::class) companion object { val writtenInputWithArgs = Contracts.computationWithArgs() val writtenInputContract = Contracts.computation() val holderContract = Contracts.dataHolder() + val colorCheckpoint = Contracts.checkpointWithArgs() } } diff --git a/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt b/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt index 3fbd639..b01f656 100644 --- a/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt +++ b/transmission-test/src/main/java/com/trendyol/transmissiontest/TransformerTestScope.kt @@ -1,7 +1,6 @@ package com.trendyol.transmissiontest import com.trendyol.transmission.Transmission -import com.trendyol.transmission.effect.EffectWrapper interface TransformerTestScope { val dataStream: List diff --git a/transmission/src/main/java/com/trendyol/transmission/Annotations.kt b/transmission/src/main/java/com/trendyol/transmission/Annotations.kt new file mode 100644 index 0000000..e9812b3 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/Annotations.kt @@ -0,0 +1,16 @@ +package com.trendyol.transmission + +/** + * Marks internal declarations in Transmission. Internal declarations must not be used outside the library. + * There are no backward compatibility guarantees between different versions of Transmission. + */ +@RequiresOptIn(level = RequiresOptIn.Level.ERROR) +@Retention(AnnotationRetention.BINARY) +annotation class InternalTransmissionApi + +/** + * Marks experimental API in Transmission. An experimental API can be changed or removed at any time. + */ +@RequiresOptIn(level = RequiresOptIn.Level.WARNING) +@Retention(AnnotationRetention.BINARY) +annotation class ExperimentalTransmissionApi diff --git a/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt b/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt index dae229f..6994ed3 100644 --- a/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt +++ b/transmission/src/main/java/com/trendyol/transmission/effect/EffectWrapper.kt @@ -3,7 +3,7 @@ package com.trendyol.transmission.effect import com.trendyol.transmission.Transmission import com.trendyol.transmission.transformer.request.Contract -data class EffectWrapper( +internal data class EffectWrapper( val effect: Transmission.Effect, val identity: Contract.Identity? = null ) diff --git a/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt b/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt index c6416b4..772cb20 100644 --- a/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt +++ b/transmission/src/main/java/com/trendyol/transmission/identifier/IdentifierGenerator.kt @@ -3,9 +3,12 @@ package com.trendyol.transmission.identifier import kotlin.uuid.ExperimentalUuidApi import kotlin.uuid.Uuid +/** + * Generates identifier for internal communication. + */ @OptIn(ExperimentalUuidApi::class) internal object IdentifierGenerator { fun generateIdentifier(): String { return Uuid.random().toString() } -} \ No newline at end of file +} diff --git a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt index 2864e46..161e5ab 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/RequestDelegate.kt @@ -1,5 +1,3 @@ -@file:OptIn(ExperimentalUuidApi::class) - package com.trendyol.transmission.router import com.trendyol.transmission.Transmission @@ -20,7 +18,6 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.launch -import kotlin.uuid.ExperimentalUuidApi internal class RequestDelegate( private val queryScope: CoroutineScope, @@ -49,7 +46,7 @@ internal class RequestDelegate( // region process queries - private suspend fun processQuery(query: Query) = queryScope.launch { + private fun processQuery(query: Query) = queryScope.launch { when (query) { is Query.Computation -> processComputationQuery(query) is Query.Data -> processDataQuery(query) @@ -60,7 +57,7 @@ internal class RequestDelegate( } private fun processDataQuery( - query: Query.Data + query: Query.Data, ) = queryScope.launch { val dataHolder = routerRef.transformerSet .filter { it.storage.isHolderStateInitialized() } @@ -79,7 +76,7 @@ internal class RequestDelegate( } private fun processComputationQuery( - query: Query.Computation + query: Query.Computation, ) = queryScope.launch { val computationHolder = routerRef.transformerSet .find { it.storage.hasComputation(query.key) } @@ -110,7 +107,7 @@ internal class RequestDelegate( } private fun processComputationQueryWithArgs( - query: Query.ComputationWithArgs + query: Query.ComputationWithArgs, ) = queryScope.launch { val computationHolder = routerRef.transformerSet .find { it.storage.hasComputation(query.key) } @@ -145,7 +142,7 @@ internal class RequestDelegate( } private fun processExecution( - query: Query.Execution + query: Query.Execution, ) = queryScope.launch { val executionHolder = routerRef.transformerSet .find { it.storage.hasExecution(query.key) } ?: return@launch @@ -180,7 +177,7 @@ internal class RequestDelegate( } private fun testDataQuery( - query: Query.Data + query: Query.Data, ) = queryScope.launch { val dataToSend = QueryResult.Data( owner = query.sender, @@ -194,7 +191,7 @@ internal class RequestDelegate( } private fun testComputationQuery( - query: Query.Computation + query: Query.Computation, ) = queryScope.launch { val computationToSend = QueryResult.Computation( owner = query.sender, @@ -206,7 +203,7 @@ internal class RequestDelegate( } private fun testComputationQueryWithArgs( - query: Query.ComputationWithArgs + query: Query.ComputationWithArgs, ) = queryScope.launch { val computationToSend = QueryResult.Computation( owner = query.sender, @@ -236,7 +233,7 @@ internal class RequestDelegate( override suspend fun , D : Any> compute( contract: C, - invalidate: Boolean + invalidate: Boolean, ): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() outGoingQuery.send( @@ -256,7 +253,7 @@ internal class RequestDelegate( override suspend fun , A : Any, D : Any> compute( contract: C, args: A, - invalidate: Boolean + invalidate: Boolean, ): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() outGoingQuery.send( @@ -284,7 +281,7 @@ internal class RequestDelegate( override suspend fun , A : Any> execute( contract: C, - args: A + args: A, ) { outGoingQuery.send( Query.ExecutionWithArgs( diff --git a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt index a0a22ee..3b824c5 100644 --- a/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt +++ b/transmission/src/main/java/com/trendyol/transmission/router/TransmissionRouter.kt @@ -5,6 +5,7 @@ import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.router.builder.TransmissionRouterBuilderScope import com.trendyol.transmission.router.loader.TransformerSetLoader import com.trendyol.transmission.transformer.Transformer +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.RequestHandler import kotlinx.coroutines.CoroutineDispatcher @@ -42,6 +43,8 @@ class TransmissionRouter internal constructor( private val dataBroadcast = routerScope.createBroadcast() private val effectBroadcast = routerScope.createBroadcast() + private val checkpointTracker = CheckpointTracker() + val dataStream = dataBroadcast.output val effectStream: SharedFlow = effectBroadcast.output.map { it.effect } .shareIn(routerScope, SharingStarted.WhileSubscribed()) @@ -51,6 +54,7 @@ class TransmissionRouter internal constructor( routerRef = this@TransmissionRouter, registry = registryScope ) + val requestHelper: RequestHandler = _requestDelegate init { @@ -97,6 +101,7 @@ class TransmissionRouter internal constructor( } transformerSet.forEach { transformer -> transformer.run { + bindCheckpointTracker(checkpointTracker) startSignalCollection(incoming = signalBroadcast.output) startDataPublishing(data = dataBroadcast.producer) startEffectProcessing( diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt index 5a15700..db6c627 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/CommunicationScopeBuilder.kt @@ -1,5 +1,6 @@ package com.trendyol.transmission.transformer +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.transformer.handler.CommunicationScope @@ -7,6 +8,7 @@ import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.TransformerRequestDelegate import kotlinx.coroutines.channels.Channel +@OptIn(ExperimentalTransmissionApi::class) internal class CommunicationScopeBuilder( private val effectChannel: Channel, private val dataChannel: Channel, @@ -29,14 +31,14 @@ internal class CommunicationScopeBuilder( } override suspend fun , D : Transmission.Data> getData(contract: C): D? { - return requestDelegate.interactor.getData(contract) + return requestDelegate.requestHandler.getData(contract) } override suspend fun , D : Any> compute( contract: C, invalidate: Boolean ): D? { - return requestDelegate.interactor.compute(contract, invalidate) + return requestDelegate.requestHandler.compute(contract, invalidate) } override suspend fun , A : Any, D : Any> compute( @@ -44,17 +46,55 @@ internal class CommunicationScopeBuilder( args: A, invalidate: Boolean ): D? { - return requestDelegate.interactor.compute(contract, args, invalidate) + return requestDelegate.requestHandler.compute(contract, args, invalidate) } override suspend fun execute(contract: Contract.Execution) { - requestDelegate.interactor.execute(contract) + requestDelegate.requestHandler.execute(contract) } override suspend fun , A : Any> execute( contract: C, args: A ) { - requestDelegate.interactor.execute(contract, args) + requestDelegate.requestHandler.execute(contract, args) + } + + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn( + contract: Contract.Checkpoint.Default + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(contract) + } + } + + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint.Default + ) { + with(requestDelegate.checkpointHandler) { + pauseOn(*contract) + } + } + + @ExperimentalTransmissionApi + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C + ): A { + return with(requestDelegate.checkpointHandler) { + pauseOn(contract) + } + } + + override suspend fun validate(contract: Contract.Checkpoint.Default) { + requestDelegate.checkpointHandler.validate(contract) + } + + override suspend fun , A : Any> validate( + contract: C, + args: A + ) { + requestDelegate.checkpointHandler.validate(contract, args) } } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt index 714374a..7aecaea 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/Transformer.kt @@ -1,8 +1,10 @@ package com.trendyol.transmission.transformer +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.effect.EffectWrapper import com.trendyol.transmission.effect.RouterEffect +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker import com.trendyol.transmission.transformer.handler.CommunicationScope import com.trendyol.transmission.transformer.handler.ExtendedHandlers import com.trendyol.transmission.transformer.handler.Handlers @@ -36,6 +38,7 @@ import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope +@OptIn(ExperimentalTransmissionApi::class) open class Transformer( identity: Contract.Identity = Contracts.identity(), dispatcher: CoroutineDispatcher = Dispatchers.Default, @@ -51,7 +54,14 @@ open class Transformer( private val _identity: Contract.Identity = identity private val effectChannel: Channel = Channel(capacity = Channel.BUFFERED) - private val requestDelegate = TransformerRequestDelegate(transformerScope, _identity) + private var checkpointProvider: () -> CheckpointTracker? = { null } + private val requestDelegate by lazy { + TransformerRequestDelegate( + scope = transformerScope, + checkpointTrackerProvider = checkpointProvider, + identity = _identity + ) + } internal val dataChannel: Channel = Channel(capacity = Channel.BUFFERED) internal val storage = TransformerStorage() @@ -73,11 +83,18 @@ open class Transformer( var currentEffectProcessing: Job? = null var currentSignalProcessing: Job? = null - val communicationScope: CommunicationScope = CommunicationScopeBuilder( - effectChannel = effectChannel, - dataChannel = dataChannel, - requestDelegate = requestDelegate - ) + val communicationScope: CommunicationScope by lazy { + CommunicationScopeBuilder( + effectChannel = effectChannel, + dataChannel = dataChannel, + requestDelegate = requestDelegate + ) + } + + + internal fun bindCheckpointTracker(tracker: CheckpointTracker) { + checkpointProvider = { tracker } + } internal fun startSignalCollection(incoming: SharedFlow) { transformerScope.launch { diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt new file mode 100644 index 0000000..f349db9 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointHandler.kt @@ -0,0 +1,54 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.transformer.handler.CommunicationScope +import com.trendyol.transmission.transformer.request.Contract + +interface CheckpointHandler { + + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.Checkpoint]. + * @param contract Checkpoint to check for pause condition + * @return Unit after the [contract] is validated + */ + @ExperimentalTransmissionApi + suspend fun CommunicationScope.pauseOn( + contract: Contract.Checkpoint.Default, + ) + + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.Checkpoint]s. + * @param contract Checkpoints to check for pause condition. + * @return Unit after the [contract]s are validated + * @throws IllegalStateException when no [Contract.Checkpoint] is supplied + */ + @ExperimentalTransmissionApi + suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint.Default, + ) + + /** + * Pauses the processing of a [Transmission.Signal] or [Transmission.Effect] using + * the given [Contract.CheckpointWithArgs]s. + * @param contract Checkpoint with Args to check for pause condition. + * @return the type of argument depicted in [contract] after the checkpoint is validated + */ + @ExperimentalTransmissionApi + suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C, + ): A + + /** + * Validates the given [Contract.Checkpoint] and resumes the execution added with [pauseOn] + */ + @ExperimentalTransmissionApi + suspend fun validate(contract: Contract.Checkpoint.Default) + + /** + * Validates the given [Contract.CheckpointWithArgs] and resumes the execution added with [pauseOn] + */ + @ExperimentalTransmissionApi + suspend fun , A : Any> validate(contract: C, args: A) +} diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt new file mode 100644 index 0000000..f542158 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointTracker.kt @@ -0,0 +1,43 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.InternalTransmissionApi +import com.trendyol.transmission.transformer.request.Contract +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentMap + +@OptIn(InternalTransmissionApi::class) +internal class CheckpointTracker { + private val tracker: ConcurrentMap>> = + ConcurrentHashMap() + private val contractTracker: ConcurrentMap = ConcurrentHashMap() + + fun registerContract(contract: Contract.Checkpoint, identifier: String) { + contractTracker.put(contract, identifier) + } + + fun putOrCreate( + identifier: String, + validator: CheckpointValidator + ) { + tracker + .putIfAbsent(identifier, ArrayDeque>().apply { + addLast(validator) + })?.addLast(validator) + } + + fun useValidator(contract: Contract.Checkpoint): CheckpointValidator? { + val identifier = contractTracker[contract] ?: return null + return tracker[identifier]?.firstOrNull() as? CheckpointValidator + } + + fun removeValidator(contract: Contract.Checkpoint) { + val identifier = contractTracker[contract] ?: return + val contractsToRemove = contractTracker.entries + .filter { it.value == identifier } + .map { it.key } + contractsToRemove.forEach { + contractTracker.remove(it) + } + tracker[identifier]?.removeFirstOrNull() + } +} diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt new file mode 100644 index 0000000..ccf5d23 --- /dev/null +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/checkpoint/CheckpointValidator.kt @@ -0,0 +1,9 @@ +package com.trendyol.transmission.transformer.checkpoint + +import com.trendyol.transmission.InternalTransmissionApi +import com.trendyol.transmission.transformer.request.Contract + +@InternalTransmissionApi +interface CheckpointValidator { + suspend fun validate(contract: C, args: A): Boolean +} \ No newline at end of file diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt index 965563b..f852813 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/dataholder/TransmissionDataHolder.kt @@ -16,7 +16,6 @@ interface TransmissionDataHolder { suspend fun updateAndGet(updater: (T) -> @UnsafeVariance T): T } -@PublishedApi internal class TransmissionDataHolderImpl( initialValue: T, publishUpdates: Boolean, diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt index 9f9971e..53de3be 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/handler/CommunicationScope.kt @@ -3,10 +3,11 @@ package com.trendyol.transmission.transformer.handler import com.trendyol.transmission.Transmission import com.trendyol.transmission.router.TransmissionRouter import com.trendyol.transmission.transformer.Transformer +import com.trendyol.transmission.transformer.checkpoint.CheckpointHandler import com.trendyol.transmission.transformer.request.Contract import com.trendyol.transmission.transformer.request.RequestHandler -interface CommunicationScope : RequestHandler { +interface CommunicationScope : RequestHandler, CheckpointHandler { /** * Sends data to [TransmissionRouter] * @param data of type [Transmission.Data] diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt index 4a20b4f..bbddd8b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Contract.kt @@ -20,9 +20,22 @@ sealed interface Contract { internal val useCache: Boolean = false ) : Contract - class Execution internal constructor(internal val key: String) : Contract + @JvmInline + value class Execution internal constructor(internal val key: String) : Contract class ExecutionWithArgs internal constructor( internal val key: String ) : Contract + + sealed class Checkpoint( + internal open val key: String, + ) : Contract { + class Default internal constructor( + internal override val key: String, + ) : Checkpoint(key) + + class WithArgs internal constructor( + internal override val key: String, + ) : Checkpoint(key) + } } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt index 4b9c02e..594a09c 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/ContractExt.kt @@ -1,5 +1,6 @@ package com.trendyol.transmission.transformer.request +import com.trendyol.transmission.ExperimentalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator @@ -38,3 +39,17 @@ fun Contracts.execution(): Contract.Execution { fun Contracts.executionWithArgs(): Contract.ExecutionWithArgs { return Contract.ExecutionWithArgs(key = IdentifierGenerator.generateIdentifier()) } + +@ExperimentalTransmissionApi +fun Contracts.checkpoint(): Contract.Checkpoint.Default { + return Contract.Checkpoint.Default( + key = IdentifierGenerator.generateIdentifier(), + ) +} + +@ExperimentalTransmissionApi +fun Contracts.checkpointWithArgs(): Contract.Checkpoint.WithArgs { + return Contract.Checkpoint.WithArgs( + key = IdentifierGenerator.generateIdentifier(), + ) +} diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt index 14818a0..2d2a8b5 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/Query.kt @@ -31,4 +31,5 @@ internal sealed interface Query { val key: String, val args: A, ) : Query + } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt index 579ebbe..aa6a85b 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/QueryResult.kt @@ -2,7 +2,7 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission -sealed class QueryResult( +internal sealed class QueryResult( open val owner: String, open val key: String, ) { @@ -19,4 +19,12 @@ sealed class QueryResult( val data: D?, val resultIdentifier: String, ) : QueryResult(owner, key) + + class Checkpoint( + override val owner: String, + override val key: String, + val data: D, + val resultIdentifier: String, + ) : QueryResult(owner, key) + } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt index c0d4ce9..c608bff 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/RequestHandler.kt @@ -1,23 +1,50 @@ package com.trendyol.transmission.transformer.request import com.trendyol.transmission.Transmission +import com.trendyol.transmission.transformer.Transformer interface RequestHandler { + /** + * Gets the data using the provided [Contract.DataHolder] + * @param contract DataHolder Contract to be sent + */ suspend fun , D : Transmission.Data> getData(contract: C): D? + /** + * Starts computation in the target [Transformer] and returns the result data. + * @param contract Computation Contract to be sent + * @param invalidate if the Computation is cached, this invalidates the result make it compute + * again. If it is not cached, it doesn't have any effect. + */ suspend fun , D : Any> compute( contract: C, invalidate: Boolean = false, ): D? + /** + * Starts computation in the target [Transformer] and returns the result data. + * @param contract Computation Contract to be sent + * @param args data required by Computation + * @param invalidate if the Computation is cached, this invalidates the result make it compute + * again. If it is not cached, it doesn't have any effect. + */ suspend fun , A : Any, D : Any> compute( contract: C, args: A, invalidate: Boolean = false, ): D? + /** + * Starts Execution in the target [Transformer] + * @param contract Execution Contract to be sent + */ suspend fun execute(contract: Contract.Execution) + /** + * Starts Execution in the target [Transformer] + * @param contract Execution Contract to be sent + * @param args data required by Execution + */ suspend fun , A : Any> execute(contract: C, args: A) } diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt index 3ce398b..d50924f 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/TransformerRequestDelegate.kt @@ -1,20 +1,142 @@ package com.trendyol.transmission.transformer.request +import com.trendyol.transmission.ExperimentalTransmissionApi +import com.trendyol.transmission.InternalTransmissionApi import com.trendyol.transmission.Transmission import com.trendyol.transmission.identifier.IdentifierGenerator import com.trendyol.transmission.router.createBroadcast +import com.trendyol.transmission.transformer.checkpoint.CheckpointHandler +import com.trendyol.transmission.transformer.checkpoint.CheckpointTracker +import com.trendyol.transmission.transformer.checkpoint.CheckpointValidator +import com.trendyol.transmission.transformer.handler.CommunicationScope import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.first +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.resume -internal class TransformerRequestDelegate(scope: CoroutineScope, identity: Contract.Identity) { +@OptIn(InternalTransmissionApi::class) +@ExperimentalTransmissionApi +internal class TransformerRequestDelegate( + scope: CoroutineScope, + checkpointTrackerProvider: () -> CheckpointTracker?, + identity: Contract.Identity +) { val outGoingQuery: Channel = Channel(capacity = Channel.BUFFERED) val resultBroadcast = scope.createBroadcast() - val interactor: RequestHandler = object : RequestHandler { + val checkpointHandler: CheckpointHandler by lazy { + object : CheckpointHandler { + + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn(contract: Contract.Checkpoint.Default) { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + suspendCancellableCoroutine { continuation -> + val validator = + object : CheckpointValidator { + + override suspend fun validate( + contract: Contract.Checkpoint.Default, + args: Unit + ): Boolean { + continuation.resume(Unit) + return true + } + } + checkpointTrackerProvider()?.run { + registerContract(contract, queryIdentifier) + putOrCreate(queryIdentifier, validator) + } + } + } + + @ExperimentalTransmissionApi + override suspend fun CommunicationScope.pauseOn( + vararg contract: Contract.Checkpoint.Default + ) { + val contractList = contract.toList() + check(contractList.isNotEmpty()) { + "At least one checkpoint should be provided" + } + check(contractList.toSet().size == contractList.size) { + "All Checkpoint Contracts should be unique" + } + val queryIdentifier = IdentifierGenerator.generateIdentifier() + suspendCancellableCoroutine { continuation -> + val validator = + object : CheckpointValidator { + private val lock = Mutex() + private val contractMap = + ConcurrentHashMap() + .apply { putAll(contractList.map { it to false }) } + + override suspend fun validate( + contract: Contract.Checkpoint.Default, + args: Unit + ): Boolean { + lock.withLock { contractMap.put(contract, true) } + if (contractMap.values.all { it }) { + continuation.resume(Unit) + return true + } else return false + } + } + checkpointTrackerProvider()?.run { + contractList.forEach { registerContract(it, queryIdentifier) } + putOrCreate(queryIdentifier, validator) + } + } + } + + @ExperimentalTransmissionApi + override suspend fun , A : Any> CommunicationScope.pauseOn( + contract: C + ): A { + val queryIdentifier = IdentifierGenerator.generateIdentifier() + return suspendCancellableCoroutine { continuation -> + val validator = object : CheckpointValidator { + override suspend fun validate(contract: C, args: A): Boolean { + continuation.resume(args) + return true + } + } + checkpointTrackerProvider()?.run { + registerContract(contract, queryIdentifier) + putOrCreate(queryIdentifier, validator) + } + } + } + + override suspend fun validate(contract: Contract.Checkpoint.Default) { + val validator = checkpointTrackerProvider() + ?.useValidator(contract) + if (validator?.validate(contract, Unit) == true) { + checkpointTrackerProvider() + ?.removeValidator(contract) + } + } + + override suspend fun , A : Any> validate( + contract: C, + args: A + ) { + val validator = checkpointTrackerProvider() + ?.useValidator(contract) + if (validator?.validate(contract, args) == true) { + checkpointTrackerProvider() + ?.removeValidator(contract) + } + } + } + } + + val requestHandler: RequestHandler = object : RequestHandler { override suspend fun , D : Transmission.Data> getData(contract: C): D? { val queryIdentifier = IdentifierGenerator.generateIdentifier() diff --git a/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt b/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt index 3ff1d0a..3b0e2e9 100644 --- a/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt +++ b/transmission/src/main/java/com/trendyol/transmission/transformer/request/execution/ExecutionExt.kt @@ -13,8 +13,8 @@ import com.trendyol.transmission.transformer.request.RequestHandler * Can be queried using [RequestHandler.execute] * @param execution execution to get the result [Transmission.Data] */ -fun ExecutionScope.register( - contract: C, +fun ExecutionScope.register( + contract: Contract.Execution, execution: suspend RequestHandler.() -> Unit, ) { this.executionRegistry.buildWith(contract.key, execution)