RxJava2: Continue Until Success

Need to cycle through a list of data, hunting for your first success while ignoring exceptions thrown along the way? Here's how to do it with RxJava2.

Let's start off with a problem statement.

Suppose you have a list of items. Some of them satisfy a success condition, while others throw exceptions when processed. You need to find the first item that succeeds and carry on with it. For example, given a list of ports to connect to, you want to know the first port that accepts a connection; once connected, you want to stop trying the remaining ports in the list. Here is one way to achieve this with RxJava2.
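Before bringing in RxJava, the requirement itself can be sketched as a plain loop, which makes the target behavior easier to compare against later. This is only an illustrative baseline: the class `FirstSuccessLoop` and the helpers `tryConnect` and `firstSuccess` are hypothetical names, and the connection check is a stand-in rather than real port handling.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FirstSuccessLoop {

    // Hypothetical stand-in for a real connection attempt:
    // only "Port_Address" succeeds, every other value throws.
    static String tryConnect(String port) throws Exception {
        if ("Port_Address".equals(port)) {
            return port;
        }
        throw new Exception("Not connected: " + port);
    }

    // Returns the first port that connects, or an empty Optional if none does.
    static Optional<String> firstSuccess(List<String> ports) {
        for (String port : ports) {
            try {
                // The first successful attempt ends the search.
                return Optional.of(tryConnect(port));
            } catch (Exception e) {
                // A failed attempt is ignored and the next port is tried.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> ports = Arrays.asList("Port_Address1", "Port_Address", "Port_Address5");
        System.out.println(firstSuccess(ports).orElse("none"));
    }
}
```

The RxJava2 version in the rest of the article aims for the same two properties: failures are skipped, and nothing after the first success is attempted.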

Test ports:

        deviceList.add("Port_Address1");
        deviceList.add("Port_Address2");
        deviceList.add("Port_Address3");
        deviceList.add("Port_Address4");
        deviceList.add("Port_Address");
        deviceList.add("Port_Address5");
        deviceList.add("Port_Address6");


The following test method stands in for a real connection attempt. It returns the address for two of the ports and throws an exception for all the others:

    private String checkPortConnection(String deviceID) throws Exception{
        //code to validate port connection. 
        if("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)){
            return deviceID;
        }
        throw new Exception("Not connected.");
    }


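Now for the RxJava part: iterate over the port list and attempt each connection until one succeeds. flatMap wraps every attempt in its own Observable that catches the exception instead of propagating it, and take(1) stops the chain after the first address is emitted: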

            Observable<String> observableName = Observable.fromIterable(deviceList);
            observableName
                    .flatMap(new Function<String, Observable<String>>() {
                        @Override
                        public Observable<String> apply(String device) throws Exception {
                            return Observable.create(new ObservableOnSubscribe<String>() {
                                @Override
                                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {

                                    try {
                                        System.out.println("Test and connect to port connection:"+device);
                                        String portAddress = checkPortConnection(device);
                                        observableEmitter.onNext(portAddress);
                                        observableEmitter.onComplete();
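                                    } catch (Exception e) {
                                        System.out.println("Problem connecting to port:"+device);
                                        // Complete the inner Observable so the chain can still finish when no port connects.
                                        observableEmitter.onComplete();
                                    }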
                                }
                            });
                        }
                    })
                    .take(1)
                    .subscribe(getObserver());
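
The same idea can be written more compactly with lambdas. The following is a sketch of an alternative, not the approach used in this article: it swaps flatMap for concatMap (which subscribes to one inner source at a time, in list order), wraps each attempt in Observable.fromCallable, and turns a failed attempt into an empty source with onErrorResumeNext. The class name `FirstSuccessRx` and the method `firstConnected` are hypothetical, and the example assumes RxJava 2.x is on the classpath.

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

public class FirstSuccessRx {

    // Same stand-in check as in the article: only two addresses "connect".
    static String checkPortConnection(String deviceID) throws Exception {
        if ("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)) {
            return deviceID;
        }
        throw new Exception("Not connected.");
    }

    // Returns the first address that connects, or null if none does.
    static String firstConnected(List<String> devices) {
        return Observable.fromIterable(devices)
                // concatMap processes one inner source at a time, preserving list order.
                .concatMap(device ->
                        Observable.fromCallable(() -> checkPortConnection(device))
                                // A failed attempt becomes an empty source, so the next port is tried.
                                .onErrorResumeNext(Observable.<String>empty()))
                // Keep the first success and dispose the rest of the chain.
                .firstElement()
                // Blocks until the result is known; yields null when the Maybe is empty.
                .blockingGet();
    }

    public static void main(String[] args) {
        List<String> devices = Arrays.asList(
                "Port_Address1", "Port_Address2", "Port_Address", "Port_Address5");
        System.out.println("First connected port: " + firstConnected(devices));
    }
}
```

Because each inner source completes whether the attempt succeeds or fails, the chain also terminates cleanly when no port can be reached. The blocking call is there only to keep the sketch short; in a reactive pipeline you would more likely subscribe to the Maybe instead.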


Here is the complete test program:

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;

import java.util.ArrayList;
import java.util.List;

public class MainTest {

    List<String> deviceList = new ArrayList<>();

    void setTestPorts() {
        deviceList.add("Port_Address1");
        deviceList.add("Port_Address2");
        deviceList.add("Port_Address3");
        deviceList.add("Port_Address4");
        deviceList.add("Port_Address");
        deviceList.add("Port_Address5");
        deviceList.add("Port_Address6");
    }


    public void testPortConnection() {

        try {
            Observable<String> observableName = Observable.fromIterable(deviceList);
            observableName
                    .flatMap(new Function<String, Observable<String>>() {
                        @Override
                        public Observable<String> apply(String device) throws Exception {
                            return Observable.create(new ObservableOnSubscribe<String>() {
                                @Override
                                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {

                                    try {
                                        System.out.println("Test and connect to port connection:"+device);
                                        String portAddress = checkPortConnection(device);
                                        observableEmitter.onNext(portAddress);
                                        observableEmitter.onComplete();
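                                    } catch (Exception e) {
                                        System.out.println("Problem connecting to port:"+device);
                                        // Complete the inner Observable so the chain can still finish when no port connects.
                                        observableEmitter.onComplete();
                                    }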
                                }
                            });
                        }
                    })
                    .take(1)
                    .subscribe(getObserver());

        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private String checkPortConnection(String deviceID) throws Exception {
        if ("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)) {
            return deviceID;
        }
        throw new Exception("Not connected.");
    }


    public static void main(String[] args) {
        System.out.println("Test port connection.");

        MainTest test = new MainTest();
        test.setTestPorts();
        test.testPortConnection();
    }


    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("Subscribed to observable.");
            }

            @Override
            public void onNext(String s) {
                System.out.println("**** Port Connected:" + s +" ****");
            }

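            @Override
            public void onError(Throwable throwable) {
                // Connection failures are handled inside the chain, so this only reports unexpected errors.
                System.out.println("Unexpected error: " + throwable.getMessage());
            }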

            @Override
            public void onComplete() {
                System.out.println("connect process completed");
            }
        };
    }
}


The output of the program is:

Test port connection.
Subscribed to observable.
Test and connect to port connection:Port_Address1
Problem connecting to port:Port_Address1
Test and connect to port connection:Port_Address2
Problem connecting to port:Port_Address2
Test and connect to port connection:Port_Address3
Problem connecting to port:Port_Address3
Test and connect to port connection:Port_Address4
Problem connecting to port:Port_Address4
Test and connect to port connection:Port_Address
**** Port Connected:Port_Address ****
connect process completed

