Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

How to Make an Event Bus With RxJava and RxAndroid

DZone's Guide to

How to Make an Event Bus With RxJava and RxAndroid

With the additions of RxJava and RxAndroid to the Android ecosystem, you don’t need to rely on Otto to help different parts of your app communicate anymore.

· Mobile Zone
Free Resource

Download this comprehensive Mobile Testing Reference Guide to help prioritize which mobile devices and OSs to test against, brought to you in partnership with Sauce Labs.

If you’ve ever needed to communicate between different parts of your application, it can be painful. To alleviate this, you can use an event bus like Otto. However, with the additions of RxJava and RxAndroid to the Android ecosystem, you don’t need to rely on Otto anymore. Otto is actually deprecated in favor of these newer libraries, since making your own event bus with them is actually quite easy.

I came up with my own solution that works well for my purposes. You can use it as is if you want, or tweak it to fit your needs.

First Attempt

If you just want to pass arbitrary data around your app this is all you need:

public final class RxBus {

    private static PublishSubject<Object> sSubject = PublishSubject.create();

    private RxBus() {
        // hidden constructor
    }


    public static Subscription subscribe(@NonNull Action1<Object> action) {
        return sSubject.subscribe(action);
    }

    public static void publish(@NonNull Object message) {
        sSubject.onNext(message);
    }
}


//Example usage below:

public class MyActivity extends Activity {

    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity);

        //Using Retrolambda
        RxBus.subscribe((message) -> {
            if (message instanceof Data) {
                Data data = (Data) cityObject;
                Log.v("Testing", data.getInfo());
            }
        });
    }
}

public class MyFragment extends Fragment {

    @Nullable
    @Override
    public View onCreateView(LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState) {
        View v = inflater.inflate(R.layout.fragment, container, false);

        //Using Retrolambda
        v.findViewById(R.id.view).setOnClickListener(view -> {
            RxBus.publish(new Data("Hello World"));
        });

        return v;
    }

}

Using a PublishSubject means that a new subscriber will only receive events emitted after they subscribed. It will not repeat old events to new subscribers.

This simple solution has a few problems, though. You can’t pick what kind of events your subscriber will receive, it’s going to see everything. The only way to ensure you get the data you want, is to define a new class for each event, such as XYZDataDownloadedEvent or PurpleButtonClickedEvent. Then, you have to do an instanceof check and cast it. Personally, I don’t like having to create a new class for each type of event I want to broadcast.

Also, this solution can cause memory leaks if you don’t unsubscribe from each subscription. Ideally, I want to be able to publish an event and subscribe to updates for that event. I don’t want to have to manage subscriptions in each place I subscribe.

Second Attempt

To address these issues, my next iteration looked like this:

public final class RxBus {

    private static Map<String, PublishSubject<Object>> sSubjectMap = new HashMap<>();
    private static Map<Object, CompositeSubscription> sSubscriptionsMap = new HashMap<>();

    private RxBus() {
        // hidden constructor
    }

    /**
     * Get the subject or create it if it's not already in memory.
     */
    @NonNull
    private static PublishSubject<Object> getSubject(String subjectKey) {
        PublishSubject<Object> subject = sSubjectMap.get(subjectKey);
        if (subject == null) {
            subject = PublishSubject.create();
            subject.subscribeOn(AndroidSchedulers.mainThread());
            sSubjectMap.put(subjectKey, subject);
        }

        return subject;
    }

    /**
     * Get the CompositeSubscription or create it if it's not already in memory.
     */
    @NonNull
    private static CompositeSubscription getCompositeSubscription(@NonNull Object object) {
        CompositeSubscription compositeSubscription = sSubscriptionsMap.get(object);
        if (compositeSubscription == null) {
            compositeSubscription = new CompositeSubscription();
            sSubscriptionsMap.put(object, compositeSubscription);
        }

        return compositeSubscription;
    }

    /**
     * Subscribe to the specified subject and listen for updates on that subject. Pass in an object to associate
     * your registration with, so that you can unsubscribe later.
     * <br/><br/>
     * <b>Note:</b> Make sure to call {@link RxBus#unregister(Object)} to avoid memory leaks.
     */
    public static void subscribe(String subject, @NonNull Object lifecycle, @NonNull Action1<Object> action) {
        Subscription subscription = getSubject(subject).subscribe(action);
        getCompositeSubscription(lifecycle).add(subscription);
    }

    /**
     * Unregisters this object from the bus, removing all subscriptions.
     * This should be called when the object is going to go out of memory.
     */
    public static void unregister(@NonNull Object lifecycle) {
        //We have to remove the composition from the map, because once you unsubscribe it can't be used anymore
        CompositeSubscription compositeSubscription = sSubscriptionsMap.remove(lifecycle);
        if (compositeSubscription != null) {
            compositeSubscription.unsubscribe();
        }
    }

    /**
     * Publish an object to the specified subject for all subscribers of that subject.
     */
    public static void publish(String subject, @NonNull Object message) {
        getSubject(subject).onNext(message);
    }
}

I added a CompositeSubscription to manage all of the subscriptions for a given object (typically an Activity or Fragment). I also added a map to keep track of the different types of subjects. That way, I can easily keep track of all of the subscriptions and subjects.

Also, to simplify management of unsubscribing and keeping a reference to those subscriptions, I created a BaseActivity and BaseFragment.

public abstract class BaseFragment extends Fragment {

    @Override
    public void onDestroy() {
        super.onDestroy();
        RxBus.unregister(this);
    }
}

