recherche

Maison  >  Questions et réponses  >  le corps du texte

android - rxbus连续发送消息报错

我用rxbus连续发送大量消息报错

08-26 09:37:13.458 10637-10637/com.dituwuyou E/AndroidRuntime: FATAL EXCEPTION: main

                                                           Process: com.dituwuyou, PID: 10637
                                                           java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:112)
                                                               at android.os.Handler.handleCallback(Handler.java:739)
                                                               at android.os.Handler.dispatchMessage(Handler.java:95)
                                                               at android.os.Looper.loop(Looper.java:148)
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417)
                                                               at java.lang.reflect.Method.invoke(Native Method)
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726)
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616)
                                                            Caused by: rx.exceptions.OnErrorNotImplementedException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
                                                               at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
                                                               at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
                                                               at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
                                                               at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:276)
                                                               at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:219)
                                                               at rx.android.schedulers.LooperScheduler$ScheduledAction.run(LooperScheduler.java:107)
                                                               at android.os.Handler.handleCallback(Handler.java:739) 
                                                               at android.os.Handler.dispatchMessage(Handler.java:95) 
                                                               at android.os.Looper.loop(Looper.java:148) 
                                                               at android.app.ActivityThread.main(ActivityThread.java:5417) 
                                                               at java.lang.reflect.Method.invoke(Native Method) 
                                                               at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:726) 
                                                               at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:616) 
                                                            Caused by: rx.exceptions.MissingBackpressureException: PublishSubject: could not emit value due to lack of requests
                                                               at rx.subjects.PublishSubject$PublishSubjectProducer.onNext(PublishSubject.java:308)
                                                               at rx.subjects.PublishSubject$PublishSubjectState.onNext(PublishSubject.java:220)
                                                               at rx.subjects.PublishSubject.onNext(PublishSubject.java:73)
                                                               at rx.observers.SerializedObserver.onNext(SerializedObserver.java:92)
                                                               at rx.subjects.SerializedSubject.onNext(SerializedSubject.java:67)
                                                               at com.dituwuyou.widget.rxjava.RxBus.send(RxBus.java:45)
                                                               at com.dituwuyou.joint.CoorSocketService.messageReceived(CoorSocketService.java:51)
                                                               at com.dituwuyou.fayeclient.FayeClient.parseFayeMessage(FayeClient.java:535)
                                                               at com.dituwuyou.fayeclient.FayeClient.onMessage(FayeClient.java:390)
                                                               at com.dituwuyou.fayeclient.HybiParser.emitFrame(HybiParser.java:304)
                                                               at com.dituwuyou.fayeclient.HybiParser.start(HybiParser.java:130)
                                                               at com.dituwuyou.fayeclient.WebSocketClient$1.run(WebSocketClient.java:119)
                                                               at java.lang.Thread.run(Thread.java:818)

我的rxbus是这样定义的

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;

/**
 * Created by xg on 2016/6/24.
 * 消息传递(替换handler,eventbus)
 */
public class RxBus {
    private static volatile RxBus mInstance;
    private final Subject bus;

    public RxBus() {
        bus = new SerializedSubject<>(PublishSubject.create());
    }

    /**
     * 单例模式RxBus
     *
     * @return
     */
    public static RxBus getRxBusSingleton() {
        RxBus rxBus2 = mInstance;
        if (mInstance == null) {
            synchronized (RxBus.class) {
                rxBus2 = mInstance;
                if (mInstance == null) {
                    rxBus2 = new RxBus();
                    mInstance = rxBus2;
                }
            }
        }
        return rxBus2;
    }

    /**
     * 发送消息
     *
     * @param object
     */
    public void send(Object object) {
        bus.onNext(object);
    }

    /**
     * 接收消息
     *
     * @return
     */
    public Observable toObserverable() {
        return bus;
    }
}

这是第45行
bus.onNext(object);
应该是出现背压了

怪我咯怪我咯2772 Il y a quelques jours1457

répondre à tous(2)je répondrai

  • 迷茫

    迷茫2017-04-17 17:53:28

    1.看不懂你为什么要加个rxBus2
    2.要处理onError的情况
    3.先注册 后send

    répondre
    0
  • 伊谢尔伦

    伊谢尔伦2017-04-17 17:53:28

    用的什么版本的RxJava. 在1.1.6版本中已经没有rx.subjects.PublishSubject$PublishSubjectProducer这个类了

    répondre
    0
  • Annulerrépondre