From ead44a722bedcd6df072e2860f9704241215c751 Mon Sep 17 00:00:00 2001 From: Toomas Vahter Date: Mon, 29 Jul 2024 16:58:08 +0300 Subject: [PATCH] Fix a rare issue with incorrect message order when sending messages offline (#3316) --- CHANGELOG.md | 2 + Sources/StreamChat/ChatClient.swift | 16 ++ .../Database/DTOs/QueuedRequestDTO.swift | 4 + .../StreamChat/Database/DatabaseSession.swift | 1 + .../Repositories/MessageRepository.swift | 46 +++++- .../OfflineRequestsRepository.swift | 144 ++++++++++++++---- .../Repositories/SyncRepository.swift | 10 +- Sources/StreamChat/StateLayer/Chat.swift | 17 --- .../Workers/Background/MessageSender.swift | 79 ++++++---- .../Database/DatabaseSession_Mock.swift | 4 + .../SpyPattern/Spy/APIClient_Spy.swift | 11 ++ .../OfflineRequestsRepository_Tests.swift | 54 ++++++- .../Background/MessageSender_Tests.swift | 82 ++++++++++ 13 files changed, 386 insertions(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f5765b4f088..da0de47d694 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Improve performance of `ChatChannel` and `ChatMessage` equality checks [#3335](https://github.com/GetStream/stream-chat-swift/pull/3335) ### ✅ Added - Expose `MissingConnectionId` + `InvalidURL` + `InvalidJSON` Errors [#3332](https://github.com/GetStream/stream-chat-swift/pull/3332) +### 🐞 Fixed +- Fix a rare issue with incorrect message order when sending multiple messages while offline [#3316](https://github.com/GetStream/stream-chat-swift/issues/3316) # [4.60.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.60.0) _July 18, 2024_ diff --git a/Sources/StreamChat/ChatClient.swift b/Sources/StreamChat/ChatClient.swift index 3a91fe5ec46..c38b82bd560 100644 --- a/Sources/StreamChat/ChatClient.swift +++ b/Sources/StreamChat/ChatClient.swift @@ -660,6 +660,7 @@ extension ChatClient: ConnectionStateDelegate { } ) connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state) + try? backgroundWorker(of: MessageSender.self).didUpdateConnectionState(state) } } @@ -674,6 +675,21 @@ extension ChatClient: ConnectionDetailsProviderDelegate { } } +extension ChatClient { + func backgroundWorker(of type: T.Type) throws -> T { + if let worker = backgroundWorkers.compactMap({ $0 as? T }).first { + return worker + } + if currentUserId == nil { + throw ClientError.CurrentUserDoesNotExist() + } + if !config.isClientInActiveMode { + throw ClientError.ClientIsNotInActiveMode() + } + throw ClientError("Background worker of type \(T.self) is not set up") + } +} + extension ClientError { public final class MissingLocalStorageURL: ClientError { override public var localizedDescription: String { "The URL provided in ChatClientConfig is `nil`." } diff --git a/Sources/StreamChat/Database/DTOs/QueuedRequestDTO.swift b/Sources/StreamChat/Database/DTOs/QueuedRequestDTO.swift index 16531e3eb07..1c6fe0dc214 100644 --- a/Sources/StreamChat/Database/DTOs/QueuedRequestDTO.swift +++ b/Sources/StreamChat/Database/DTOs/QueuedRequestDTO.swift @@ -39,6 +39,10 @@ class QueuedRequestDTO: NSManagedObject { } extension NSManagedObjectContext: QueuedRequestDatabaseSession { + func allQueuedRequests() -> [QueuedRequestDTO] { + QueuedRequestDTO.loadAllPendingRequests(context: self) + } + func deleteQueuedRequest(id: String) { guard let request = QueuedRequestDTO.load(id: id, context: self) else { return } delete(request) diff --git a/Sources/StreamChat/Database/DatabaseSession.swift b/Sources/StreamChat/Database/DatabaseSession.swift index efbeb3994a0..6fcc6401993 100644 --- a/Sources/StreamChat/Database/DatabaseSession.swift +++ b/Sources/StreamChat/Database/DatabaseSession.swift @@ -393,6 +393,7 @@ protocol AttachmentDatabaseSession { } protocol QueuedRequestDatabaseSession { + func allQueuedRequests() -> [QueuedRequestDTO] func deleteQueuedRequest(id: String) } diff --git a/Sources/StreamChat/Repositories/MessageRepository.swift b/Sources/StreamChat/Repositories/MessageRepository.swift index 77593ead67d..60f9046a047 100644 --- a/Sources/StreamChat/Repositories/MessageRepository.swift +++ b/Sources/StreamChat/Repositories/MessageRepository.swift @@ -88,6 +88,47 @@ class MessageRepository { }) } } + + /// Marks the message's local status to failed and adds it to the offline retry which sends the message when connection comes back. + func scheduleOfflineRetry(for messageId: MessageId, completion: @escaping (Result) -> Void) { + var dataEndpoint: DataEndpoint! + var messageModel: ChatMessage! + database.write { session in + guard let dto = session.message(id: messageId) else { + throw MessageRepositoryError.messageDoesNotExist + } + guard let channelDTO = dto.channel, let cid = try? ChannelId(cid: channelDTO.cid) else { + throw MessageRepositoryError.messageDoesNotHaveValidChannel + } + + // Send the message to offline handling + let requestBody = dto.asRequestBody() as MessageRequestBody + let endpoint: Endpoint = .sendMessage( + cid: cid, + messagePayload: requestBody, + skipPush: dto.skipPush, + skipEnrichUrl: dto.skipEnrichUrl + ) + dataEndpoint = endpoint.withDataResponse + + // Mark it as failed + dto.localMessageState = .sendingFailed + messageModel = try dto.asModel() + } completion: { [weak self] writeError in + if let writeError { + switch writeError { + case let repositoryError as MessageRepositoryError: + completion(.failure(repositoryError)) + default: + completion(.failure(.failedToSendMessage(writeError))) + } + return + } + // Offline repository will send it when connection comes back on, until then we show the message as failed + self?.apiClient.queueOfflineRequest?(dataEndpoint.withDataResponse) + completion(.success(messageModel)) + } + } func saveSuccessfullySentMessage( cid: ChannelId, @@ -126,11 +167,12 @@ class MessageRepository { // error code for duplicated messages. let isDuplicatedMessageError = errorPayload.code == 4 && errorPayload.message.contains("already exists") if isDuplicatedMessageError { - database.write { + database.write({ let messageDTO = $0.message(id: messageId) messageDTO?.markMessageAsSent() + }, completion: { _ in completion(.failure(.failedToSendMessage(error))) - } + }) return } } diff --git a/Sources/StreamChat/Repositories/OfflineRequestsRepository.swift b/Sources/StreamChat/Repositories/OfflineRequestsRepository.swift index 4a5755f2c63..62adbb24e41 100644 --- a/Sources/StreamChat/Repositories/OfflineRequestsRepository.swift +++ b/Sources/StreamChat/Repositories/OfflineRequestsRepository.swift @@ -2,6 +2,7 @@ // Copyright © 2024 Stream.io Inc. All rights reserved. // +import CoreData import Foundation typealias QueueOfflineRequestBlock = (DataEndpoint) -> Void @@ -51,24 +52,100 @@ class OfflineRequestsRepository { /// - If the request fails with a connection error -> The request is kept to be executed once the connection is back (we are not putting it back at the queue to make sure we respect the order) /// - If the request fails with any other error -> We are dismissing the request, and removing it from the queue func runQueuedRequests(completion: @escaping () -> Void) { - let readContext = database.backgroundReadOnlyContext - readContext.perform { [weak self] in - let requests = QueuedRequestDTO.loadAllPendingRequests(context: readContext).map { - ($0.id, $0.endpoint, $0.date as Date) + database.read { session in + let dtos = session.allQueuedRequests() + var requests = [Request]() + requests.reserveCapacity(dtos.count) + var requestIdsToDelete = Set() + let currentDate = Date() + + for dto in dtos { + let id = dto.id + let endpointData = dto.endpoint + let date = dto.date.bridgeDate + + // Is valid + guard let endpoint = try? JSONDecoder.stream.decode(DataEndpoint.self, from: endpointData) else { + log.error("Could not decode queued request \(id)", subsystems: .offlineSupport) + requestIdsToDelete.insert(dto.id) + continue + } + + // Is expired + let hoursQueued = currentDate.timeIntervalSince(date) / Constants.secondsInHour + let shouldBeDiscarded = hoursQueued > Double(self.maxHoursThreshold) + guard endpoint.shouldBeQueuedOffline && !shouldBeDiscarded else { + log.error("Queued request for /\(endpoint.path.value) should not be queued", subsystems: .offlineSupport) + requestIdsToDelete.insert(dto.id) + continue + } + requests.append(Request(id: id, date: date, endpoint: endpoint)) + } + + // Out of valid requests, merge send message requests for the same id + let sendMessageIdGroups = Dictionary(grouping: requests, by: { $0.sendMessageId }) + var mergedRequests = [Request]() + mergedRequests.reserveCapacity(requests.count) + for request in requests { + if let sendMessageId = request.sendMessageId { + // Is it already merged into another + if requestIdsToDelete.contains(request.id) { + continue + } + if let duplicates = sendMessageIdGroups[sendMessageId], duplicates.count >= 2 { + // Coalesce send message requests in a way that we use the latest endpoint data + // because the message could have changed when there was a manual retry + let sortedDuplicates = duplicates.sorted(by: { $0.date < $1.date }) + let earliest = sortedDuplicates.first! + let latest = sortedDuplicates.last! + mergedRequests.append(Request(id: earliest.id, date: earliest.date, endpoint: latest.endpoint)) + // All the others should be deleted + requestIdsToDelete.formUnion(duplicates.dropFirst().map(\.id)) + } else { + mergedRequests.append(request) + } + } else { + mergedRequests.append(request) + } } - DispatchQueue.main.async { - self?.executeRequests(requests, completion: completion) + log.info("\(mergedRequests.count) pending offline requests (coalesced = \(requests.count - mergedRequests.count)", subsystems: .offlineSupport) + return (requests: mergedRequests, deleteIds: requestIdsToDelete) + } completion: { [weak self] result in + switch result { + case .success(let pair): + self?.deleteRequests(with: pair.deleteIds, completion: { + self?.retryQueue.async { + self?.executeRequests(pair.requests, completion: completion) + } + }) + case .failure(let error): + log.error("Failed to read queued requests with error \(error.localizedDescription)", subsystems: .offlineSupport) + completion() } } } - - private func executeRequests(_ requests: [(String, Data, Date)], completion: @escaping () -> Void) { - log.info("\(requests.count) pending offline requests", subsystems: .offlineSupport) - + + private func deleteRequests(with ids: Set, completion: @escaping () -> Void) { + guard !ids.isEmpty else { + completion() + return + } + database.write { session in + for id in ids { + session.deleteQueuedRequest(id: id) + } + } completion: { _ in + completion() + } + } + + private func executeRequests(_ requests: [Request], completion: @escaping () -> Void) { let database = self.database - let currentDate = Date() let group = DispatchGroup() - for (id, endpoint, date) in requests { + for request in requests { + let id = request.id + let endpoint = request.endpoint + group.enter() let leave = { group.leave() @@ -79,21 +156,6 @@ class OfflineRequestsRepository { }, completion: { _ in leave() }) } - guard let endpoint = try? JSONDecoder.stream.decode(DataEndpoint.self, from: endpoint) else { - log.error("Could not decode queued request \(id)", subsystems: .offlineSupport) - deleteQueuedRequestAndComplete() - continue - } - - let hoursQueued = currentDate.timeIntervalSince(date) / Constants.secondsInHour - let shouldBeDiscarded = hoursQueued > Double(maxHoursThreshold) - - guard endpoint.shouldBeQueuedOffline && !shouldBeDiscarded else { - log.error("Queued request for /\(endpoint.path.value) should not be queued", subsystems: .offlineSupport) - deleteQueuedRequestAndComplete() - continue - } - log.info("Executing queued offline request for /\(endpoint.path)", subsystems: .offlineSupport) apiClient.recoveryRequest(endpoint: endpoint) { [weak self] result in log.info("Completed queued offline request /\(endpoint.path)", subsystems: .offlineSupport) @@ -177,8 +239,36 @@ class OfflineRequestsRepository { database.write { _ in QueuedRequestDTO.createRequest(date: date, endpoint: data, context: database.writableContext) log.info("Queued request for /\(endpoint.path)", subsystems: .offlineSupport) + } completion: { _ in completion?() } } } } + +private extension OfflineRequestsRepository { + struct Request { + let id: String + let date: Date + let endpoint: DataEndpoint + let sendMessageId: MessageId? + + init(id: String, date: Date, endpoint: DataEndpoint) { + self.id = id + self.date = date + self.endpoint = endpoint + + sendMessageId = { + switch endpoint.path { + case .sendMessage: + guard let bodyData = endpoint.body as? Data else { return nil } + guard let json = try? JSONSerialization.jsonObject(with: bodyData) as? [String: Any] else { return nil } + guard let message = json["message"] as? [String: Any] else { return nil } + return message["id"] as? String + default: + return nil + } + }() + } + } +} diff --git a/Sources/StreamChat/Repositories/SyncRepository.swift b/Sources/StreamChat/Repositories/SyncRepository.swift index 19ad3d932c9..328d424bb67 100644 --- a/Sources/StreamChat/Repositories/SyncRepository.swift +++ b/Sources/StreamChat/Repositories/SyncRepository.swift @@ -132,6 +132,11 @@ class SyncRepository { // Enter recovery mode so no other requests are triggered. apiClient.enterRecoveryMode() + // Run offline actions requests as the first thing + if config.isLocalStorageEnabled { + operations.append(ExecutePendingOfflineActions(offlineRequestsRepository: offlineRequestsRepository)) + } + // Get the existing channelIds let activeChannelIds = activeChannelControllers.allObjects.compactMap(\.cid) operations.append(GetChannelIdsOperation(database: database, context: context, activeChannelIds: activeChannelIds)) @@ -176,11 +181,6 @@ class SyncRepository { // 4. Clean up unwanted channels operations.append(DeleteUnwantedChannelsOperation(database: database, context: context)) - // 5. Run offline actions requests - if config.isLocalStorageEnabled { - operations.append(ExecutePendingOfflineActions(offlineRequestsRepository: offlineRequestsRepository)) - } - operations.append(BlockOperation(block: { [weak self] in log.info("Finished recovering offline state", subsystems: .offlineSupport) DispatchQueue.main.async { diff --git a/Sources/StreamChat/StateLayer/Chat.swift b/Sources/StreamChat/StateLayer/Chat.swift index 90c79f413dc..71a6c51f18c 100644 --- a/Sources/StreamChat/StateLayer/Chat.swift +++ b/Sources/StreamChat/StateLayer/Chat.swift @@ -1418,20 +1418,3 @@ extension Chat { ) -> TypingEventsSender = TypingEventsSender.init } } - -// MARK: - Chat Client - -private extension ChatClient { - func backgroundWorker(of type: T.Type) throws -> T { - if let worker = backgroundWorkers.compactMap({ $0 as? T }).first { - return worker - } - if currentUserId == nil { - throw ClientError.CurrentUserDoesNotExist() - } - if !config.isClientInActiveMode { - throw ClientError.ClientIsNotInActiveMode() - } - throw ClientError("Background worker of type \(T.self) is not set up") - } -} diff --git a/Sources/StreamChat/Workers/Background/MessageSender.swift b/Sources/StreamChat/Workers/Background/MessageSender.swift index e600e2b8cae..af6d12fbb6b 100644 --- a/Sources/StreamChat/Workers/Background/MessageSender.swift +++ b/Sources/StreamChat/Workers/Background/MessageSender.swift @@ -14,10 +14,7 @@ import Foundation /// 3. When the message is being sent, its local state is changed to `.sending` /// 4. If the operation is successful, the local state of the message is changed to `nil`. If the operation fails, the local /// state of is changed to `sendingFailed`. -/// -// TODO: -/// - Message send retry -/// - Start sending messages when connection status changes (offline -> online) +/// 5. When connection errors happen, all the queued messages are sent to offline retry which retries them one by one. /// class MessageSender: Worker { /// Because we need to be sure messages for every channel are sent in the correct order, we create a sending queue for @@ -117,6 +114,15 @@ class MessageSender: Worker { } } } + + func didUpdateConnectionState(_ state: WebSocketConnectionState) { + guard state.isConnected else { return } + sendingDispatchQueue.async { [weak self] in + self?.sendingQueueByCid.forEach { _, messageQueue in + messageQueue.webSocketConnected() + } + } + } } // MARK: - Chat State Layer @@ -170,6 +176,7 @@ private class MessageSendingQueue { /// We use Set because the message Id is the main identifier. Thanks to this, it's possible to schedule message for sending /// multiple times without having to worry about that. @Atomic private(set) var requests: Set = [] + @Atomic private var isWaitingForConnection = false /// Schedules sending of the message. All already scheduled messages with `createdLocallyAt` older than these ones will /// be sent first. @@ -184,37 +191,57 @@ private class MessageSendingQueue { sendNextMessage() } } + + func webSocketConnected() { + guard isWaitingForConnection else { return } + isWaitingForConnection = false + log.debug("Message sender resumed sending messages after establishing internet connection") + sendNextMessage() + } + private var sortedQueuedRequests: [SendRequest] { + requests.sorted(by: { $0.createdLocallyAt < $1.createdLocallyAt }) + } + /// Gets the oldest message from the queue and tries to send it. private func sendNextMessage() { dispatchQueue.async { [weak self] in - // Sort the messages and send the oldest one - // If this proves to be a bottleneck in the future, we might - // switch to using a custom `OrderedSet` - guard let request = self?.requests.sorted(by: { $0.createdLocallyAt < $1.createdLocallyAt }).first else { return } - - self?.messageRepository.sendMessage(with: request.messageId) { [weak self] result in - guard let self else { return } - self.removeRequestAndContinue(request) - if let error = result.error { - switch error { - case .messageDoesNotExist, - .messageNotPendingSend, - .messageDoesNotHaveValidChannel: - let event = NewMessageErrorEvent(messageId: request.messageId, error: error) - self.eventsNotificationCenter.process(event) - case let .failedToSendMessage(error): - let event = NewMessageErrorEvent(messageId: request.messageId, error: error) - self.eventsNotificationCenter.process(event) - } + guard let self else { return } + guard let request = self.sortedQueuedRequests.first else { return } + + if self.isWaitingForConnection { + self.messageRepository.scheduleOfflineRetry(for: request.messageId) { [weak self] _ in + self?._requests.mutate { $0.remove(request) } + self?.sendNextMessage() + } + } else { + self.messageRepository.sendMessage(with: request.messageId) { [weak self] result in + self?.handleSendMessageResult(request, result: result) } - self.delegate?.messageSendingQueue(self, didProcess: request.messageId, result: result) } } } - - private func removeRequestAndContinue(_ request: SendRequest) { + + private func handleSendMessageResult(_ request: SendRequest, result: Result) { _requests.mutate { $0.remove(request) } + + if let repositoryError = result.error { + switch repositoryError { + case .messageDoesNotExist, .messageNotPendingSend, .messageDoesNotHaveValidChannel: + let event = NewMessageErrorEvent(messageId: request.messageId, error: repositoryError) + eventsNotificationCenter.process(event) + case .failedToSendMessage(let clientError): + let event = NewMessageErrorEvent(messageId: request.messageId, error: clientError) + eventsNotificationCenter.process(event) + + if ClientError.isEphemeral(error: clientError) { + // We hit a connection error, therefore all the remaining and upcoming requests should be scheduled for keeping the order + isWaitingForConnection = true + log.debug("Message sender started waiting for connection and forwarding messages to offline requests queue") + } + } + } + delegate?.messageSendingQueue(self, didProcess: request.messageId, result: result) sendNextMessage() } } diff --git a/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift b/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift index e3a35438ce3..9a04f23457f 100644 --- a/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift +++ b/TestTools/StreamChatTestTools/Mocks/StreamChat/Database/DatabaseSession_Mock.swift @@ -360,6 +360,10 @@ class DatabaseSession_Mock: DatabaseSession { func saveQuery(query: MessageSearchQuery) -> MessageSearchQueryDTO { underlyingSession.saveQuery(query: query) } + + func allQueuedRequests() -> [QueuedRequestDTO] { + underlyingSession.allQueuedRequests() + } func deleteQueuedRequest(id: String) { underlyingSession.deleteQueuedRequest(id: id) diff --git a/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift b/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift index a2a7bd4f843..dd6bd131282 100644 --- a/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift +++ b/TestTools/StreamChatTestTools/SpyPattern/Spy/APIClient_Spy.swift @@ -23,6 +23,7 @@ final class APIClient_Spy: APIClient, Spy { /// The last endpoint `recoveryRequest` function was called with. @Atomic var recoveryRequest_endpoint: AnyEndpoint? @Atomic var recoveryRequest_completion: Any? + @Atomic var recoveryRequest_results: [Any] = [] @Atomic var recoveryRequest_allRecordedCalls: [(endpoint: AnyEndpoint, completion: Any?)] = [] /// The last endpoint `unmanagedRequest` function was called with. @@ -53,6 +54,7 @@ final class APIClient_Spy: APIClient, Spy { request_completion = nil request_results = [] request_expectation = .init() + recoveryRequest_results = [] recoveryRequest_expectation = .init() uploadRequest_expectation = .init() @@ -104,6 +106,10 @@ final class APIClient_Spy: APIClient, Spy { func test_mockResponseResult(_ responseResult: Result) { request_results.append(responseResult) } + + func test_mockRecoveryResponseResult(_ responseResult: Result) { + recoveryRequest_results.append(responseResult) + } func test_mockUnmanagedResponseResult(_ responseResult: Result) { unmanagedRequest_result = responseResult @@ -129,6 +135,10 @@ final class APIClient_Spy: APIClient, Spy { completion: @escaping (Result) -> Void ) where Response: Decodable { recoveryRequest_endpoint = AnyEndpoint(endpoint) + if let resultIndex = recoveryRequest_results.firstIndex(where: { $0 is Result }) { + let result = recoveryRequest_results.remove(at: resultIndex) + completion(result as! Result) + } recoveryRequest_completion = completion _recoveryRequest_allRecordedCalls.mutate { $0.append((recoveryRequest_endpoint!, recoveryRequest_completion!)) } } @@ -166,6 +176,7 @@ final class APIClient_Spy: APIClient, Spy { @discardableResult func waitForRequest(timeout: Double = defaultTimeout) -> AnyEndpoint? { XCTWaiter().wait(for: [request_expectation], timeout: timeout) + request_expectation = XCTestExpectation() return request_endpoint } diff --git a/Tests/StreamChatTests/Repositories/OfflineRequestsRepository_Tests.swift b/Tests/StreamChatTests/Repositories/OfflineRequestsRepository_Tests.swift index 250cec361c1..deb7085a411 100644 --- a/Tests/StreamChatTests/Repositories/OfflineRequestsRepository_Tests.swift +++ b/Tests/StreamChatTests/Repositories/OfflineRequestsRepository_Tests.swift @@ -315,20 +315,34 @@ final class OfflineRequestsRepository_Tests: XCTestCase { XCTAssertEqual(pendingRequests.count, 0) } - private func createSendMessageRequests(count: Int, date: Date = Date()) throws { + private func createSendMessageRequests(count: Int, date: Date = Date().addingTimeInterval(-3600)) throws { try (1...count).forEach { - let id = "request\($0)" - try self.createRequest( - id: id, - path: .sendMessage(.init(type: .messaging, id: id)), - body: ["some\($0)": 123], - date: date + try createSendMessageRequest( + requestIdNumber: $0, + messageIdNumber: $0, + date: date.addingTimeInterval(TimeInterval($0)) ) } let allRequests = QueuedRequestDTO.loadAllPendingRequests(context: database.viewContext) XCTAssertEqual(allRequests.count, count) } + + private func createSendMessageRequest(requestIdNumber: Int, messageIdNumber: Int, date: Date) throws { + let id = "request\(requestIdNumber)" + let messageId = "message\(messageIdNumber)" + let requestBody = MessageRequestBody(id: messageId, user: .dummy(userId: .unique), text: .unique, extraData: [:]) + let endpoint: Endpoint = .sendMessage( + cid: .init(type: .messaging, id: id), + messagePayload: requestBody, + skipPush: false, + skipEnrichUrl: false + ) + let endpointData: Data = try JSONEncoder.stream.encode(endpoint.withDataResponse) + try database.writeSynchronously { _ in + QueuedRequestDTO.createRequest(id: id, date: date, endpoint: endpointData, context: self.database.writableContext) + } + } private func createRequest(id: String, path: EndpointPath, body: Encodable? = nil, date: Date = Date()) throws { let endpoint = Endpoint( @@ -382,4 +396,30 @@ final class OfflineRequestsRepository_Tests: XCTestCase { waitForExpectations(timeout: defaultTimeout, handler: nil) XCTAssertCall("write(_:completion:)", on: database, times: 1) } + + func test_queueOfflineRequestsMultipleTimesThenDuplicateSendMessageRequestsAreCoalesced() throws { + try createSendMessageRequests(count: 5) // 1...5 + // Duplicate second and forth + try createSendMessageRequest(requestIdNumber: 6, messageIdNumber: 2, date: Date()) + try createSendMessageRequest(requestIdNumber: 7, messageIdNumber: 4, date: Date()) + + // 5 successful responses, 2 should never end up here because these should be coalesced + for _ in 0..<5 { + apiClient.test_mockRecoveryResponseResult(Result.success(Data())) + } + + let expectation = XCTestExpectation(description: "Run") + repository.runQueuedRequests { + expectation.fulfill() + } + + // When failure happens then request merging did not work + wait(for: [expectation], timeout: defaultTimeout) + + // Validate that all the requests are cleaned up + try database.readSynchronously { session in + let requests = session.allQueuedRequests() + XCTAssertEqual(0, requests.count) + } + } } diff --git a/Tests/StreamChatTests/Workers/Background/MessageSender_Tests.swift b/Tests/StreamChatTests/Workers/Background/MessageSender_Tests.swift index 96583263b8f..ff5bb560111 100644 --- a/Tests/StreamChatTests/Workers/Background/MessageSender_Tests.swift +++ b/Tests/StreamChatTests/Workers/Background/MessageSender_Tests.swift @@ -404,6 +404,56 @@ final class MessageSender_Tests: XCTestCase { AssertAsync.willBeTrue(eventsNotificationCenter.mock_processCalledWithEvents.first is NewMessageErrorEvent) XCTAssertCall("sendMessage(with:completion:)", on: messageRepository, times: 1) } + + func test_senderSendsMessages_forwardsPendingMessagesToOfflineHandlingOnConnectionError() throws { + // Sender with non-mock message repository + let nonMockMessageRepository = MessageRepository(database: database, apiClient: apiClient) + sender = MessageSender( + messageRepository: nonMockMessageRepository, + eventsNotificationCenter: eventsNotificationCenter, + database: database, + apiClient: apiClient + ) + var queueOfflineRequestCounter = 0 + let offlineQueuingExpectation = XCTestExpectation(description: "2, 3, 4, 5 queued") + apiClient.queueOfflineRequest = { _ in + queueOfflineRequestCounter += 1 + guard queueOfflineRequestCounter == 4 else { return } + offlineQueuingExpectation.fulfill() + } + + // At the end of test all 5 are expected to finish successfully + let messageIds = (1...5).map { "\($0)" } + + try database.writeSynchronously { session in + for id in messageIds { + try self.createMessage(id: id, in: session) + } + } + + // First: success + apiClient.waitForRequest() + try resumeAPIRequestAndWaitForLocalStateChange(messageId: "1", success: true) + + // Second: connection error + apiClient.waitForRequest() + try resumeAPIRequestAndWaitForLocalStateChange(messageId: "2", success: false) + + // We use mocked API client which does not do the automatic forwarding, therefore we simulate it here + apiClient.queueOfflineRequest?(DataEndpoint(path: .sendMessage(cid), method: .post)) + + // Since connection error was received, all the remaining queued messages are sent directly to offline repository + wait(for: [offlineQueuingExpectation], timeout: defaultTimeout) + + // Verify states (one successful, others failing) + try database.readSynchronously { session in + let localMessageStates = messageIds.map { session.message(id: $0)?.localMessageState } + let expected: [LocalMessageState?] = [nil, .sendingFailed, .sendingFailed, .sendingFailed, .sendingFailed] + XCTAssertEqual(expected, localMessageStates) + } + + // Offline repository is now responsible of sending the requests + } // MARK: - Life cycle tests @@ -456,6 +506,38 @@ final class MessageSender_Tests: XCTestCase { // Then wait(for: [sessionMock.rescueMessagesExpectation], timeout: defaultTimeout) } + + // MARK: - + + @discardableResult func createMessage(id: MessageId, in session: DatabaseSession) throws -> MessageId { + let dto = try session.createNewMessage( + in: cid, + messageId: id, + text: "\(id)", + pinning: nil, + quotedMessageId: nil, + skipPush: false, + skipEnrichUrl: false + ) + dto.localMessageState = .pendingSend + return dto.id + } + + private func resumeAPIRequestAndWaitForLocalStateChange(messageId: MessageId, success: Bool) throws { + let localStateExpectation = XCTestExpectation(description: "\(messageId) - local state change") + database.didWrite = { + // Extra delay for allowing MessageSender to run the MessageRepository's completion + DispatchQueue.main.async { + localStateExpectation.fulfill() + } + } + if success { + apiClient.test_simulateResponse(.success(MessagePayload.Boxed(message: .dummy(messageId: messageId, text: "processed", cid: cid)))) + } else { + apiClient.test_simulateResponse(Result.failure(NSError(domain: NSURLErrorDomain, code: NSURLErrorNetworkConnectionLost))) + } + wait(for: [localStateExpectation], timeout: defaultTimeout) + } } private class DatabaseSessionRescueListener: DatabaseSession_Mock {