Skip to content

Commit

Permalink
Fix a rare issue with incorrect message order when sending messages o…
Browse files Browse the repository at this point in the history
…ffline (#3316)
  • Loading branch information
laevandus authored Jul 29, 2024
1 parent 95ee556 commit ead44a7
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 84 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Improve performance of `ChatChannel` and `ChatMessage` equality checks [#3335](https://github.com/GetStream/stream-chat-swift/pull/3335)
### ✅ Added
- Expose `MissingConnectionId` + `InvalidURL` + `InvalidJSON` Errors [#3332](https://github.com/GetStream/stream-chat-swift/pull/3332)
### 🐞 Fixed
- Fix a rare issue with incorrect message order when sending multiple messages while offline [#3316](https://github.com/GetStream/stream-chat-swift/issues/3316)

# [4.60.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.60.0)
_July 18, 2024_
Expand Down
16 changes: 16 additions & 0 deletions Sources/StreamChat/ChatClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ extension ChatClient: ConnectionStateDelegate {
}
)
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
try? backgroundWorker(of: MessageSender.self).didUpdateConnectionState(state)
}
}

Expand All @@ -674,6 +675,21 @@ extension ChatClient: ConnectionDetailsProviderDelegate {
}
}

extension ChatClient {
func backgroundWorker<T>(of type: T.Type) throws -> T {
if let worker = backgroundWorkers.compactMap({ $0 as? T }).first {
return worker
}
if currentUserId == nil {
throw ClientError.CurrentUserDoesNotExist()
}
if !config.isClientInActiveMode {
throw ClientError.ClientIsNotInActiveMode()
}
throw ClientError("Background worker of type \(T.self) is not set up")
}
}