Calling RxBus.unregister(this) in onDestroy() is all that’s required for cleanup. If there are subscriptions associated to that object they will be unsubscribed and removed. I also wrote comments to make it clear that if you subscribe, you need to call unregister. In case you aren’t using a base class that handles it or subscribe to the bus from somewhere else.

Whenever I create a new subject, it’s set to be subscribed to the main thread. For my purposes, all of the events being posted will trigger UI updates. You could always extend it to allow for subscribing to different threads if you want. The current implementation makes it simple and covers the majority of use cases.

Final Implementation

The last change I made is to how you define what subject you are subscribed to. Initially, you could just pass in a String key, defined wherever you like. I like having all these keys organized in one place, though. Also, I wanted to limit the events you could subscribe and publish to. So, I changed the String parameter to an int and created a set of integer constants with the IntDef annotation. You can see my completed RxBus.java class below:

/**
 * Used for subscribing to and publishing to subjects. Allowing you to send data between activities, fragments, etc.
 * <p>
 * Created by Pierce Zaifman on 2017-01-02.
 */

public final class RxBus {

    private static SparseArray<PublishSubject<Object>> sSubjectMap = new SparseArray<>();
    private static Map<Object, CompositeSubscription> sSubscriptionsMap = new HashMap<>();

    public static final int SUBJECT_MY_SUBJECT = 0;
    public static final int SUBJECT_ANOTHER_SUBJECT = 1;

    @Retention(SOURCE)
    @IntDef({SUBJECT_MY_SUBJECT, SUBJECT_ANOTHER_SUBJECT})
    @interface Subject {
    }

    private RxBus() {
        // hidden constructor
    }

    /**
     * Get the subject or create it if it's not already in memory.
     */
    @NonNull
    private static PublishSubject<Object> getSubject(@Subject int subjectCode) {
        PublishSubject<Object> subject = sSubjectMap.get(subjectCode);
        if (subject == null) {
            subject = PublishSubject.create();
            subject.subscribeOn(AndroidSchedulers.mainThread());
            sSubjectMap.put(subjectCode, subject);
        }

        return subject;
    }

    /**
     * Get the CompositeSubscription or create it if it's not already in memory.
     */
    @NonNull
    private static CompositeSubscription getCompositeSubscription(@NonNull Object object) {
        CompositeSubscription compositeSubscription = sSubscriptionsMap.get(object);
        if (compositeSubscription == null) {
            compositeSubscription = new CompositeSubscription();
            sSubscriptionsMap.put(object, compositeSubscription);
        }

        return compositeSubscription;
    }

    /**
     * Subscribe to the specified subject and listen for updates on that subject. Pass in an object to associate
     * your registration with, so that you can unsubscribe later.
     * <br/><br/>
     * <b>Note:</b> Make sure to call {@link RxBus#unregister(Object)} to avoid memory leaks.
     */
    public static void subscribe(@Subject int subject, @NonNull Object lifecycle, @NonNull Action1<Object> action) {
        Subscription subscription = getSubject(subject).subscribe(action);
        getCompositeSubscription(lifecycle).add(subscription);
    }

    /**
     * Unregisters this object from the bus, removing all subscriptions.
     * This should be called when the object is going to go out of memory.
     */
    public static void unregister(@NonNull Object lifecycle) {
        //We have to remove the composition from the map, because once you unsubscribe it can't be used anymore
        CompositeSubscription compositeSubscription = sSubscriptionsMap.remove(lifecycle);
        if (compositeSubscription != null) {
            compositeSubscription.unsubscribe();
        }
    }

    /**
     * Publish an object to the specified subject for all subscribers of that subject.
     */
    public static void publish(@Subject int subject, @NonNull Object message) {
        getSubject(subject).onNext(message);
    }
}

Potential Issues

While working on this, I made a list of problems I had with the implementation. Some of which I believe can be addressed, others I’m not sure.

I’m still passing around objects which have to be cast to the correct type. I’m not sure if there’s a way around this, because the subject publishes Objects. So, the subscriber will only receive Objects.

You can pass in any object to associate your subscription with, so there’s no guarantee that you’ve actually unsubscribed. I tried to address this with my comments, saying that you must call unregister. But there’s no guarantee that it gets called, which will cause memory leaks.

The BaseActivity and BaseFragment unregister from the bus in onDestroy(). This means that if you start a new activity, the old activity will still be subscribed. So, if you publish an event that the previous activity is subscribed to, it may end up causing your app to crash with java.lang.IllegalStateException: Can not perform this action after onSaveInstanceState.

I didn’t want to call unregister in onStop() because if you go back to the previous activity, it won’t be subscribed anymore. If you are careful with how you manage your subjects, this won't be an issue. Ideally, the subscriptions would pause and resume with the lifecycle and get destroyed with the lifecycle.

Lastly, I’m using static members instead of the singleton pattern. Technically, I believe that using the singleton pattern is more memory efficient. Since it will only create the class when it needs to. However, in my case, since I’m using RxBus in onCreate() for most of my activities, it won’t really save anything. Plus, the amount of memory it uses is negligible. Some people also think that static variables are evil.

Final Thoughts

This solution isn’t perfect, but I felt it was a good compromise between complexity and ease of use.

Analysts agree that a mix of emulators/simulators and real devices are necessary to optimize your mobile app testing - learn more in this white paper, brought to you in partnership with Sauce Labs.

Topics:
rxandroid ,tutorial ,integration ,event bus ,rxjava

Published at DZone with permission of Pierce Zaifman, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}