Skip to content

Commit

Permalink
fix: send calling messages in order [WPB-10051] (#2915) (#2916)
Browse files Browse the repository at this point in the history
Co-authored-by: Vitor Hugo Schwaab <[email protected]>
  • Loading branch information
github-actions[bot] and vitorhugods authored Jul 30, 2024
1 parent 206d8fe commit 7d961c0
Show file tree
Hide file tree
Showing 10 changed files with 611 additions and 324 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.feature.call.scenario.CallingMessageSender
import com.wire.kalium.logic.feature.call.scenario.OnActiveSpeakers
import com.wire.kalium.logic.feature.call.scenario.OnAnsweredCall
import com.wire.kalium.logic.feature.call.scenario.OnClientsRequest
Expand Down Expand Up @@ -100,9 +101,9 @@ class CallManagerImpl internal constructor(
private val callRepository: CallRepository,
private val userRepository: UserRepository,
private val currentClientIdProvider: CurrentClientIdProvider,
private val selfConversationIdProvider: SelfConversationIdProvider,
selfConversationIdProvider: SelfConversationIdProvider,
private val conversationRepository: ConversationRepository,
private val messageSender: MessageSender,
messageSender: MessageSender,
private val callMapper: CallMapper,
private val federatedIdMapper: FederatedIdMapper,
private val qualifiedIdMapper: QualifiedIdMapper,
Expand All @@ -122,6 +123,14 @@ class CallManagerImpl internal constructor(
private val scope = CoroutineScope(job + kaliumDispatchers.io)
private val deferredHandle: Deferred<Handle> = startHandleAsync()

private val callingMessageSender = CallingMessageSender(
deferredHandle,
calling,
messageSender,
scope,
selfConversationIdProvider
)

private val strongReferences = Collections.synchronizedList(mutableListOf<Any>())
private fun <T : Any> T.keepingStrongReference(): T {
strongReferences.add(this)
Expand Down Expand Up @@ -185,15 +194,11 @@ class CallManagerImpl internal constructor(
}.keepingStrongReference(),
// TODO(refactor): inject all of these CallbackHandlers in class constructor
sendHandler = OnSendOTR(
deferredHandle,
calling,
qualifiedIdMapper,
selfUserId,
selfClientId,
messageSender,
selfConversationIdProvider,
scope,
callMapper
qualifiedIdMapper = qualifiedIdMapper,
selfUserId = selfUserId,
selfClientId = selfClientId,
callMapper = callMapper,
callingMessageSender = callingMessageSender,
).keepingStrongReference(),
sftRequestHandler = OnSFTRequest(deferredHandle, calling, callRepository, scope)
.keepingStrongReference(),
Expand Down Expand Up @@ -511,6 +516,13 @@ class CallManagerImpl internal constructor(
initActiveSpeakersHandler()
initRequestNewEpochHandler()
initSelfUserMuteHandler()
initCallingMessageSender()
}

private fun initCallingMessageSender() {
scope.launch {
callingMessageSender.processQueue()
}
}

private fun initParticipantsHandler() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/
package com.wire.kalium.logic.feature.call.scenario

import com.sun.jna.Pointer
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.MessageTarget
import com.wire.kalium.logic.data.user.UserId

/**
* Represents the instructions for sending a calling message.
*
* @property context The pointer context for the message (optional).
* @property callHostConversationId The ID of the conversation where the call is taking place.
* @property messageString The content of the message.
* @property avsSelfUserId The self user ID used by AVS.
* @property avsSelfClientId The self client ID used by AVS.
* @property messageTarget The target for sending the message.
*/
data class CallingMessageInstructions(
val context: Pointer?,
val callHostConversationId: ConversationId,
val messageString: String,
val avsSelfUserId: UserId,
val avsSelfClientId: ClientId,
val messageTarget: CallingMessageTarget
)

sealed interface CallingMessageTarget {
val specificTarget: MessageTarget

/**
* Send the message only to other devices of self-user.
*/
data object Self : CallingMessageTarget {
override val specificTarget: MessageTarget
get() = MessageTarget.Conversation()
}

/**
* Send the message to the host conversation.
* Supports ignoring users through the [specificTarget].
*/
data class HostConversation(
override val specificTarget: MessageTarget = MessageTarget.Conversation()
) : CallingMessageTarget
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Wire
* Copyright (C) 2024 Wire Swiss GmbH
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see http://www.gnu.org/licenses/.
*/

package com.wire.kalium.logic.feature.call.scenario

import com.benasher44.uuid.uuid4
import com.sun.jna.Pointer
import com.wire.kalium.calling.Calling
import com.wire.kalium.calling.types.Handle
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.cache.SelfConversationIdProvider
import com.wire.kalium.logic.callingLogger
import com.wire.kalium.logic.data.conversation.ClientId
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.message.Message
import com.wire.kalium.logic.data.message.MessageContent
import com.wire.kalium.logic.data.message.MessageTarget
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.feature.message.MessageSender
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.foldToEitherWhileRight
import io.ktor.http.HttpStatusCode
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.launch
import kotlinx.datetime.Clock

internal interface CallingMessageSender {

suspend fun processQueue()

@Suppress("LongParameterList")
fun enqueueSendingOfCallingMessage(
context: Pointer?,
callHostConversationId: ConversationId,
messageString: String?,
avsSelfUserId: UserId,
avsSelfClientId: ClientId,
messageTarget: CallingMessageTarget,
)
}

@Suppress("FunctionNaming")
internal fun CallingMessageSender(
handle: Deferred<Handle>,
calling: Calling,
messageSender: MessageSender,
callingScope: CoroutineScope,
selfConversationIdProvider: SelfConversationIdProvider
) = object : CallingMessageSender {

private val logger = callingLogger.withTextTag("CallingMessageSender")

private val queue = Channel<CallingMessageInstructions>(
capacity = Channel.UNLIMITED,
)

@Suppress("LongParameterList")
override fun enqueueSendingOfCallingMessage(
context: Pointer?,
callHostConversationId: ConversationId,
messageString: String?,
avsSelfUserId: UserId,
avsSelfClientId: ClientId,
messageTarget: CallingMessageTarget,
) {
if (messageString == null) return
callingScope.launch {
queue.send(
CallingMessageInstructions(
context,
callHostConversationId,
messageString,
avsSelfUserId,
avsSelfClientId,
messageTarget,
)
)
}
}

override suspend fun processQueue() {
queue.consumeAsFlow().collect { messageInstructions ->
processInstruction(messageInstructions, selfConversationIdProvider)
}
}

private suspend fun processInstruction(
messageInstructions: CallingMessageInstructions,
selfConversationIdProvider: SelfConversationIdProvider
) {
val target = messageInstructions.messageTarget

val transportConversationIds = when (target) {
is CallingMessageTarget.Self -> {
selfConversationIdProvider()
}

is CallingMessageTarget.HostConversation -> {
Either.Right(listOf(messageInstructions.callHostConversationId))
}
}

val result = transportConversationIds.flatMap { conversations ->
conversations.foldToEitherWhileRight(Unit) { transportConversationId, _ ->
sendCallingMessage(
messageInstructions.callHostConversationId,
messageInstructions.avsSelfUserId,
messageInstructions.avsSelfClientId,
messageInstructions.messageString,
target.specificTarget,
transportConversationId
)
}
}

val (code, message) = when (result) {
is Either.Right -> {
logger.i("Notifying AVS - Success sending message")
HttpStatusCode.OK.value to ""
}

is Either.Left -> {
logger.i("Notifying AVS - Error sending message")
HttpStatusCode.BadRequest.value to "Couldn't send Calling Message"
}
}
calling.wcall_resp(
inst = handle.await(),
status = code,
reason = message,
arg = messageInstructions.context
)
}

@Suppress("LongParameterList")
private suspend fun sendCallingMessage(
callHostConversationId: ConversationId,
userId: UserId,
clientId: ClientId,
data: String,
messageTarget: MessageTarget,
transportConversationId: ConversationId
): Either<CoreFailure, Unit> {
val messageContent = MessageContent.Calling(data, callHostConversationId)
val date = Clock.System.now()
val message = Message.Signaling(
id = uuid4().toString(),
content = messageContent,
conversationId = transportConversationId,
date = date,
senderUserId = userId,
senderClientId = clientId,
status = Message.Status.Sent,
isSelfMessage = true,
expirationData = null
)
return messageSender.sendMessage(message, messageTarget)
}
}
Loading

0 comments on commit 7d961c0

Please sign in to comment.