Skip to content

Commit

Permalink
fix: event delay handling (#3056)
Browse files Browse the repository at this point in the history
  • Loading branch information
Garzas authored Oct 15, 2024
1 parent 5e31dec commit 0bc15da
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,24 @@ import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.NetworkFailure
import com.wire.kalium.logic.StorageFailure
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.wrapApiRequest
import com.wire.kalium.logic.wrapStorageRequest
import com.wire.kalium.network.api.base.authenticated.notification.NotificationApi
import com.wire.kalium.network.api.authenticated.notification.NotificationResponse
import com.wire.kalium.network.api.base.authenticated.notification.NotificationApi
import com.wire.kalium.network.api.base.authenticated.notification.WebSocketEvent
import com.wire.kalium.network.utils.NetworkResponse
import com.wire.kalium.network.utils.isSuccessful
import com.wire.kalium.persistence.dao.MetadataDAO
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.flattenConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.isActive
import kotlinx.serialization.json.Json
import kotlin.coroutines.coroutineContext
Expand Down Expand Up @@ -104,26 +101,31 @@ class EventDataSource(
currentClientId().flatMap { clientId -> liveEventsFlow(clientId) }

private suspend fun liveEventsFlow(clientId: ClientId): Either<NetworkFailure, Flow<WebSocketEvent<EventEnvelope>>> =
wrapApiRequest { notificationApi.listenToLiveEvents(clientId.value) }.map {
it.map { webSocketEvent ->
when (webSocketEvent) {
is WebSocketEvent.Open -> {
flowOf(WebSocketEvent.Open())
}

is WebSocketEvent.NonBinaryPayloadReceived -> {
flowOf(WebSocketEvent.NonBinaryPayloadReceived<EventEnvelope>(webSocketEvent.payload))
}

is WebSocketEvent.Close -> {
flowOf(WebSocketEvent.Close(webSocketEvent.cause))
}

is WebSocketEvent.BinaryPayloadReceived -> {
eventMapper.fromDTO(webSocketEvent.payload, true).asFlow().map { WebSocketEvent.BinaryPayloadReceived(it) }
wrapApiRequest { notificationApi.listenToLiveEvents(clientId.value) }.map { webSocketEventFlow ->
flow {
webSocketEventFlow.collect { webSocketEvent ->
when (webSocketEvent) {
is WebSocketEvent.Open -> {
emit(WebSocketEvent.Open())
}

is WebSocketEvent.NonBinaryPayloadReceived -> {
emit(WebSocketEvent.NonBinaryPayloadReceived(webSocketEvent.payload))
}

is WebSocketEvent.Close -> {
emit(WebSocketEvent.Close(webSocketEvent.cause))
}

is WebSocketEvent.BinaryPayloadReceived -> {
val events = eventMapper.fromDTO(webSocketEvent.payload, true)
events.forEach { eventEnvelope ->
emit(WebSocketEvent.BinaryPayloadReceived(eventEnvelope))
}
}
}
}
}.flattenConcat()
}
}

private suspend fun pendingEventsFlow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ import com.wire.kalium.logic.feature.message.composite.SendButtonActionMessageUs
import com.wire.kalium.logic.feature.message.composite.SendButtonMessageUseCase
import com.wire.kalium.logic.feature.message.confirmation.ConfirmationDeliveryHandler
import com.wire.kalium.logic.feature.message.confirmation.ConfirmationDeliveryHandlerImpl
import com.wire.kalium.logic.feature.message.confirmation.SendDeliverSignalUseCase
import com.wire.kalium.logic.feature.message.confirmation.SendDeliverSignalUseCaseImpl
import com.wire.kalium.logic.feature.message.draft.GetMessageDraftUseCase
import com.wire.kalium.logic.feature.message.draft.GetMessageDraftUseCaseImpl
import com.wire.kalium.logic.feature.message.draft.RemoveMessageDraftUseCase
Expand Down Expand Up @@ -177,12 +179,17 @@ class MessageScope internal constructor(
kaliumLogger = kaliumLogger
)

internal val confirmationDeliveryHandler: ConfirmationDeliveryHandler = ConfirmationDeliveryHandlerImpl(
syncManager = syncManager,
private val sendDeliverSignalUseCase: SendDeliverSignalUseCase = SendDeliverSignalUseCaseImpl(
selfUserId = selfUserId,
messageSender = messageSender,
currentClientIdProvider = currentClientIdProvider,
kaliumLogger = kaliumLogger
)

internal val confirmationDeliveryHandler: ConfirmationDeliveryHandler = ConfirmationDeliveryHandlerImpl(
syncManager = syncManager,
conversationRepository = conversationRepository,
messageSender = messageSender,
sendDeliverSignalUseCase = sendDeliverSignalUseCase,
kaliumLogger = kaliumLogger,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
*/
package com.wire.kalium.logic.feature.message.confirmation

import com.benasher44.uuid.uuid4
import co.touchlab.stately.collections.ConcurrentMutableMap
import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.conversation.ConversationRepository
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.id.MessageId
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.receipt.ReceiptType
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.fold
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.functional.right
import com.wire.kalium.logic.logStructuredJson
import com.wire.kalium.logic.sync.SyncManager
Expand All @@ -42,9 +36,6 @@ import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.datetime.Clock

/**
* Internal: Handles the send of delivery confirmation of messages.
Expand All @@ -54,23 +45,20 @@ internal interface ConfirmationDeliveryHandler {
suspend fun sendPendingConfirmations()
}

@Suppress("LongParameterList")
internal class ConfirmationDeliveryHandlerImpl(
private val syncManager: SyncManager,
private val selfUserId: UserId,
private val currentClientIdProvider: CurrentClientIdProvider,
private val conversationRepository: ConversationRepository,
private val messageSender: MessageSender,
private val pendingConfirmationMessages: MutableMap<ConversationId, MutableSet<String>> = mutableMapOf(),
kaliumLogger: KaliumLogger,
private val sendDeliverSignalUseCase: SendDeliverSignalUseCase,
private val pendingConfirmationMessages: ConcurrentMutableMap<ConversationId, MutableSet<String>> =
ConcurrentMutableMap(),
kaliumLogger: KaliumLogger
) : ConfirmationDeliveryHandler {

private val kaliumLogger = kaliumLogger.withTextTag("ConfirmationDeliveryHandler")
private val holder = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
private val mutex = Mutex()

override suspend fun enqueueConfirmationDelivery(conversationId: ConversationId, messageId: String) = mutex.withLock {
val conversationMessages = pendingConfirmationMessages[conversationId] ?: mutableSetOf()
override suspend fun enqueueConfirmationDelivery(conversationId: ConversationId, messageId: String) {
val conversationMessages = pendingConfirmationMessages.computeIfAbsent(conversationId) { mutableSetOf() }
val isNewMessage = conversationMessages.add(messageId)
if (isNewMessage) {
kaliumLogger.logStructuredJson(
Expand All @@ -82,7 +70,6 @@ internal class ConfirmationDeliveryHandlerImpl(
"queueCount" to pendingConfirmationMessages.size
)
)
pendingConfirmationMessages[conversationId] = conversationMessages
holder.emit(Unit)
}
}
Expand All @@ -91,71 +78,40 @@ internal class ConfirmationDeliveryHandlerImpl(
override suspend fun sendPendingConfirmations() {
holder.debounce(DEBOUNCE_SEND_CONFIRMATION_TIME).collectLatest {
syncManager.waitUntilLive()
mutex.withLock {
kaliumLogger.d("Started collecting pending messages for delivery confirmation")
with(pendingConfirmationMessages.iterator()) {
forEach { (conversationId, messages) ->
conversationRepository.observeConversationById(conversationId).first().flatMap { conversation ->
if (conversation.type == Conversation.Type.ONE_ON_ONE) {
sendDeliveredSignal(
conversation = conversation,
messages = messages.toList()
)
} else {
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.DEBUG,
leadingMessage = "Skipping group conversation: ${conversation?.id?.toLogString()}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation?.id?.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"messageCount" to messages.size
)
)
kaliumLogger.d("Started collecting pending messages for delivery confirmation")
val messagesToSend = pendingConfirmationMessages.block { it.toMap() }
messagesToSend.forEach { (conversationId, messages) ->
conversationRepository.observeConversationById(conversationId).first().flatMap { conversation ->
if (conversation.type == Conversation.Type.ONE_ON_ONE) {
sendDeliverSignalUseCase(
conversation = conversation,
messages = messages.toList()
).onSuccess {
pendingConfirmationMessages.block {
val currentMessages = it[conversationId]
if (currentMessages != null) {
currentMessages.removeAll(messages.toSet())
if (currentMessages.isEmpty()) {
it.remove(conversationId)
}
}
}
remove() // safely clean the entry [conversationId to messages]
Unit.right()
}
} else {
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.DEBUG,
leadingMessage = "Skipping group conversation: ${conversation.id.toLogString()}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation.id.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"messageCount" to messages.size
)
)
}
Unit.right()
}
}
}
}

private suspend fun sendDeliveredSignal(conversation: Conversation, messages: List<MessageId>) {
currentClientIdProvider().flatMap { currentClientId ->
val message = Message.Signaling(
id = uuid4().toString(),
content = MessageContent.Receipt(ReceiptType.DELIVERED, messages),
conversationId = conversation.id,
date = Clock.System.now(),
senderUserId = selfUserId,
senderClientId = currentClientId,
status = Message.Status.Pending,
isSelfMessage = true,
expirationData = null
)
messageSender.sendMessage(message).fold({ error ->
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.ERROR,
leadingMessage = "Error while sending delivery confirmation for ${conversation.id.toLogString()}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation.id.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"error" to error.toString()
)
)
}, {
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.DEBUG,
leadingMessage = "Delivery confirmation sent for ${conversation.id.toLogString()} and message count: ${messages.size}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation.id.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"messageCount" to messages.size
)
)
})
Unit.right()
kaliumLogger.d("Finished collecting pending messages for delivery confirmation")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.message.confirmation

import com.benasher44.uuid.uuid4
import com.wire.kalium.logger.KaliumLogLevel
import com.wire.kalium.logger.KaliumLogger
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.id.CurrentClientIdProvider
import com.wire.kalium.logic.data.id.MessageId
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.receipt.ReceiptType
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import com.wire.kalium.logic.logStructuredJson
import kotlinx.datetime.Clock

/**
* Use case for sending a delivery confirmation signal for a list of messages in a conversation.
*/
interface SendDeliverSignalUseCase {
suspend operator fun invoke(conversation: Conversation, messages: List<MessageId>): Either<CoreFailure, Unit>
}

internal class SendDeliverSignalUseCaseImpl(
private val selfUserId: UserId,
private val messageSender: MessageSender,
private val currentClientIdProvider: CurrentClientIdProvider,
private val kaliumLogger: KaliumLogger
) : SendDeliverSignalUseCase {
override suspend fun invoke(
conversation: Conversation,
messages: List<MessageId>
): Either<CoreFailure, Unit> = currentClientIdProvider()
.flatMap { currentClientId ->
val message = Message.Signaling(
id = uuid4().toString(),
content = MessageContent.Receipt(ReceiptType.DELIVERED, messages),
conversationId = conversation.id,
date = Clock.System.now(),
senderUserId = selfUserId,
senderClientId = currentClientId,
status = Message.Status.Pending,
isSelfMessage = true,
expirationData = null
)
messageSender.sendMessage(message)
.onFailure { error ->
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.ERROR,
leadingMessage = "Error while sending delivery confirmation for ${conversation.id.toLogString()}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation.id.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"error" to error.toString()
)
)
}
.onSuccess {
kaliumLogger.logStructuredJson(
level = KaliumLogLevel.DEBUG,
leadingMessage = "Delivery confirmation sent for ${conversation.id.toLogString()}" +
" and message count: ${messages.size}",
jsonStringKeyValues = mapOf(
"conversationId" to conversation.id.toLogString(),
"messages" to messages.joinToString { it.obfuscateId() },
"messageCount" to messages.size
)
)
}
}
}
Loading

0 comments on commit 0bc15da

Please sign in to comment.