Skip to content

Commit

Permalink
feat: Reduce DB reads needed during call [WPB-10067] (#2904)
Browse files Browse the repository at this point in the history
* feat: reduce DB reads needed during call

* Not ready yet

* Ready for review

* Code style fixes

* Fixed tests

* Fixed small true-false mistake

* Fixed mapper test

* Fixed video streaming issue

* Fixed code-style

* Fixed tests

* More tests

---------

Co-authored-by: Yamil Medina <[email protected]>
  • Loading branch information
borichellow and yamilmedina authored Aug 2, 2024
1 parent 2829746 commit b919b24
Show file tree
Hide file tree
Showing 19 changed files with 393 additions and 372 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package com.wire.kalium.logic.data.call

import com.wire.kalium.logic.data.conversation.Conversation
import com.wire.kalium.logic.data.id.ConversationId
import com.wire.kalium.logic.data.user.OtherUserMinimized
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.type.UserType

data class CallMetadataProfile(
val data: Map<ConversationId, CallMetadata>
Expand All @@ -37,7 +40,26 @@ data class CallMetadata(
val callerTeamName: String?,
val establishedTime: String? = null,
val callStatus: CallStatus,
val participants: List<Participant> = emptyList(),
val participants: List<ParticipantMinimized> = emptyList(),
val maxParticipants: Int = 0, // Was used for tracking
val protocol: Conversation.ProtocolInfo
)
val protocol: Conversation.ProtocolInfo,
val activeSpeakers: Map<UserId, List<String>> = mapOf(),
val users: List<OtherUserMinimized> = listOf()
) {
fun getFullParticipants(): List<Participant> = participants.map { participant ->
val user = users.firstOrNull { it.id == participant.userId }
val isSpeaking = (activeSpeakers[participant.id]?.contains(participant.clientId) ?: false) && !participant.isMuted
Participant(
id = participant.id,
clientId = participant.clientId,
name = user?.name,
isMuted = participant.isMuted,
isCameraOn = participant.isCameraOn,
isSpeaking = isSpeaking,
isSharingScreen = participant.isSharingScreen,
hasEstablishedAudio = participant.hasEstablishedAudio,
avatarAssetId = user?.completePicture,
userType = user?.userType ?: UserType.NONE
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,13 @@ data class Participant(
val avatarAssetId: UserAssetId? = null,
val userType: UserType = UserType.NONE,
)

data class ParticipantMinimized(
val id: QualifiedID,
val userId: QualifiedID,
val clientId: String,
val isMuted: Boolean,
val isCameraOn: Boolean,
val isSharingScreen: Boolean = false,
val hasEstablishedAudio: Boolean,
)
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,7 @@ class CallManagerImpl internal constructor(
val onParticipantListChanged = OnParticipantListChanged(
callRepository = callRepository,
qualifiedIdMapper = qualifiedIdMapper,
participantMapper = ParticipantMapperImpl(videoStateChecker, callMapper),
userRepository = userRepository,
participantMapper = ParticipantMapperImpl(videoStateChecker, callMapper, qualifiedIdMapper),
callingScope = scope
).keepingStrongReference()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.wire.kalium.calling.types.Handle
import com.wire.kalium.logic.data.call.CallActiveSpeakers
import com.wire.kalium.logic.data.call.CallRepository
import com.wire.kalium.logic.data.id.QualifiedIdMapper
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.json.Json

class OnActiveSpeakers(
Expand All @@ -36,9 +35,13 @@ class OnActiveSpeakers(
val callActiveSpeakers = Json.decodeFromString<CallActiveSpeakers>(data)
val conversationIdWithDomain = qualifiedIdMapper.fromStringToQualifiedID(conversationId)

val onlyActiveSpeakers = callActiveSpeakers.activeSpeakers.filter { activeSpeaker ->
activeSpeaker.audioLevel > 0 || activeSpeaker.audioLevelNow > 0
}.groupBy({ qualifiedIdMapper.fromStringToQualifiedID(it.userId) }) { it.clientId }

callRepository.updateParticipantsActiveSpeaker(
conversationId = conversationIdWithDomain,
activeSpeakers = callActiveSpeakers
activeSpeakers = onlyActiveSpeakers
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.callingLogger
import com.wire.kalium.logic.data.call.CallParticipants
import com.wire.kalium.logic.data.call.CallRepository
import com.wire.kalium.logic.data.call.Participant
import com.wire.kalium.logic.data.call.ParticipantMinimized
import com.wire.kalium.logic.data.call.mapper.ParticipantMapper
import com.wire.kalium.logic.data.id.QualifiedIdMapper
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.functional.onFailure
import com.wire.kalium.logic.functional.onSuccess
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import kotlinx.serialization.json.Json
Expand All @@ -39,7 +36,6 @@ class OnParticipantListChanged internal constructor(
private val callRepository: CallRepository,
private val qualifiedIdMapper: QualifiedIdMapper,
private val participantMapper: ParticipantMapper,
private val userRepository: UserRepository,
private val callingScope: CoroutineScope
) : ParticipantChangedHandler {

Expand All @@ -48,22 +44,11 @@ class OnParticipantListChanged internal constructor(
val participantsChange = Json.decodeFromString<CallParticipants>(data)

callingScope.launch {
val participants = mutableListOf<Participant>()
val participants = mutableListOf<ParticipantMinimized>()
val conversationIdWithDomain = qualifiedIdMapper.fromStringToQualifiedID(remoteConversationId)

participantsChange.members.map { member ->
val participant = participantMapper.fromCallMemberToParticipant(member)
val userId = qualifiedIdMapper.fromStringToQualifiedID(member.userId)
userRepository.getKnownUserMinimized(userId).onSuccess {
val updatedParticipant = participant.copy(
name = it.name,
avatarAssetId = it.completePicture,
userType = it.userType
)
participants.add(updatedParticipant)
}.onFailure {
participants.add(participant)
}
participants.add(participantMapper.fromCallMemberToParticipantMinimized(member))
}

callRepository.updateCallParticipants(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.wire.kalium.logger.obfuscateDomain
import com.wire.kalium.logger.obfuscateId
import com.wire.kalium.logic.CoreFailure
import com.wire.kalium.logic.callingLogger
import com.wire.kalium.logic.data.call.mapper.ActiveSpeakerMapper
import com.wire.kalium.logic.data.call.mapper.CallMapper
import com.wire.kalium.logic.data.client.MLSClientProvider
import com.wire.kalium.logic.data.conversation.ClientId
Expand All @@ -51,9 +50,9 @@ import com.wire.kalium.logic.data.message.PersistMessageUseCase
import com.wire.kalium.logic.data.team.TeamRepository
import com.wire.kalium.logic.data.user.UserId
import com.wire.kalium.logic.data.user.UserRepository
import com.wire.kalium.logic.di.MapperProvider
import com.wire.kalium.logic.functional.Either
import com.wire.kalium.logic.functional.flatMap
import com.wire.kalium.logic.functional.getOrElse
import com.wire.kalium.logic.functional.getOrNull
import com.wire.kalium.logic.functional.map
import com.wire.kalium.logic.functional.onFailure
Expand Down Expand Up @@ -119,8 +118,8 @@ interface CallRepository {
fun updateIsMutedById(conversationId: ConversationId, isMuted: Boolean)
fun updateIsCbrEnabled(isCbrEnabled: Boolean)
fun updateIsCameraOnById(conversationId: ConversationId, isCameraOn: Boolean)
fun updateCallParticipants(conversationId: ConversationId, participants: List<Participant>)
fun updateParticipantsActiveSpeaker(conversationId: ConversationId, activeSpeakers: CallActiveSpeakers)
suspend fun updateCallParticipants(conversationId: ConversationId, participants: List<ParticipantMinimized>)
fun updateParticipantsActiveSpeaker(conversationId: ConversationId, activeSpeakers: Map<UserId, List<String>>)
suspend fun getLastClosedCallCreatedByConversationId(conversationId: ConversationId): Flow<String?>
suspend fun updateOpenCallsToClosedStatus()
suspend fun persistMissedCall(conversationId: ConversationId)
Expand Down Expand Up @@ -151,7 +150,6 @@ internal class CallDataSource(
private val leaveSubconversation: LeaveSubconversationUseCase,
private val callMapper: CallMapper,
private val federatedIdMapper: FederatedIdMapper,
private val activeSpeakerMapper: ActiveSpeakerMapper = MapperProvider.activeSpeakerMapper(),
kaliumDispatchers: KaliumDispatcher = KaliumDispatcherImpl
) : CallRepository {

Expand Down Expand Up @@ -224,7 +222,8 @@ internal class CallDataSource(
isCbrEnabled = isCbrEnabled,
establishedTime = null,
callStatus = status,
protocol = conversation.conversation.protocol
protocol = conversation.conversation.protocol,
activeSpeakers = mapOf()
)

val isCallInCurrentSession = _callMetadataProfile.value.data.containsKey(conversationId)
Expand Down Expand Up @@ -407,7 +406,8 @@ internal class CallDataSource(
}
}

override fun updateCallParticipants(conversationId: ConversationId, participants: List<Participant>) {
@Suppress("NestedBlockDepth")
override suspend fun updateCallParticipants(conversationId: ConversationId, participants: List<ParticipantMinimized>) {
val callMetadataProfile = _callMetadataProfile.value
callMetadataProfile.data[conversationId]?.let { call ->
if (call.participants != participants) {
Expand All @@ -417,10 +417,24 @@ internal class CallDataSource(
" with size of: ${participants.size}"
)

val currentParticipantIds = call.participants.map { it.userId }.toSet()
val newParticipantIds = participants.map { it.userId }.toSet()

val updatedUsers = call.users.toMutableList()

newParticipantIds.minus(currentParticipantIds).let { missedUserIds ->
if (missedUserIds.isNotEmpty())
updatedUsers.addAll(
userRepository.getUsersMinimizedByQualifiedIDs(missedUserIds.toList()).getOrElse { listOf() }
)

}

val updatedCallMetadata = callMetadataProfile.data.toMutableMap().apply {
this[conversationId] = call.copy(
participants = participants,
maxParticipants = max(call.maxParticipants, participants.size + 1)
maxParticipants = max(call.maxParticipants, participants.size + 1),
users = updatedUsers
)
}

Expand All @@ -441,14 +455,14 @@ internal class CallDataSource(
}
}

private fun clearStaleParticipantTimeout(participant: Participant) {
private fun clearStaleParticipantTimeout(participant: ParticipantMinimized) {
callingLogger.i("Clear stale participant timer")
val qualifiedClient = QualifiedClientID(ClientId(participant.clientId), participant.id)
staleParticipantJobs.remove(qualifiedClient)?.cancel()
}

private fun removeStaleParticipantAfterTimeout(
participant: Participant,
participant: ParticipantMinimized,
conversationId: ConversationId
) {
val qualifiedClient = QualifiedClientID(ClientId(participant.clientId), participant.id)
Expand All @@ -469,27 +483,19 @@ internal class CallDataSource(
}
}

override fun updateParticipantsActiveSpeaker(conversationId: ConversationId, activeSpeakers: CallActiveSpeakers) {
override fun updateParticipantsActiveSpeaker(conversationId: ConversationId, activeSpeakers: Map<UserId, List<String>>) {
val callMetadataProfile = _callMetadataProfile.value

callMetadataProfile.data[conversationId]?.let { call ->
callingLogger.i(
"updateActiveSpeakers() -" +
" conversationId: ${conversationId.value.obfuscateId()}" +
"@${conversationId.domain.obfuscateDomain()}" +
"with size of: ${activeSpeakers.activeSpeakers.size}"
)

val updatedParticipants = activeSpeakerMapper.mapParticipantsActiveSpeaker(
participants = call.participants,
activeSpeakers = activeSpeakers
"with size of: ${activeSpeakers.size}"
)

val updatedCallMetadata = callMetadataProfile.data.toMutableMap().apply {
this[conversationId] = call.copy(
participants = updatedParticipants,
maxParticipants = max(call.maxParticipants, updatedParticipants.size + 1)
)
this[conversationId] = call.copy(activeSpeakers = activeSpeakers)
}

_callMetadataProfile.value = callMetadataProfile.copy(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class CallMapperImpl(
callerName = metadata?.callerName,
callerTeamName = metadata?.callerTeamName,
establishedTime = metadata?.establishedTime,
participants = metadata?.participants ?: emptyList(),
participants = metadata?.getFullParticipants() ?: emptyList(),
maxParticipants = metadata?.maxParticipants ?: 0
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,29 @@
package com.wire.kalium.logic.data.call.mapper

import com.wire.kalium.logic.data.call.CallMember
import com.wire.kalium.logic.data.call.Participant
import com.wire.kalium.logic.data.call.ParticipantMinimized
import com.wire.kalium.logic.data.call.VideoStateChecker
import com.wire.kalium.logic.data.id.QualifiedID
import com.wire.kalium.logic.data.id.QualifiedIdMapper

interface ParticipantMapper {
fun fromCallMemberToParticipant(member: CallMember): Participant
fun fromCallMemberToParticipantMinimized(member: CallMember): ParticipantMinimized
}

class ParticipantMapperImpl(
private val videoStateChecker: VideoStateChecker,
private val callMapper: CallMapper
private val callMapper: CallMapper,
private val qualifiedIdMapper: QualifiedIdMapper
) : ParticipantMapper {

override fun fromCallMemberToParticipant(member: CallMember): Participant = with(member) {
override fun fromCallMemberToParticipantMinimized(member: CallMember): ParticipantMinimized = with(member) {
val videoState = callMapper.fromIntToCallingVideoState(vrecv)
val isCameraOn = videoStateChecker.isCameraOn(videoState)
val isSharingScreen = videoStateChecker.isSharingScreen(videoState)

Participant(
id = QualifiedID(
value = userId.removeDomain(),
domain = userId.getDomain()
),
ParticipantMinimized(
id = QualifiedID(value = member.userId.removeDomain(), domain = member.userId.getDomain()),
userId = qualifiedIdMapper.fromStringToQualifiedID(member.userId),
clientId = clientId,
isMuted = isMuted == 1,
isCameraOn = isCameraOn,
Expand Down
Loading

0 comments on commit b919b24

Please sign in to comment.