From da16c4ab151f14c96e2cb05acab3d496c17acd9b Mon Sep 17 00:00:00 2001 From: Mathieu Allaire Date: Fri, 29 Nov 2019 12:08:31 -0500 Subject: [PATCH] Backport Swift Access Races in multi-threading causing memory issues #53 --- Source/Classes/ActionCableClient.swift | 160 +++++++++++++------------ 1 file changed, 84 insertions(+), 76 deletions(-) diff --git a/Source/Classes/ActionCableClient.swift b/Source/Classes/ActionCableClient.swift index afc0082..c4f2688 100644 --- a/Source/Classes/ActionCableClient.swift +++ b/Source/Classes/ActionCableClient.swift @@ -26,16 +26,17 @@ import Starscream public typealias ActionPayload = Dictionary open class ActionCableClient { - + //MARK: Socket fileprivate(set) var socket : WebSocket - + fileprivate let queue = DispatchQueue(label: "Custom_Queue") + /// Reconnection Strategy /// /// If a disconnection occurs, reconnnectionStrategy determines and calculates /// the time interval at which a retry happens. open var reconnectionStrategy : RetryStrategy = .logarithmicBackoff(maxRetries: 5, maxIntervalTime: 30.0) - + //MARK: Global Callbacks /// Will Connect /// @@ -61,13 +62,13 @@ open class ActionCableClient { /// /// Called when the server pings the client open var onPing: (() -> Void)? - + // MARK: Channel Callbacks open var onChannelSubscribed: ((Channel) -> (Void))? open var onChannelUnsubscribed: ((Channel) -> (Void))? open var onChannelRejected: ((Channel) -> (Void))? open var onChannelReceive: ((Channel, Any?, Swift.Error?) -> Void)? - + //MARK: Properties open var isConnected : Bool { return socket.isConnected } open var url: Foundation.URL { return socket.currentURL } @@ -96,7 +97,7 @@ open class ActionCableClient { socket = WebSocket(request: request) setupWebSocket() } - + /// Connect with the server @discardableResult open func connect() -> ActionCableClient { @@ -104,51 +105,51 @@ open class ActionCableClient { if let callback = self.willConnect { callback() } - + ActionCableConcurrentQueue.async { self.socket.connect() self.reconnectionState = nil } } - + return self } - + /// Disconnect from the server. open func disconnect() { manualDisconnectFlag = true socket.disconnect(forceTimeout: 0) } - + internal func reconnect() { DispatchQueue.main.async { var shouldReconnect = true if let callback = self.willReconnect { shouldReconnect = callback() } - + // Reconnection has been cancelled if (!shouldReconnect) { self.reconnectionState = nil return } - + if let callback = self.willConnect { callback() } - + ActionCableConcurrentQueue.async { self.socket.connect() } } } - + @discardableResult internal func transmit(_ data: ActionPayload? = nil, on channel: Channel, as command: Command) throws -> Bool { // First let's see if we can even encode this data - + let JSONString = try JSONSerializer.serialize(channel, command: command, data: data) - + // // If it's a message, let's see if we are subscribed first. // @@ -156,30 +157,30 @@ open class ActionCableClient { // actions, we want to tell the channel it's not subscribed rather // than we are not connected. // - + if (command == Command.message) { guard channel.isSubscribed else { throw TransmitError.notSubscribed } } - + // Let's check if we are connected. guard isConnected else { throw TransmitError.notConnected } - + socket.write(string: JSONString) { //FINISHED! } - + return true } - + // MARK: Properties fileprivate var channelArray = Array() fileprivate(set) var channels = Dictionary() fileprivate var unconfirmedChannels = Dictionary() - + /// Reconnection State /// This keeps our reconnection state around while we try to reconnect fileprivate var reconnectionState : RetryHandler? - + /// Manual Disconnect Flag /// /// This flag tells us if we decided to manually disconnect @@ -194,43 +195,43 @@ extension ActionCableClient { /// - Parameters: /// - name: The name of the channel. The name must match the class name on the server exactly. (e.g. RoomChannel) /// - Returns: a Channel - + public func create(_ name: String) -> Channel { let channel = create(name, identifier: nil, autoSubscribe: true, bufferActions: true) return channel } - + /// Create and subscribe to a channel. - /// + /// /// - Parameters: /// - name: The name of the channel. The name must match the class name on the server exactly. (e.g. RoomChannel) /// - identifier: An optional Dictionary with parameters to be passed into the Channel on each request /// - autoSubscribe: Whether to automatically subscribe to the channel. Defaults to true. /// - Returns: a Channel - + public func create(_ name: String, identifier: ChannelIdentifier?, autoSubscribe: Bool=true, bufferActions: Bool=true) -> Channel { // Look in existing channels and return that if let channel = channels[name] { return channel } - + // Look in unconfirmed channels and return that if let channel = unconfirmedChannels[name] { return channel } - + // Otherwise create a new one let channel = Channel(name: name, identifier: identifier, client: self, autoSubscribe: autoSubscribe, shouldBufferActions: bufferActions) - + self.unconfirmedChannels[name] = channel - + if (channel.autoSubscribe) { subscribe(channel) } - + return channel } - + public subscript(name: String) -> Channel { let channel = create(name, identifier: nil, autoSubscribe: true, bufferActions: true) return channel @@ -239,37 +240,37 @@ extension ActionCableClient { // MARK: Channel Subscriptions extension ActionCableClient { - + public func subscribed(_ name: String) -> Bool { return self.channels.keys.contains(name) } - + internal func subscribe(_ channel: Channel) { // Is it already added and subscribed? if let existingChannel = channels[channel.name] , (existingChannel == channel) { return } - + guard let channel = unconfirmedChannels[channel.name] else { debugPrint("[ActionCableClient] Internal inconsistency error!"); return } - + do { try transmit(on: channel, as: Command.subscribe) } catch { debugPrint(error) } } - + internal func unsubscribe(_ channel: Channel) { do { try self.transmit(on: channel, as: Command.unsubscribe) - + let message = Message(channelName: channel.name, actionName: nil, messageType: MessageType.cancelSubscription, data: nil, error: nil) - + onMessage(message) } catch { // There is a chance here the server could be down or not connected. @@ -278,7 +279,7 @@ extension ActionCableClient { debugPrint(error) } } - + @discardableResult internal func action(_ action: String, on channel: Channel, with data: ActionPayload?) throws -> Bool { var internalData : ActionPayload @@ -287,16 +288,16 @@ extension ActionCableClient { } else { internalData = ActionPayload() } - + internalData["action"] = action - + return try transmit(internalData, on: channel, as: .message) } } // MARK: WebSocket Callbacks extension ActionCableClient { - + fileprivate func setupWebSocket() { self.socket.onConnect = { [weak self] in self!.didConnect() } as (() -> Void) self.socket.onDisconnect = { [weak self] (error: Swift.Error?) in self!.didDisconnect(error) } @@ -304,43 +305,43 @@ extension ActionCableClient { self.socket.onData = { [weak self] (data: Data) in self!.onData(data) } self.socket.onPong = { [weak self] (data: Data?) in self!.didPong() } } - + fileprivate func didConnect() { - + // Clear Reconnection State: We successfull connected reconnectionState = nil - + if let callback = onConnected { DispatchQueue.main.async(execute: callback) } - + for (_, channel) in self.unconfirmedChannels { if channel.autoSubscribe { self.subscribe(channel) } } } - + fileprivate func didDisconnect(_ error: Swift.Error?) { - + var attemptReconnect: Bool = true var connectionError: ConnectionError? - + let channels = self.channels for (_, channel) in channels { let message = Message(channelName: channel.name, actionName: nil, messageType: MessageType.hibernateSubscription, data: nil, error: nil) onMessage(message) } - + // Attempt Reconnection? if let unwrappedError = error { connectionError = ConnectionError(from: unwrappedError) attemptReconnect = connectionError!.recoverable } - + // Reconcile reconncetion attempt with manual disconnect attemptReconnect = !manualDisconnectFlag && attemptReconnect - + // disconnect() has not been called and error is // worthy of attempting a reconnect. if (attemptReconnect) { @@ -350,38 +351,38 @@ extension ActionCableClient { if reconnectionState == nil { reconnectionState = RetryHandler(strategy: reconnectionStrategy) } - + reconnectionState?.retry {[weak self] in self?.reconnect() } - + return // if strategy is None, we don't want to reconnect case .none: break } } - - + + // Clear Reconnetion State reconnectionState = nil - + // Fire Callbacks if let callback = onDisconnected { // Clear the Connection Error on a manual disconnect // as it will not seem accurate if manualDisconnectFlag { connectionError = nil } - + DispatchQueue.main.async(execute: { callback(connectionError) }) } - + // Reset Manual Disconnect Flag manualDisconnectFlag = false } - + fileprivate func didPong() { // This never seems to fire with ActionCable! } - + fileprivate func onText(_ text: String) { ActionCableConcurrentQueue.async(execute: { () -> Void in do { @@ -392,9 +393,10 @@ extension ActionCableClient { } }) } - + fileprivate func onMessage(_ message: Message) { - switch(message.messageType) { + queue.sync { + switch(message.messageType) { case .unrecognized: break case .welcome: @@ -407,18 +409,23 @@ extension ActionCableClient { if let channel = channels[message.channelName!] { // Notify Channel channel.onMessage(message) - + if let callback = onChannelReceive { - DispatchQueue.main.async(execute: { callback(channel, message.data, message.error) } ) + DispatchQueue.main.async(execute: { [weak self] in + guard let weakSelf = self else { return } + if weakSelf.onChannelReceive != nil { + callback(channel, message.data, message.error) + } + }) } } case .confirmSubscription: if let channel = unconfirmedChannels.removeValue(forKey: message.channelName!) { self.channels.updateValue(channel, forKey: channel.name) - + // Notify Channel channel.onMessage(message) - + if let callback = onChannelSubscribed { DispatchQueue.main.async(execute: { callback(channel) }) } @@ -426,10 +433,10 @@ extension ActionCableClient { case .rejectSubscription: // Remove this channel from the list of unconfirmed subscriptions if let channel = unconfirmedChannels.removeValue(forKey: message.channelName!) { - + // Notify Channel channel.onMessage(message) - + if let callback = onChannelRejected { DispatchQueue.main.async(execute: { callback(channel) }) } @@ -438,23 +445,24 @@ extension ActionCableClient { if let channel = channels.removeValue(forKey: message.channelName!) { // Add channel into unconfirmed channels unconfirmedChannels[channel.name] = channel - + // We want to treat this like an unsubscribe. fallthrough } case .cancelSubscription: if let channel = channels.removeValue(forKey: message.channelName!) { - + // Notify Channel channel.onMessage(message) - + if let callback = onChannelUnsubscribed { DispatchQueue.main.async(execute: { callback(channel) }) } } } + } } - + fileprivate func onData(_ data: Data) { debugPrint("Received NSData from ActionCable.") } @@ -477,7 +485,7 @@ extension ActionCableClient { assert(false, "This class doesn't implement NSCopying. ") return nil } - + func copy() -> AnyObject! { assert(false, "This class doesn't implement NSCopying") return nil