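An observer subscribes to an observable sequence (Observable) and reacts to the element or sequence of elements that the Observable emits.
A program usually contains a large number of events of many kinds, each with its own handling logic, and the variety of callback mechanisms makes the code messy and complex. RxSwift offers one unified way to handle all of these events.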
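Creating and subscribing to Observables
Using Subjects
Combination: combining Observables
Transforming: transforming Observables
Filtering: filtering the elements of an Observable
Mathematical and aggregate operations on Observable elements
Connectable operators
Error handling
Debugging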
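The following are all methods on Observable. Each takes one or more closures as parameters, and those closures act as the observer:
subscribe(on: (Event<Element>) -> Void): subscribes to all events (Next, Error, and Completed)
subscribeNext((Element) -> Void): subscribes to Next only
subscribeError((ErrorType) -> Void): subscribes to Error only
subscribeCompleted(() -> Void): subscribes to Completed only
subscribe(onNext: (Element) -> Void, onError: (ErrorType) -> Void, onCompleted: () -> Void, onDisposed: () -> Void): subscribes to several event types at once
A subscriber can release its resources by calling .dispose() directly, but it is better to manage subscriptions with a DisposeBag, or to let takeUntil end them automatically.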
    let disposeBag = DisposeBag()
    subscription.addDisposableTo(disposeBag)
or
    sequence
        .takeUntil(self.rx_deallocated)
        .subscribe { print($0) }
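To make the DisposeBag idea concrete, here is a minimal sketch in plain, current Swift. It does not use RxSwift: `ToyDisposeBag` and its `insert` method are hypothetical names made up for illustration, and the sketch only assumes that a bag runs its stored cleanup closures when it is deallocated.

```swift
// Toy model of a dispose bag; not the RxSwift implementation.
final class ToyDisposeBag {
    private var disposables: [() -> Void] = []

    /// Stores a cleanup closure to run when the bag goes away.
    func insert(_ dispose: @escaping () -> Void) {
        disposables.append(dispose)
    }

    deinit {
        // Run every stored cleanup closure in insertion order.
        disposables.forEach { $0() }
    }
}

var disposed: [String] = []
do {
    let bag = ToyDisposeBag()
    bag.insert { disposed.append("subscription 1") }
    bag.insert { disposed.append("subscription 2") }
    // `bag` is released no later than the end of this block, which triggers deinit.
}

print(disposed)  // ["subscription 1", "subscription 2"]
```

The point of the pattern is that cleanup is tied to the lifetime of the owner of the bag, so no subscription has to be disposed by hand.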
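Observable sequences fall into two categories:
Cold: the sequence emits values only once an observer subscribes to it
Hot: the sequence starts emitting values as soon as it is created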
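The difference can be sketched without RxSwift at all. The following is a toy model in plain, current Swift; `ColdSequence` and `HotSequence` are hypothetical types invented for this illustration, under the assumption that "cold" means the producer runs once per subscriber and "hot" means values are pushed regardless of who is listening.

```swift
// A toy model of cold vs. hot sequences; none of these types come from RxSwift.

/// Cold: the producer closure runs again for every subscriber.
struct ColdSequence<Element> {
    let producer: (@escaping (Element) -> Void) -> Void

    func subscribe(_ observer: @escaping (Element) -> Void) {
        producer(observer)
    }
}

/// Hot: values are pushed whether or not anyone is listening.
final class HotSequence<Element> {
    private var observers: [(Element) -> Void] = []

    func subscribe(_ observer: @escaping (Element) -> Void) {
        observers.append(observer)
    }

    func emit(_ value: Element) {
        observers.forEach { $0(value) }
    }
}

var coldLog: [Int] = []
let cold = ColdSequence<Int> { observer in
    observer(1)
    observer(2)
}
cold.subscribe { coldLog.append($0) }   // receives 1, 2
cold.subscribe { coldLog.append($0) }   // receives 1, 2 again

var hotLog: [Int] = []
let hot = HotSequence<Int>()
hot.emit(1)                             // nobody is subscribed yet, so 1 is lost
hot.subscribe { hotLog.append($0) }
hot.emit(2)                             // the late subscriber only sees 2

print(coldLog)  // [1, 2, 1, 2]
print(hotLog)   // [2]
```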
never() creates a sequence that never emits an element and never terminates:
    let disposeBag = DisposeBag()
    let neverSequence = Observable<String>.never()
    let neverSequenceSubscription = neverSequence
        .subscribe { _ in
            print("This will never be printed")
        }
    neverSequenceSubscription.addDisposableTo(disposeBag)
empty() creates a sequence that emits no elements and only sends Completed:
    let disposeBag = DisposeBag()
    Observable<Int>.empty()
        .subscribe { event in
            print(event)
        }
        .addDisposableTo(disposeBag)
output: Completed
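just() creates a sequence that contains a single element:
    let disposeBag = DisposeBag()
    Observable.just(":red_circle:")
        .subscribe { event in
            print(event)
        }
        .addDisposableTo(disposeBag)
Note: if you pass nil to just, it returns an Observable that emits nil as its one element. Do not assume that this gives you an empty Observable (one that emits nothing at all).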
output:
Next(:red_circle:)
Completed
of() creates a sequence from a fixed list of elements:
    let disposeBag = DisposeBag()
    Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")
        .subscribeNext { element in
            print(element)
        }
        .addDisposableTo(disposeBag)
output:
:dog:
:cat:
:mouse:
:hamster:
create() builds a custom sequence from a closure that drives the observer directly:
    let disposeBag = DisposeBag()
    let myJust = { (element: String) -> Observable<String> in
        return Observable.create { observer in
            observer.on(.Next(element))
            observer.on(.Completed)
            return NopDisposable.instance
        }
    }
    myJust(":red_circle:")
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
output:
Next(:red_circle:)
Completed
range() emits a run of consecutive integers:
    let disposeBag = DisposeBag()
    Observable.range(start: 1, count: 10)
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
output:
Next(1)
Next(2)
Next(3)
Next(4)
Next(5)
Next(6)
Next(7)
Next(8)
Next(9)
Next(10)
Completed
repeatElement() emits the same element indefinitely:
    let disposeBag = DisposeBag()
    Observable.repeatElement(":red_circle:")
        .take(3)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:red_circle:
:red_circle:
:red_circle:
Note: take can be applied to any Observable to limit the number of elements it emits.
generate() emits values for as long as the condition holds:
    let disposeBag = DisposeBag()
    Observable.generate(
            initialState: 0,
            condition: { $0 < 3 },
            iterate: { $0 + 1 }
        )
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
0
1
2
deferred():
    let disposeBag = DisposeBag()
    var count = 1
    let deferredSequence = Observable<String>.deferred {
        print("Creating \(count)")
        count += 1
        return Observable.create { observer in
            print("Emitting...")
            observer.onNext(":dog:")
            observer.onNext(":cat:")
            observer.onNext(":monkey_face:")
            return NopDisposable.instance
        }
    }
    deferredSequence
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    deferredSequence
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
Note: a deferred sequence runs its factory closure only when an observer subscribes, and it produces a brand-new Observable for each subscriber.
output:
Creating 1
Emitting...
:dog:
:cat:
:monkey_face:
Creating 2
Emitting...
:dog:
:cat:
:monkey_face:
error() creates a sequence that emits no elements and terminates immediately with an error:
    let disposeBag = DisposeBag()
    Observable<Int>.error(Error.Test)
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
output: Error(Test)
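doOn() intercepts each event as a side effect and passes it through unchanged:
    let disposeBag = DisposeBag()
    Observable.of(":apple:", ":pear:", ":tangerine:", ":lemon:")
        .doOn { print("Intercepted:", $0) }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
Note: doOn(onNext:onError:onCompleted:) lets you supply a separate handler for each event type.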
output:
Intercepted: Next(:apple:)
:apple:
Intercepted: Next(:pear:)
:pear:
Intercepted: Next(:tangerine:)
:tangerine:
Intercepted: Next(:lemon:)
:lemon:
Intercepted: Completed
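A Subject can be understood as a bridge or proxy between an observer and an Observable: it acts as both an observer and an Observable, and the kind of Subject determines which events a newly added observer receives.
PublishSubject sends a new subscriber only the events emitted after it subscribed. (addObserver in these examples is presumably a helper defined elsewhere, as in the RxSwift example playground, that subscribes and prints each event prefixed with the given subscription id.)
    let disposeBag = DisposeBag()
    let subject = PublishSubject<String>()
    subject.addObserver("1").addDisposableTo(disposeBag)
    subject.onNext(":dog:")
    subject.onNext(":cat:")
    subject.addObserver("2").addDisposableTo(disposeBag)
    subject.onNext(":a:")
    subject.onNext(":b:")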
output:
Subscription: 1 Event: Next(:dog:)
Subscription: 1 Event: Next(:cat:)
Subscription: 1 Event: Next(:a:)
Subscription: 2 Event: Next(:a:)
Subscription: 1 Event: Next(:b:)
Subscription: 2 Event: Next(:b:)
ReplaySubject keeps a buffer of past elements. The bufferSize given at creation determines how many of the elements emitted before subscription a newly added subscriber receives; createUnbounded() keeps all of them.
    let disposeBag = DisposeBag()
    let subject = ReplaySubject<String>.create(bufferSize: 1)
    subject.addObserver("1").addDisposableTo(disposeBag)
    subject.onNext(":dog:")
    subject.onNext(":cat:")
    subject.addObserver("2").addDisposableTo(disposeBag)
    subject.onNext(":a:")
    subject.onNext(":b:")
output:
Subscription: 1 Event: Next(:dog:)
Subscription: 1 Event: Next(:cat:)
Subscription: 2 Event: Next(:cat:)
Subscription: 1 Event: Next(:a:)
Subscription: 2 Event: Next(:a:)
Subscription: 1 Event: Next(:b:)
Subscription: 2 Event: Next(:b:)
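The replay behaviour shown above can be modelled in a few lines of plain, current Swift. This is a sketch only, not how RxSwift implements it; `ToyReplaySubject` is a hypothetical type, and it ignores Error and Completed events entirely.

```swift
// Toy model of a replay buffer; not the RxSwift implementation.
final class ToyReplaySubject<Element> {
    private let bufferSize: Int
    private var buffer: [Element] = []
    private var observers: [(Element) -> Void] = []

    init(bufferSize: Int) {
        self.bufferSize = bufferSize
    }

    /// New observers first receive the buffered elements, then live ones.
    func subscribe(_ observer: @escaping (Element) -> Void) {
        buffer.forEach(observer)
        observers.append(observer)
    }

    func onNext(_ element: Element) {
        // Keep only the most recent `bufferSize` elements.
        buffer.append(element)
        if buffer.count > bufferSize {
            buffer.removeFirst(buffer.count - bufferSize)
        }
        observers.forEach { $0(element) }
    }
}

var log: [String] = []
let subject = ToyReplaySubject<String>(bufferSize: 1)
subject.subscribe { log.append("1: \($0)") }
subject.onNext("dog")
subject.onNext("cat")
subject.subscribe { log.append("2: \($0)") }   // replays "cat" immediately
subject.onNext("a")

print(log)  // ["1: dog", "1: cat", "2: cat", "1: a", "2: a"]
```

In this model a bufferSize of 0 never retains anything, so late subscribers see only live elements, which is the PublishSubject behaviour from the previous example.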
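BehaviorSubject sends a new subscriber the most recent element (or the initial value if nothing has been emitted yet), followed by all later events:
    let disposeBag = DisposeBag()
    let subject = BehaviorSubject(value: ":red_circle:")
    subject.addObserver("1").addDisposableTo(disposeBag)
    subject.onNext(":dog:")
    subject.onNext(":cat:")
    subject.addObserver("2").addDisposableTo(disposeBag)
    subject.onNext(":a:")
    subject.onNext(":b:")
    subject.addObserver("3").addDisposableTo(disposeBag)
    subject.onNext(":pear:")
    subject.onNext(":tangerine:")
Note: none of the subjects above automatically send Completed when they are disposed.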
output:
Subscription: 1 Event: Next(:red_circle:)
Subscription: 1 Event: Next(:dog:)
Subscription: 1 Event: Next(:cat:)
Subscription: 2 Event: Next(:cat:)
Subscription: 1 Event: Next(:a:)
Subscription: 2 Event: Next(:a:)
Subscription: 1 Event: Next(:b:)
Subscription: 2 Event: Next(:b:)
Subscription: 3 Event: Next(:b:)
Subscription: 1 Event: Next(:pear:)
Subscription: 2 Event: Next(:pear:)
Subscription: 3 Event: Next(:pear:)
Subscription: 1 Event: Next(:tangerine:)
Subscription: 2 Event: Next(:tangerine:)
Subscription: 3 Event: Next(:tangerine:)
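Variable wraps a BehaviorSubject:
    let disposeBag = DisposeBag()
    let variable = Variable(":red_circle:")
    variable.asObservable().addObserver("1").addDisposableTo(disposeBag)
    variable.value = ":dog:"
    variable.value = ":cat:"
    variable.asObservable().addObserver("2").addDisposableTo(disposeBag)
    variable.value = ":a:"
    variable.value = ":b:"
Note: variable.asObservable() actually returns the BehaviorSubject held inside the Variable. A Variable has no onNext; you read or set the current element through value, and setting it forwards the element to the BehaviorSubject. Unlike the subjects above, a Variable sends Completed when it is deallocated, which is why Completed appears in the output.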
output:
Subscription: 1 Event: Next(:red_circle:)
Subscription: 1 Event: Next(:dog:)
Subscription: 1 Event: Next(:cat:)
Subscription: 2 Event: Next(:cat:)
Subscription: 1 Event: Next(:a:)
Subscription: 2 Event: Next(:a:)
Subscription: 1 Event: Next(:b:)
Subscription: 2 Event: Next(:b:)
Subscription: 1 Event: Completed
Subscription: 2 Event: Completed
startWith() prepends the given elements to the source Observable: they are all emitted before any element of the source. Chained calls behave somewhat like a stack, because the startWith applied last emits first.
    let disposeBag = DisposeBag()
    Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")
        .startWith(":one:")
        .startWith(":two:")
        .startWith(":three:", ":a:", ":b:")
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:three:
:a:
:b:
:two:
:one:
:dog:
:cat:
:mouse:
:hamster:
http://reactivex.io/documentation/operators/startwith.html
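The stack-like ordering of chained startWith calls is easy to check with ordinary arrays. The sketch below is plain, current Swift with no RxSwift involved; `startingWith` is a hypothetical helper that treats an array as a stand-in for a finished sequence.

```swift
// Toy model: a "sequence" is just an array, and startingWith prepends to it.
extension Array {
    func startingWith(_ elements: Element...) -> [Element] {
        return elements + self
    }
}

let result = ["dog", "cat"]
    .startingWith("one")
    .startingWith("two")
    .startingWith("three", "a", "b")

print(result)  // ["three", "a", "b", "two", "one", "dog", "cat"]
```

Each call wraps everything built so far, so the outermost (last) call contributes the first elements.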
merge() interleaves the elements of several sequences into one, in the order they arrive:
    let disposeBag = DisposeBag()
    let subject1 = PublishSubject<String>()
    let subject2 = PublishSubject<String>()
    let subject3 = PublishSubject<String>()
    Observable.of(subject1, subject2, subject3)
        .merge()
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    subject1.onNext(":a:")
    subject1.onNext(":b:")
    subject2.onNext("①")
    subject2.onNext("②")
    subject1.onNext(":ab:")
    subject3.onNext(":cat:")
    subject2.onNext("③")
output:
:a:
:b:
①
②
:ab:
:cat:
③
http://reactivex.io/documentation/operators/merge.html
zip() pairs up elements by position, combining the n-th element of each sequence:
    let disposeBag = DisposeBag()
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()
    Observable.zip(stringSubject, intSubject) { stringElement, intElement in
            "\(stringElement) \(intElement)"
        }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    stringSubject.onNext(":a:")
    stringSubject.onNext(":b:")
    intSubject.onNext(1)
    intSubject.onNext(2)
    stringSubject.onNext(":ab:")
    intSubject.onNext(3)
output:
:a: 1
:b: 2
:ab: 3
http://reactivex.io/documentation/operators/zip.html
combineLatest() combines the latest element of each sequence whenever any of them emits, once every sequence has emitted at least one element:
    let disposeBag = DisposeBag()
    let stringSubject = PublishSubject<String>()
    let intSubject = PublishSubject<Int>()
    Observable.combineLatest(stringSubject, intSubject) { stringElement, intElement in
            "\(stringElement) \(intElement)"
        }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    stringSubject.onNext(":a:")
    stringSubject.onNext(":b:")
    intSubject.onNext(1)
    intSubject.onNext(2)
    stringSubject.onNext(":ab:")
output:
:b: 1
:b: 2
:ab: 2
http://reactivex.io/documentation/operators/combinelatest.html
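Because zip and combineLatest are easy to confuse, it can help to push one and the same arrival order through both strategies. The following is a toy model in plain, current Swift; `Arrival`, `zipModel` and `combineLatestModel` are hypothetical names, and the model covers only Next events from two sources.

```swift
// Toy model: replay the same arrival order of events through both strategies.
enum Arrival {
    case string(String)
    case int(Int)
}

let arrivals: [Arrival] = [
    .string("a"), .string("b"), .int(1), .int(2), .string("ab"), .int(3)
]

/// zip pairs elements by position: the n-th string with the n-th int.
func zipModel(_ arrivals: [Arrival]) -> [String] {
    var strings: [String] = []
    var ints: [Int] = []
    var output: [String] = []
    for arrival in arrivals {
        switch arrival {
        case .string(let s): strings.append(s)
        case .int(let i): ints.append(i)
        }
        // Emit as soon as both queues hold an unpaired element.
        if let s = strings.first, let i = ints.first {
            output.append("\(s) \(i)")
            strings.removeFirst()
            ints.removeFirst()
        }
    }
    return output
}

/// combineLatest emits on every arrival once both sides have a value.
func combineLatestModel(_ arrivals: [Arrival]) -> [String] {
    var latestString: String?
    var latestInt: Int?
    var output: [String] = []
    for arrival in arrivals {
        switch arrival {
        case .string(let s): latestString = s
        case .int(let i): latestInt = i
        }
        if let s = latestString, let i = latestInt {
            output.append("\(s) \(i)")
        }
    }
    return output
}

print(zipModel(arrivals))            // ["a 1", "b 2", "ab 3"]
print(combineLatestModel(arrivals))  // ["b 1", "b 2", "ab 2", "ab 3"]
```

The arrival order matches the zip example; the combineLatest example in the text stops before the final 3, which is why its output has three lines rather than four.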
combineLatest applied to an array of Observables:
    let disposeBag = DisposeBag()
    let stringObservable = Observable.just(":heart:")
    let fruitObservable = [":apple:", ":pear:", ":tangerine:"].toObservable()
    let animalObservable = Observable.of(":dog:", ":cat:", ":mouse:", ":hamster:")
    [stringObservable, fruitObservable, animalObservable].combineLatest {
            "\($0[0]) \($0[1]) \($0[2])"
        }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:heart: :tangerine: :dog:
:heart: :tangerine: :cat:
:heart: :tangerine: :mouse:
:heart: :tangerine: :hamster:
switchLatest() flattens an Observable of Observables into a single sequence, forwarding elements only from the most recently emitted inner Observable.
    let disposeBag = DisposeBag()
    let subject1 = BehaviorSubject(value: ":soccer:")
    let subject2 = BehaviorSubject(value: ":apple:")
    let variable = Variable(subject1)
    variable.asObservable()
        .switchLatest()
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    subject1.onNext(":football:")
    subject1.onNext(":basketball:")
    variable.value = subject2
    subject1.onNext(":baseball:")
    subject1.onNext(":tennis:")
    subject2.onNext(":pear:")
    variable.value = subject1
output:
:soccer:
:football:
:basketball:
:apple:
:pear:
:tennis:
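Note: :baseball: is ignored because subject1 was not the current inner sequence when it was emitted. :tennis: still appears at the end, because subject1 is a BehaviorSubject and replays its latest element when variable switches back to it.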
map() applies a transform to every element:
    let disposeBag = DisposeBag()
    Observable.of(1, 2, 3)
        .map { $0 * $0 }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
1
4
9
http://reactivex.io/documentation/operators/map.html
scan() accumulates from a seed value and emits each intermediate result:
    let disposeBag = DisposeBag()
    Observable.of(10, 100, 1000)
        .scan(1) { aggregateValue, newValue in
            aggregateValue + newValue
        }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
11
111
1111
http://reactivex.io/documentation/operators/scan.html
filter() passes through only the elements that satisfy the predicate:
    let disposeBag = DisposeBag()
    Observable.of(
            ":cat:", ":rabbit:", ":dog:",
            ":frog:", ":cat:", ":rabbit:",
            ":hamster:", ":frog:", ":cat:")
        .filter { $0 == ":cat:" }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:cat:
:cat:
:cat:
http://reactivex.io/documentation/operators/filter.html
distinctUntilChanged() suppresses an element when it equals the one immediately before it:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":pig:", ":cat:", ":cat:", ":cat:", ":monkey_face:", ":cat:")
        .distinctUntilChanged()
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:cat:
:pig:
:cat:
:monkey_face:
:cat:
http://reactivex.io/documentation/operators/distinct.html
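Note that only consecutive duplicates are dropped, not all duplicates. A small array-based sketch in plain, current Swift shows the rule; `distinctUntilChangedModel` is a hypothetical function written for this illustration and is not part of RxSwift.

```swift
/// Keeps an element only if it differs from the last element kept.
func distinctUntilChangedModel<T: Equatable>(_ elements: [T]) -> [T] {
    var output: [T] = []
    for element in elements where element != output.last {
        output.append(element)
    }
    return output
}

let animals = ["cat", "pig", "cat", "cat", "cat", "monkey", "cat"]
let distinct = distinctUntilChangedModel(animals)

print(distinct)  // ["cat", "pig", "cat", "monkey", "cat"]
```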
elementAt() emits only the element at the given zero-based index:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")
        .elementAt(3)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:frog:
http://reactivex.io/documentation/operators/elementat.html
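single() with no argument emits the first element of the Observable; with a predicate, it emits the first element that satisfies the predicate. If the sequence does not contain exactly one such element, an Error is sent.
    let disposeBag = DisposeBag()
    // With an exact predicate such as ==, only one element matches,
    // so the sequence ends with Completed instead of Error.
    Observable.of(1, 2, 3, 4, 5, 6)
        .single { $0 <= 6 }
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)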
output:
Next(1)
Error(Sequence contains more than one element.)
take() emits only the first n elements:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")
        .take(3)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:cat:
:rabbit:
:dog:
http://reactivex.io/documentation/operators/take.html
takeLast() emits only the last n elements, once the source has completed:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")
        .takeLast(3)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:frog:
:pig:
:monkey_face:
http://reactivex.io/documentation/operators/takelast.html
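takeWhile() passes elements through while the predicate holds and completes at the first element that fails it:
    let disposeBag = DisposeBag()
    Observable.of(1, 2, 3, 4, 5, 6)
        .takeWhile { $0 < 4 }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)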
output:
1
2
3
takeUntil() passes elements through until the reference sequence emits, then sends Completed:
    let disposeBag = DisposeBag()
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    sourceSequence
        .takeUntil(referenceSequence)
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
    sourceSequence.onNext(":cat:")
    sourceSequence.onNext(":rabbit:")
    sourceSequence.onNext(":dog:")
    referenceSequence.onNext(":red_circle:")
    sourceSequence.onNext(":frog:")
    sourceSequence.onNext(":pig:")
    sourceSequence.onNext(":monkey_face:")
output:
Next(:cat:)
Next(:rabbit:)
Next(:dog:)
Completed
http://reactivex.io/documentation/operators/takeuntil.html
skip() drops the first n elements:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")
        .skip(2)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:dog:
:frog:
:pig:
:monkey_face:
http://reactivex.io/documentation/operators/skip.html
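skipWhile() drops elements while the predicate holds and passes through everything from the first element that fails it:
    let disposeBag = DisposeBag()
    Observable.of(1, 2, 3, 4, 5, 6)
        .skipWhile { $0 < 4 }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)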
output:
4
5
6
skipWhileWithIndex() works like skipWhile(), but the predicate also receives the element's index:
    let disposeBag = DisposeBag()
    Observable.of(":cat:", ":rabbit:", ":dog:", ":frog:", ":pig:", ":monkey_face:")
        .skipWhileWithIndex { element, index in
            index < 3
        }
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output:
:frog:
:pig:
:monkey_face:
skipUntil() drops elements until the reference sequence emits:
    let disposeBag = DisposeBag()
    let sourceSequence = PublishSubject<String>()
    let referenceSequence = PublishSubject<String>()
    sourceSequence
        .skipUntil(referenceSequence)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
    sourceSequence.onNext(":cat:")
    sourceSequence.onNext(":rabbit:")
    sourceSequence.onNext(":dog:")
    referenceSequence.onNext(":red_circle:")
    sourceSequence.onNext(":frog:")
    sourceSequence.onNext(":pig:")
    sourceSequence.onNext(":monkey_face:")
output:
:frog:
:pig:
:monkey_face:
http://reactivex.io/documentation/operators/skipuntil.html
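toArray() collects the whole sequence into a single array element, emitted when the source completes:
    let disposeBag = DisposeBag()
    Observable.range(start: 1, count: 10)
        .toArray()
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)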
output:
Next([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
Completed
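reduce() folds the sequence into a single value, starting from the seed, and emits it when the source completes:
    let disposeBag = DisposeBag()
    Observable.of(10, 100, 1000)
        .reduce(1, accumulator: +)
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)
output: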
1111
http://reactivex.io/documentation/operators/reduce.html
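The relationship between reduce and the earlier scan example can be reproduced on a plain array. This sketch uses only the Swift standard library (current syntax) and is an analogy, not RxSwift code: `reduce` here is `Array.reduce`, and the running total stands in for scan.

```swift
let source = [10, 100, 1000]

// reduce: only the final accumulated value.
let reduced = source.reduce(1, +)

// scan: every intermediate accumulated value, modelled with a running total.
var running = 1
let scanned = source.map { element -> Int in
    running += element
    return running
}

print(reduced)  // 1111
print(scanned)  // [11, 111, 1111]
```

The last value produced by the scan model is the value that reduce yields, which matches the two outputs shown in the text.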
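concat() joins the inner Observables of an Observable of Observables end to end, working on one sequence at a time. Only after the current sequence sends Completed does it subscribe to the next one. Because the inner sequences here are BehaviorSubjects, the next sequence then delivers the last element it emitted before being subscribed to, followed by its later elements.
    let disposeBag = DisposeBag()
    let subject1 = BehaviorSubject(value: ":apple:")
    let subject2 = BehaviorSubject(value: ":dog:")
    let variable = Variable(subject1)
    variable.asObservable()
        .concat()
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
    subject1.onNext(":pear:")
    subject1.onNext(":tangerine:")
    variable.value = subject2
    subject2.onNext("I would be ignored")
    subject2.onNext(":cat:")
    subject1.onNext(":tropical_drink:")
    subject1.onCompleted()
    subject2.onNext(":mouse:")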
output:
Next(:apple:)
Next(:pear:)
Next(:tangerine:)
Next(:tropical_drink:)
Next(:cat:)
Next(:mouse:)
http://reactivex.io/documentation/operators/concat.html
Connectable operators: a Connectable Observable differs from an ordinary Observable in that it starts emitting only after its connect() method is called. You can therefore wait until all subscribers have subscribed before emission begins, somewhat like a transaction.
    printExampleHeader(#function)
    let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        .publish()
    _ = intSequence
        .subscribeNext { print("Subscription 1:, Event: \($0)") }
    delay(2) { intSequence.connect() }
    delay(4) {
        _ = intSequence
            .subscribeNext { print("Subscription 2:, Event: \($0)") }
    }
    delay(6) {
        _ = intSequence
            .subscribeNext { print("Subscription 3:, Event: \($0)") }
    }
output:
delay 2
Subscription 1:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
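The essential property of the publish() example above, that subscribing does nothing until connect() is called, can be sketched synchronously in plain, current Swift. `ToyConnectable` is a hypothetical type for illustration; it emits a fixed array instead of a timer and has no notion of late subscribers.

```swift
// Toy model of publish()/connect(); not an RxSwift type.
final class ToyConnectable<Element> {
    private let elements: [Element]
    private var observers: [(Element) -> Void] = []

    init(_ elements: [Element]) {
        self.elements = elements
    }

    /// Subscribing only registers the observer; nothing is emitted yet.
    func subscribe(_ observer: @escaping (Element) -> Void) {
        observers.append(observer)
    }

    /// Emission starts here and reaches every observer registered so far.
    func connect() {
        for element in elements {
            observers.forEach { $0(element) }
        }
    }
}

var events: [String] = []
let connectable = ToyConnectable([0, 1])
connectable.subscribe { events.append("1: \($0)") }
connectable.subscribe { events.append("2: \($0)") }
let beforeConnect = events          // still empty: nothing has been emitted
connectable.connect()

print(beforeConnect)  // []
print(events)         // ["1: 0", "2: 0", "1: 1", "2: 1"]
```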
replay() works like publish() but adds a bufferSize that sets how many elements are cached, so that subscribers joining later receive that many already-emitted elements.
    printExampleHeader(#function)
    let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        .replay(5)
    _ = intSequence
        .subscribeNext { print("Subscription 1:, Event: \($0)") }
    delay(2) { intSequence.connect() }
    delay(4) {
        _ = intSequence
            .subscribeNext { print("Subscription 2:, Event: \($0)") }
    }
    delay(8) {
        _ = intSequence
            .subscribeNext { print("Subscription 3:, Event: \($0)") }
    }
output:
delay 2
Subscription 1:, Event: 0
Subscription 2:, Event: 0
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 0
Subscription 3:, Event: 1
Subscription 3:, Event: 2
Subscription 3:, Event: 3
Subscription 3:, Event: 4
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
multicast() relays the source sequence through the given subject once connect() is called:
    printExampleHeader(#function)
    let subject = PublishSubject<Int>()
    _ = subject
        .subscribeNext { print("Subject: \($0)") }
    let intSequence = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
        .multicast(subject)
    _ = intSequence
        .subscribeNext { print("\tSubscription 1:, Event: \($0)") }
    delay(2) { intSequence.connect() }
    delay(4) {
        _ = intSequence
            .subscribeNext { print("\tSubscription 2:, Event: \($0)") }
    }
    delay(6) {
        _ = intSequence
            .subscribeNext { print("\tSubscription 3:, Event: \($0)") }
    }
output:
delay 2
Subject: 0
Subscription 1:, Event: 0
Subject: 1
Subscription 1:, Event: 1
Subscription 2:, Event: 1
Subject: 2
Subscription 1:, Event: 2
Subscription 2:, Event: 2
Subject: 3
Subscription 1:, Event: 3
Subscription 2:, Event: 3
Subscription 3:, Event: 3
Subject: 4
Subscription 1:, Event: 4
Subscription 2:, Event: 4
Subscription 3:, Event: 4
Subject: 5
Subscription 1:, Event: 5
Subscription 2:, Event: 5
Subscription 3:, Event: 5
catchErrorJustReturn() recovers from an Error by emitting one given fallback element and then sending Completed.
    let disposeBag = DisposeBag()
    let sequenceThatFails = PublishSubject<String>()
    sequenceThatFails
        .catchErrorJustReturn(":blush:")
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
    sequenceThatFails.onNext(":grimacing:")
    sequenceThatFails.onNext(":fearful:")
    sequenceThatFails.onNext(":rage:")
    sequenceThatFails.onNext(":red_circle:")
    sequenceThatFails.onError(Error.Test)
output:
Next(:grimacing:)
Next(:fearful:)
Next(:rage:)
Next(:red_circle:)
Next(:blush:)
Completed
catchError() recovers from an Error by switching to the Observable returned from its handler:
    let disposeBag = DisposeBag()
    let sequenceThatErrors = PublishSubject<String>()
    let recoverySequence = PublishSubject<String>()
    sequenceThatErrors
        .catchError {
            print("Error:", $0)
            return recoverySequence
        }
        .subscribe { print($0) }
        .addDisposableTo(disposeBag)
    sequenceThatErrors.onNext(":grimacing:")
    sequenceThatErrors.onNext(":fearful:")
    sequenceThatErrors.onNext(":rage:")
    sequenceThatErrors.onNext(":red_circle:")
    sequenceThatErrors.onError(Error.Test)
    recoverySequence.onNext(":blush:")
output:
Next(:grimacing:)
Next(:fearful:)
Next(:rage:)
Next(:red_circle:)
Error: Test
Next(:blush:)
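retry() resubscribes to the source when it sends an Error, so the elements are emitted again from the beginning; the Error itself is not forwarded to the observer while attempts remain. Passing an integer limits the number of attempts. In the example below, "Error encountered" is printed by the source closure itself, not by the subscriber.
    let disposeBag = DisposeBag()
    var count = 1
    let sequenceThatErrors = Observable<String>.create { observer in
        observer.onNext(":apple:")
        observer.onNext(":pear:")
        observer.onNext(":tangerine:")
        if count == 1 {
            observer.onError(Error.Test)
            print("Error encountered")
            count += 1
        }
        observer.onNext(":dog:")
        observer.onNext(":cat:")
        observer.onNext(":mouse:")
        observer.onCompleted()
        return NopDisposable.instance
    }
    sequenceThatErrors
        .retry()
        .subscribeNext { print($0) }
        .addDisposableTo(disposeBag)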
output:
:apple:
:pear:
:tangerine:
Error encountered
:apple:
:pear:
:tangerine:
:dog:
:cat:
:mouse:
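The reason the first three elements appear twice in the output above is that retry runs the source again from the start. A rough model in plain, current Swift follows; `retryModel` and `TestError` are hypothetical names, the source is modelled as a throwing closure, and unlike a real retry this sketch simply stops when attempts run out instead of forwarding the final error.

```swift
struct TestError: Error {}

/// Re-runs `source` from the start until it finishes without throwing,
/// for at most `maxAttemptCount` attempts in total.
func retryModel(maxAttemptCount: Int,
                source: (_ emit: (String) -> Void) throws -> Void) -> [String] {
    var received: [String] = []
    for _ in 0..<maxAttemptCount {
        do {
            try source { received.append($0) }
            break       // the source completed without an error
        } catch {
            continue    // "resubscribe": run the source again from the start
        }
    }
    return received
}

var attempt = 0
let received = retryModel(maxAttemptCount: 2) { emit in
    attempt += 1
    emit("apple")
    emit("pear")
    if attempt == 1 { throw TestError() }   // fail only on the first attempt
    emit("dog")
}

print(received)  // ["apple", "pear", "apple", "pear", "dog"]
```

Elements delivered before the failure are not taken back, so the observer sees them again on the second attempt.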