extension ClientError {
public final class MissingLocalStorageURL: ClientError {
override public var localizedDescription: String { "The URL provided in ChatClientConfig is `nil`." }
Expand Down
4 changes: 4 additions & 0 deletions Sources/StreamChat/Database/DTOs/QueuedRequestDTO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class QueuedRequestDTO: NSManagedObject {
}

extension NSManagedObjectContext: QueuedRequestDatabaseSession {
func allQueuedRequests() -> [QueuedRequestDTO] {
QueuedRequestDTO.loadAllPendingRequests(context: self)
}

func deleteQueuedRequest(id: String) {
guard let request = QueuedRequestDTO.load(id: id, context: self) else { return }
delete(request)
Expand Down
1 change: 1 addition & 0 deletions Sources/StreamChat/Database/DatabaseSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ protocol AttachmentDatabaseSession {
}

protocol QueuedRequestDatabaseSession {
func allQueuedRequests() -> [QueuedRequestDTO]
func deleteQueuedRequest(id: String)
}

Expand Down
46 changes: 44 additions & 2 deletions Sources/StreamChat/Repositories/MessageRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,47 @@ class MessageRepository {
})
}
}

/// Marks the message's local status to failed and adds it to the offline retry which sends the message when connection comes back.
func scheduleOfflineRetry(for messageId: MessageId, completion: @escaping (Result<ChatMessage, MessageRepositoryError>) -> Void) {
var dataEndpoint: DataEndpoint!
var messageModel: ChatMessage!
database.write { session in
guard let dto = session.message(id: messageId) else {
throw MessageRepositoryError.messageDoesNotExist
}
guard let channelDTO = dto.channel, let cid = try? ChannelId(cid: channelDTO.cid) else {
throw MessageRepositoryError.messageDoesNotHaveValidChannel
}

// Send the message to offline handling
let requestBody = dto.asRequestBody() as MessageRequestBody
let endpoint: Endpoint<MessagePayload.Boxed> = .sendMessage(
cid: cid,
messagePayload: requestBody,
skipPush: dto.skipPush,
skipEnrichUrl: dto.skipEnrichUrl
)
dataEndpoint = endpoint.withDataResponse

// Mark it as failed
dto.localMessageState = .sendingFailed
messageModel = try dto.asModel()
} completion: { [weak self] writeError in
if let writeError {
switch writeError {
case let repositoryError as MessageRepositoryError:
completion(.failure(repositoryError))
default:
completion(.failure(.failedToSendMessage(writeError)))
}
return
}
// Offline repository will send it when connection comes back on, until then we show the message as failed
self?.apiClient.queueOfflineRequest?(dataEndpoint.withDataResponse)
completion(.success(messageModel))
}
}

func saveSuccessfullySentMessage(
cid: ChannelId,
Expand Down Expand Up @@ -126,11 +167,12 @@ class MessageRepository {
// error code for duplicated messages.
let isDuplicatedMessageError = errorPayload.code == 4 && errorPayload.message.contains("already exists")
if isDuplicatedMessageError {
database.write {
database.write({
let messageDTO = $0.message(id: messageId)
messageDTO?.markMessageAsSent()
}, completion: { _ in
completion(.failure(.failedToSendMessage(error)))
}
})
return
}
}
Expand Down
144 changes: 117 additions & 27 deletions Sources/StreamChat/Repositories/OfflineRequestsRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright © 2024 Stream.io Inc. All rights reserved.
//

import CoreData
import Foundation

typealias QueueOfflineRequestBlock = (DataEndpoint) -> Void
Expand Down Expand Up @@ -51,24 +52,100 @@ class OfflineRequestsRepository {
/// - If the request fails with a connection error -> The request is kept to be executed once the connection is back (we are not putting it back at the queue to make sure we respect the order)
/// - If the request fails with any other error -> We are dismissing the request, and removing it from the queue
func runQueuedRequests(completion: @escaping () -> Void) {
let readContext = database.backgroundReadOnlyContext
readContext.perform { [weak self] in
let requests = QueuedRequestDTO.loadAllPendingRequests(context: readContext).map {
($0.id, $0.endpoint, $0.date as Date)
database.read { session in
let dtos = session.allQueuedRequests()
var requests = [Request]()
requests.reserveCapacity(dtos.count)
var requestIdsToDelete = Set<String>()
let currentDate = Date()

for dto in dtos {
let id = dto.id
let endpointData = dto.endpoint
let date = dto.date.bridgeDate

// Is valid
guard let endpoint = try? JSONDecoder.stream.decode(DataEndpoint.self, from: endpointData) else {
log.error("Could not decode queued request \(id)", subsystems: .offlineSupport)
requestIdsToDelete.insert(dto.id)
continue
}

// Is expired
let hoursQueued = currentDate.timeIntervalSince(date) / Constants.secondsInHour
let shouldBeDiscarded = hoursQueued > Double(self.maxHoursThreshold)
guard endpoint.shouldBeQueuedOffline && !shouldBeDiscarded else {
log.error("Queued request for /\(endpoint.path.value) should not be queued", subsystems: .offlineSupport)
requestIdsToDelete.insert(dto.id)
continue
}
requests.append(Request(id: id, date: date, endpoint: endpoint))
}

// Out of valid requests, merge send message requests for the same id
let sendMessageIdGroups = Dictionary(grouping: requests, by: { $0.sendMessageId })
var mergedRequests = [Request]()
mergedRequests.reserveCapacity(requests.count)
for request in requests {
if let sendMessageId = request.sendMessageId {
// Is it already merged into another
if requestIdsToDelete.contains(request.id) {
continue
}
if let duplicates = sendMessageIdGroups[sendMessageId], duplicates.count >= 2 {
// Coalesce send message requests in a way that we use the latest endpoint data
// because the message could have changed when there was a manual retry
let sortedDuplicates = duplicates.sorted(by: { $0.date < $1.date })
let earliest = sortedDuplicates.first!
let latest = sortedDuplicates.last!
mergedRequests.append(Request(id: earliest.id, date: earliest.date, endpoint: latest.endpoint))
// All the others should be deleted
requestIdsToDelete.formUnion(duplicates.dropFirst().map(\.id))
} else {
mergedRequests.append(request)
}
} else {
mergedRequests.append(request)
}
}
DispatchQueue.main.async {
self?.executeRequests(requests, completion: completion)
log.info("\(mergedRequests.count) pending offline requests (coalesced = \(requests.count - mergedRequests.count)", subsystems: .offlineSupport)
return (requests: mergedRequests, deleteIds: requestIdsToDelete)
} completion: { [weak self] result in
switch result {
case .success(let pair):
self?.deleteRequests(with: pair.deleteIds, completion: {
self?.retryQueue.async {
self?.executeRequests(pair.requests, completion: completion)
}
})
case .failure(let error):
log.error("Failed to read queued requests with error \(error.localizedDescription)", subsystems: .offlineSupport)
completion()
}
}
}

private func executeRequests(_ requests: [(String, Data, Date)], completion: @escaping () -> Void) {
log.info("\(requests.count) pending offline requests", subsystems: .offlineSupport)


private func deleteRequests(with ids: Set<String>, completion: @escaping () -> Void) {
guard !ids.isEmpty else {
completion()
return
}
database.write { session in
for id in ids {
session.deleteQueuedRequest(id: id)
}
} completion: { _ in
completion()
}
}

private func executeRequests(_ requests: [Request], completion: @escaping () -> Void) {
let database = self.database
let currentDate = Date()
let group = DispatchGroup()
for (id, endpoint, date) in requests {
for request in requests {
let id = request.id
let endpoint = request.endpoint

group.enter()
let leave = {
group.leave()
Expand All @@ -79,21 +156,6 @@ class OfflineRequestsRepository {
}, completion: { _ in leave() })
}

guard let endpoint = try? JSONDecoder.stream.decode(DataEndpoint.self, from: endpoint) else {
log.error("Could not decode queued request \(id)", subsystems: .offlineSupport)
deleteQueuedRequestAndComplete()
continue
}

let hoursQueued = currentDate.timeIntervalSince(date) / Constants.secondsInHour
let shouldBeDiscarded = hoursQueued > Double(maxHoursThreshold)

guard endpoint.shouldBeQueuedOffline && !shouldBeDiscarded else {
log.error("Queued request for /\(endpoint.path.value) should not be queued", subsystems: .offlineSupport)
deleteQueuedRequestAndComplete()
continue
}

log.info("Executing queued offline request for /\(endpoint.path)", subsystems: .offlineSupport)
apiClient.recoveryRequest(endpoint: endpoint) { [weak self] result in
log.info("Completed queued offline request /\(endpoint.path)", subsystems: .offlineSupport)
Expand Down Expand Up @@ -177,8 +239,36 @@ class OfflineRequestsRepository {
database.write { _ in
QueuedRequestDTO.createRequest(date: date, endpoint: data, context: database.writableContext)
log.info("Queued request for /\(endpoint.path)", subsystems: .offlineSupport)
} completion: { _ in
completion?()
}
}
}
}

private extension OfflineRequestsRepository {
struct Request {
let id: String
let date: Date
let endpoint: DataEndpoint
let sendMessageId: MessageId?

init(id: String, date: Date, endpoint: DataEndpoint) {
self.id = id
self.date = date
self.endpoint = endpoint

sendMessageId = {
switch endpoint.path {
case .sendMessage:
guard let bodyData = endpoint.body as? Data else { return nil }
guard let json = try? JSONSerialization.jsonObject(with: bodyData) as? [String: Any] else { return nil }
guard let message = json["message"] as? [String: Any] else { return nil }
return message["id"] as? String
default:
return nil
}
}()
}
}
}
10 changes: 5 additions & 5 deletions Sources/StreamChat/Repositories/SyncRepository.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ class SyncRepository {
// Enter recovery mode so no other requests are triggered.
apiClient.enterRecoveryMode()

// Run offline actions requests as the first thing
if config.isLocalStorageEnabled {
operations.append(ExecutePendingOfflineActions(offlineRequestsRepository: offlineRequestsRepository))
}

// Get the existing channelIds
let activeChannelIds = activeChannelControllers.allObjects.compactMap(\.cid)
operations.append(GetChannelIdsOperation(database: database, context: context, activeChannelIds: activeChannelIds))
Expand Down Expand Up @@ -176,11 +181,6 @@ class SyncRepository {
// 4. Clean up unwanted channels
operations.append(DeleteUnwantedChannelsOperation(database: database, context: context))

// 5. Run offline actions requests
if config.isLocalStorageEnabled {
operations.append(ExecutePendingOfflineActions(offlineRequestsRepository: offlineRequestsRepository))
}

operations.append(BlockOperation(block: { [weak self] in
log.info("Finished recovering offline state", subsystems: .offlineSupport)
DispatchQueue.main.async {
Expand Down
17 changes: 0 additions & 17 deletions Sources/StreamChat/StateLayer/Chat.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1418,20 +1418,3 @@ extension Chat {
) -> TypingEventsSender = TypingEventsSender.init
}
}

// MARK: - Chat Client

private extension ChatClient {
func backgroundWorker<T>(of type: T.Type) throws -> T {
if let worker = backgroundWorkers.compactMap({ $0 as? T }).first {
return worker
}
if currentUserId == nil {
throw ClientError.CurrentUserDoesNotExist()
}
if !config.isClientInActiveMode {
throw ClientError.ClientIsNotInActiveMode()
}
throw ClientError("Background worker of type \(T.self) is not set up")
}
}
Loading

0 comments on commit ead44a7

Please sign in to comment.