我用rxbus连续发送大量消息报错
08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main
Process: com.dituwuyou, PID: 10637
java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
at android.os.Handler.handleCallback(Handler.java:739)
at android.os.Handler.dispatchMessage(Handler.java:95)
at android.os.Looper.loop(Looper.java:148)
at android.app.ActivityThread.main(ActivityThread.java:5417)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
at java.lang.Thread.run(Thread.java:818)
我的rxbus是这样定义的
import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
/**
* Created by xg on 2016/6/24.
* 消息传递(替换handler,eventbus)
*/
public class RxBus {
private static volatile RxBus mInstance;
private final Subject bus;
public RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
/**
* 单例模式RxBus
*
* @return
*/
public static RxBus getRxBusSingleton() {
RxBus rxBus2 = mInstance;
if (mInstance == null) {
synchronized (RxBus.class) {
rxBus2 = mInstance;
if (mInstance == null) {
rxBus2 = new RxBus();
mInstance = rxBus2;
}
}
}
return rxBus2;
}
/**
* 发送消息
*
* @param object
*/
public void send(Object object) {
bus.onNext(object);
}
/**
* 接收消息
*
* @return
*/
public Observable toObserverable() {
return bus;
}
}
这是第45行
bus.onNext(object);
应该是出现背压了
伊谢尔伦2017-04-17 17:53:28
使用している RxJava のバージョンは何ですか? バージョン 1.1.6 には rx.subjects.PublishSubject$PublishSubjectProducer クラスがありません