From 4667ea74d19385d68a3f91b1fe9a1134b7de6cbd Mon Sep 17 00:00:00 2001 From: Yamil Medina Date: Tue, 5 Nov 2024 13:39:58 -0300 Subject: [PATCH] feat: add metrics for sync engine slow and incremental (WPB-11199) (#3086) * feat: add base for metrics, implement in slow sync * feat: add base for metrics, implement in slow sync * feat: add metrics for slow sync and improve json log * feat: incremental sync logs * feat: incremental sync logs * feat: incremental sync logs, improv for foreground * feat: refactor some naming to clarify * feat: refactor some naming to clarify * feat: docs --- .../kalium/logic/sync/SyncManagerLogger.kt | 99 +++++++++++++++++++ .../incremental/IncrementalSyncManager.kt | 23 ++++- .../kalium/logic/sync/slow/SlowSyncManager.kt | 5 + 3 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt new file mode 100644 index 0000000000..145a813480 --- /dev/null +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/SyncManagerLogger.kt @@ -0,0 +1,99 @@ +/* + * Wire + * Copyright (C) 2024 Wire Swiss GmbH + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see http://www.gnu.org/licenses/. + */ +package com.wire.kalium.logic.sync + +import com.benasher44.uuid.uuid4 +import com.wire.kalium.logger.KaliumLogLevel +import com.wire.kalium.logger.KaliumLogger +import com.wire.kalium.logic.logStructuredJson +import kotlinx.datetime.Clock +import kotlinx.datetime.Instant +import kotlin.time.Duration + +/** + * Logs the sync process by providing structured logs. + * It logs the sync process start and completion with the syncId as a unique identifier. + */ +internal class SyncManagerLogger( + private val logger: KaliumLogger, + private val syncId: String, + private val syncType: SyncType, + private val syncStartedMoment: Instant +) { + + /** + * Logs the sync process start. + */ + fun logSyncStarted() { + logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson( + level = KaliumLogLevel.INFO, + leadingMessage = "Started sync process", + jsonStringKeyValues = mapOf( + "syncMetadata" to mapOf( + "id" to syncId, + "status" to SyncStatus.STARTED.name, + "type" to syncType.name + ) + ) + ) + } + + /** + * Logs the sync process completion. + * Optionally, it can pass the duration of the sync process, + * useful for incremental sync that can happen between collecting states. + * + * @param duration optional the duration of the sync process. + */ + fun logSyncCompleted(duration: Duration = Clock.System.now() - syncStartedMoment) { + val logMap = mapOf( + "id" to syncId, + "status" to SyncStatus.COMPLETED.name, + "type" to syncType.name, + "performanceData" to mapOf("timeTakenInMillis" to duration.inWholeMilliseconds) + ) + + logger.withFeatureId(KaliumLogger.Companion.ApplicationFlow.SYNC).logStructuredJson( + level = KaliumLogLevel.INFO, + leadingMessage = "Completed sync process", + jsonStringKeyValues = mapOf("syncMetadata" to logMap) + ) + } +} + +internal enum class SyncStatus { + STARTED, + COMPLETED +} + +internal enum class SyncType { + SLOW, + INCREMENTAL +} + +/** + * Provides a new [SyncManagerLogger] instance with the given parameters. + * @param syncType the [SyncType] that will log. + * @param syncId the unique identifier for the sync process. + * @param syncStartedMoment the moment when the sync process started. + */ +internal fun KaliumLogger.provideNewSyncManagerLogger( + syncType: SyncType, + syncId: String = uuid4().toString(), + syncStartedMoment: Instant = Clock.System.now() +) = SyncManagerLogger(this, syncId, syncType, syncStartedMoment) diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt index 6e7d265400..38e2ec5c2c 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/incremental/IncrementalSyncManager.kt @@ -18,6 +18,7 @@ package com.wire.kalium.logic.sync.incremental +import com.benasher44.uuid.uuid4 import com.wire.kalium.logger.KaliumLogger import com.wire.kalium.logger.KaliumLogger.Companion.ApplicationFlow.SYNC import com.wire.kalium.logic.data.event.Event @@ -28,6 +29,8 @@ import com.wire.kalium.logic.data.sync.SlowSyncRepository import com.wire.kalium.logic.data.sync.SlowSyncStatus import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.SyncExceptionHandler +import com.wire.kalium.logic.sync.SyncType +import com.wire.kalium.logic.sync.provideNewSyncManagerLogger import com.wire.kalium.logic.sync.slow.SlowSyncManager import com.wire.kalium.logic.util.ExponentialDurationHelper import com.wire.kalium.logic.util.ExponentialDurationHelperImpl @@ -41,12 +44,15 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.cancellable +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.runningFold import kotlinx.coroutines.launch import kotlinx.coroutines.selects.select +import kotlinx.datetime.Clock import kotlin.time.Duration import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds @@ -181,16 +187,25 @@ internal class IncrementalSyncManager( incrementalSyncWorker .processEventsWhilePolicyAllowsFlow() .cancellable() - .collect { - val newState = when (it) { - EventSource.PENDING -> IncrementalSyncStatus.FetchingPendingEvents + .runningFold(uuid4().toString() to Clock.System.now()) { syncData, eventSource -> + val syncLogger = kaliumLogger.provideNewSyncManagerLogger(SyncType.INCREMENTAL, syncData.first) + val newState = when (eventSource) { + EventSource.PENDING -> { + syncLogger.logSyncStarted() + IncrementalSyncStatus.FetchingPendingEvents + } + EventSource.LIVE -> { + syncLogger.logSyncCompleted(duration = Clock.System.now() - syncData.second) exponentialDurationHelper.reset() IncrementalSyncStatus.Live } } incrementalSyncRepository.updateIncrementalSyncState(newState) - } + + // when the source is LIVE, we need to generate a new syncId since it means the previous one is done + if (eventSource == EventSource.LIVE) uuid4().toString() to Clock.System.now() else syncData + }.collect() incrementalSyncRepository.updateIncrementalSyncState(IncrementalSyncStatus.Pending) logger.i("$TAG IncrementalSync stopped.") } diff --git a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt index f04c99e814..f3c25780c6 100644 --- a/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt +++ b/logic/src/commonMain/kotlin/com/wire/kalium/logic/sync/slow/SlowSyncManager.kt @@ -26,7 +26,9 @@ import com.wire.kalium.logic.data.sync.SlowSyncStatus import com.wire.kalium.logic.functional.combine import com.wire.kalium.logic.kaliumLogger import com.wire.kalium.logic.sync.SyncExceptionHandler +import com.wire.kalium.logic.sync.SyncType import com.wire.kalium.logic.sync.incremental.IncrementalSyncManager +import com.wire.kalium.logic.sync.provideNewSyncManagerLogger import com.wire.kalium.logic.sync.slow.migration.SyncMigrationStepsProvider import com.wire.kalium.logic.sync.slow.migration.steps.SyncMigrationStep import com.wire.kalium.logic.util.ExponentialDurationHelper @@ -187,11 +189,14 @@ internal class SlowSyncManager( } private suspend fun performSlowSync(migrationSteps: List) { + val syncLogger = kaliumLogger.provideNewSyncManagerLogger(SyncType.SLOW) + syncLogger.logSyncStarted() logger.i("Starting SlowSync as all criteria are met and it wasn't performed recently") slowSyncWorker.slowSyncStepsFlow(migrationSteps).cancellable().collect { step -> logger.i("Performing SlowSyncStep $step") slowSyncRepository.updateSlowSyncStatus(SlowSyncStatus.Ongoing(step)) } + syncLogger.logSyncCompleted() logger.i("SlowSync completed. Updating last completion instant") slowSyncRepository.setSlowSyncVersion(CURRENT_VERSION) slowSyncRepository.setLastSlowSyncCompletionInstant(DateTimeUtil.currentInstant())