This operator takes multiple source observables and an optional transform closure. Each element emitted by a source observable starts a new task and, more importantly, ends the previous task from that observable. Because there are multiple source observables, several tasks can exist at the same time. The operator behaves like a serial queue: tasks are queued and run one at a time. If a task is still pending (queued but not yet started) when its observable emits a new element, the pending task is cancelled and the new one is queued instead.
To run the Example.playground, clone the repo and run pod install from the Example directory first.
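import RxSwift
import RxSemaphore  // module name assumed to match the pod name

let disposeBag = DisposeBag()
let o1 = PublishSubject<Int>()
let o2 = PublishSubject<Int>()

Observable.semaphore([o1, o2])
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: disposeBag)

o1.onNext(1)      // task 1 starts: wait(1)
o2.onNext(2)      // task 2 is queued behind task 1
o2.onNext(3)      // task 2 is still pending, so it is cancelled and replaced by task 3
o1.onNext(4)      // ends task 1 (signal); task 3 starts (wait(3)); task 4 is queued
o2.onCompleted()  // ends task 3 (signal); task 4 starts (wait(4))
o1.onCompleted()  // ends task 4 (signal)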
Output:
wait(1)
signal
wait(3)
signal
wait(4)
signal
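The queueing rules described at the top of this README can also be modelled without RxSwift. The sketch below is only an illustration of those rules, not how RxSemaphore is implemented: `TaskQueueModel`, `emit(source:value:)` and `complete(source:)` are hypothetical names, and sources are identified by plain integers (0 for o1, 1 for o2).

```swift
// Hypothetical model of the queueing rules; NOT RxSemaphore's actual implementation.
struct TaskQueueModel {
    private var running: (source: Int, value: Int)? = nil
    private var pending: [(source: Int, value: Int)] = []
    private(set) var log: [String] = []

    /// A source emitted `value`: end or cancel that source's previous task,
    /// then queue the new one.
    mutating func emit(source: Int, value: Int) {
        finishOrCancel(source: source)
        pending.append((source: source, value: value))
        startNextIfIdle()
    }

    /// A source completed: end or cancel its previous task.
    mutating func complete(source: Int) {
        finishOrCancel(source: source)
        startNextIfIdle()
    }

    private mutating func finishOrCancel(source: Int) {
        if let current = running, current.source == source {
            // The source's task is running, so it ends now.
            running = nil
            log.append("signal")
        } else {
            // The source's task (if any) never started, so it is dropped silently.
            pending.removeAll { $0.source == source }
        }
    }

    private mutating func startNextIfIdle() {
        guard running == nil, !pending.isEmpty else { return }
        let next = pending.removeFirst()
        running = next
        log.append("wait(\(next.value))")
    }
}

// Replays the event sequence from the example above.
var model = TaskQueueModel()
model.emit(source: 0, value: 1)
model.emit(source: 1, value: 2)
model.emit(source: 1, value: 3)
model.emit(source: 0, value: 4)
model.complete(source: 1)
model.complete(source: 0)
print(model.log.joined(separator: "\n"))
```

Replaying the same events through this model yields the same six lines as the example's output, which is a quick way to check one's reading of the rules.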
For cases where nil is used as the signal, I provide a convenience version that does not wrap elements in RxSemaphoreAction:
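let o1 = PublishSubject<Int?>()
let o2 = PublishSubject<Int?>()

Observable.semaphore([o1, o2])
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: disposeBag)

o1.onNext(1)      // task 1 starts: Optional(1)
o2.onNext(2)      // task 2 is queued behind task 1
o2.onNext(nil)    // task 2 is still pending, so it is cancelled
o1.onNext(3)      // ends task 1 (nil); task 3 starts: Optional(3)
o2.onNext(4)      // task 4 is queued behind task 3
o1.onNext(nil)    // ends task 3 (nil); task 4 starts: Optional(4)
o2.onCompleted()  // ends task 4 (nil)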
Output:
Optional(1)
nil
Optional(3)
nil
Optional(4)
nil
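Comparing the two outputs suggests how the optional form relates to the wrapped one: a wait carrying a value appears as that value, and a signal appears as nil. The sketch below illustrates that correspondence only. `Action` is a hypothetical stand-in whose cases are guessed from the printed output of the first example; the real RxSemaphoreAction may be defined differently, and `unwrapped` is not part of the library.

```swift
// Hypothetical stand-in for RxSemaphoreAction; the real definition may differ.
enum Action<Value: Equatable>: Equatable {
    case wait(Value)
    case signal
}

/// Maps a wrapped action to the optional form:
/// wait(v) becomes v, signal becomes nil.
func unwrapped<Value: Equatable>(_ action: Action<Value>) -> Value? {
    switch action {
    case .wait(let value):
        return value
    case .signal:
        return nil
    }
}

// The same wait/signal pattern as the optional example's output.
let wrapped: [Action<Int>] = [.wait(1), .signal, .wait(3), .signal, .wait(4), .signal]
let optionals = wrapped.map { unwrapped($0) }

for value in optionals {
    print(value as Any)
}
```

The trade-off is that the optional form cannot carry nil as a real element, since nil is reserved for the signal.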
RxSemaphore is available through CocoaPods. To install it, simply add the following line to your Podfile:
pod 'RxSemaphore'
Adelais0, [email protected]
RxSemaphore is available under the MIT license. See the LICENSE file for more info.