From 16a15e365aa0c108fcb5792730a62fc1b3386192 Mon Sep 17 00:00:00 2001 From: Lucas Nelaupe Date: Fri, 6 May 2022 00:44:16 +0800 Subject: [PATCH] Allow user to specify enqueue DispatchQueue to fix multi-thread enqueue crash (#403) --- Sources/SwiftQueue/JobBuilder.swift | 1 - Sources/SwiftQueue/SwiftQueueManager.swift | 24 +++++++++++++++++- .../SwiftQueueManagerTests.swift | 25 +++++++++++++++++++ 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/Sources/SwiftQueue/JobBuilder.swift b/Sources/SwiftQueue/JobBuilder.swift index 7384dce4..5f9dba6a 100644 --- a/Sources/SwiftQueue/JobBuilder.swift +++ b/Sources/SwiftQueue/JobBuilder.swift @@ -184,7 +184,6 @@ public final class JobBuilder { assert(JSONSerialization.isValidJSONObject(info.params)) info.constraints.append(PersisterConstraint(serializer: manager.params.serializer, persister: manager.params.persister)) } - manager.enqueue(info: info) } diff --git a/Sources/SwiftQueue/SwiftQueueManager.swift b/Sources/SwiftQueue/SwiftQueueManager.swift index 7e703d7e..9640cc97 100644 --- a/Sources/SwiftQueue/SwiftQueueManager.swift +++ b/Sources/SwiftQueue/SwiftQueueManager.swift @@ -63,6 +63,16 @@ public final class SwiftQueueManager { /// Schedule a job to the queue /// TODO Need to remove this method public func enqueue(info: JobInfo) { + if let thread = params.enqueueThread { + thread.sync(flags: .barrier) { + enqueueInMainThread(info: info) + } + } else { + enqueueInMainThread(info: info) + } + } + + private func enqueueInMainThread(info: JobInfo) { let queue = getQueue(queueName: info.queueName) let job = queue.createHandler(type: info.type, params: info.params) let constraints = info.constraints @@ -167,6 +177,8 @@ internal struct SqManagerParams { var initInBackground: Bool + var enqueueThread: DispatchQueue? + init(jobCreator: JobCreator, queueCreator: QueueCreator, persister: JobPersister = UserDefaultsPersister(), @@ -174,7 +186,8 @@ internal struct SqManagerParams { logger: SwiftQueueLogger = NoLogger.shared, listener: JobListener? = nil, initInBackground: Bool = false, - dispatchQueue: DispatchQueue = DispatchQueue.global(qos: DispatchQoS.QoSClass.utility) + dispatchQueue: DispatchQueue = DispatchQueue.global(qos: DispatchQoS.QoSClass.utility), + enqueueThread: DispatchQueue? = nil ) { self.jobCreator = jobCreator self.queueCreator = queueCreator @@ -184,6 +197,7 @@ internal struct SqManagerParams { self.listener = listener self.initInBackground = initInBackground self.dispatchQueue = dispatchQueue + self.enqueueThread = enqueueThread } } @@ -242,6 +256,14 @@ public final class SwiftQueueManagerBuilder { return self } + /// Use a single DispatchQueue to enqueue jobs + /// This can solve crashes when calling 'enqueue' in a multi-thread environment + public func set(enqueueDispatcher: DispatchQueue) -> Self { + params.enqueueThread = enqueueDispatcher + return self + } + + /// Get an instance of `SwiftQueueManager` public func build() -> SwiftQueueManager { return SwiftQueueManager(params: params, isSuspended: isSuspended) diff --git a/Tests/SwiftQueueTests/SwiftQueueManagerTests.swift b/Tests/SwiftQueueTests/SwiftQueueManagerTests.swift index 0087b70a..e3cd8767 100644 --- a/Tests/SwiftQueueTests/SwiftQueueManagerTests.swift +++ b/Tests/SwiftQueueTests/SwiftQueueManagerTests.swift @@ -305,4 +305,29 @@ class SwiftQueueManagerTests: XCTestCase { job.assertError(queueError: .canceled) } + func testConcurrentScheduling() { + let (type, job) = (UUID().uuidString, TestJob()) + let creator = TestCreator([type: job]) + let persister = PersisterTracker(key: UUID().uuidString) + let manager = SwiftQueueManagerBuilder(creator: creator) + .set(isSuspended: true) + .set(enqueueDispatcher: .main) + .set(persister: persister).build() + + + let concurrentQueue = DispatchQueue(label: "com.test.concurrent", attributes: .concurrent) + for _ in 0..<10 { + concurrentQueue.async { + JobBuilder(type: type).parallel(queueName: UUID().uuidString).schedule(manager: manager) + } + } + + for _ in 0..<10 { + DispatchQueue(label: "com.test.concurrent", attributes: .concurrent).async { + JobBuilder(type: type).parallel(queueName: UUID().uuidString).schedule(manager: manager) + } + } + + } + }