{{announcement.body}}
{{announcement.title}}

Dubbo 3.0 Preview: Support for Reactive Programming

DZone 's Guide to

Dubbo 3.0 Preview: Support for Reactive Programming

Learn more about added reactive programming support for this open-source Java RPC framework.

· Java Zone ·
Free Resource

Background

Dubbo is graduating from the Apache foundation! And we are planning some major release milestones. Release 3.0 is on the way. Since there will be many new features in 3.0, we want to make sure they are up to the expectation of the community. We are now offering the 3.0.0-SNAPSHOT version. It has many preview features of the 3.0 release. In this article, I’ll introduce one of the major enhancements: the support for reactive programming.

RSocket

Reactive programming enables developers to write more efficient applications, especially in a distributed architecture. The community has been asking for this feature for a long time and now we are delivering!

The reactive programming implementation we choose is RSocket. RSocket is an application protocol providing Reactive Streams semantics. It extends the classic reactive streams to the networking layer by ensuring end-to-end back pressure. We believe this is of vital importance in the era of microservices.

RSocket Java SDK is utilizing the java Reactor project. The most common Reactor reactive types are Mono and Flux. RSocket defines four types of communication models:

  • Request/response (Mono)
  • Request/stream (Flux)
  • Fire-and-forget (Void Mono)
  • Channel (bi-directional Flux)

The RSocket Java SDK provides APIs for each communication model. But users still need to know the details of how RSocket works to assemble the right code structure.

What Dubbo offers is an extra layer of abstraction on top of the SDK. So that users need only deal with Mono and Flux types without worrying about things like transport or fragmentation.

Example

Next, let’s look at some code examples. The examples are demonstrated using the  Spring Boot framework.

First, we define the interfaces of the service:

public interface DemoService {

    Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);

    Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2);

}


Here is the service implementation:

public class DemoServiceImpl implements DemoService {

    @Override

    public Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {

        return m1.zipWith(m2, new BiFunction<String, String, String>() {

            @Override

            public String apply(String s, String s2) {

                return s+" "+s2;

            }

        });

    }

    @Override

    public Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {

        return f1.zipWith(f2, new BiFunction<String, String, String>() {

            @Override

            public String apply(String s, String s2) {

                return s+" "+s2;

            }

        });

    }

}


In the configuration file, the only thing to remember is to choose the protocol RSocket:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

       xmlns="http://www.springframework.org/schema/beans"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- provider's application name, used for tracing dependency relationship -->

    <dubbo:application name="demo-provider"/>

    <!-- use registry center to export service -->

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- use dubbo protocol to export service on port 20880 -->

    <dubbo:protocol name="rsocket" port="20890"/>

    <!-- service implementation, as same as regular local bean -->

    <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/>

    <!-- declare the service interface to be exported -->

    <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/>

</beans>


Let’s start the server:

public class RsocketProvider {

    public static void main(String[] args) throws Exception {

        new EmbeddedZooKeeper(2181, false).start();

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});

        context.start();

        System.in.read(); // press any key to exit

    }

}


Next is the client. Let's take a look at the configuration first:

<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"

       xmlns="http://www.springframework.org/schema/beans"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),

    don't set it same as provider -->

    <dubbo:application name="demo-consumer"/>

    <!-- use registry center to discover service -->

    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- generate proxy for the remote service, then demoService can be used in the same way as the

    local regular interface -->

    <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/>

</beans>


Here is the client code:

public class RsocketConsumer {

    public static void main(String[] args) {

        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});

        context.start();

        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        while (true) {

            try {

                Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));

                monoResult.doOnNext(new Consumer<String>() {

                    @Override

                    public void accept(String s) {

                        System.out.println(s);

                    }

                }).block();

                Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));

                fluxResult.doOnNext(new Consumer<String>() {

                    @Override

                    public void accept(String s) {

                        System.out.println(s);

                    }

                }).blockLast();

            } catch (Throwable throwable) {

                throwable.printStackTrace();

            }

        }

    }

}


Now if you run both the server and the client, you should be able to see the print-out.

As you can see, the only major difference of this code from the “old-style” RPC code is the introduction of Mono and Flux . If you still use the request/response style coding, there is really not much change. Even for Flux, it still feels more like imperative programming rather than functional programming.

Under the hood, what RSocket did was encode all the information, not just the payload, into the frame. The frame encapsulates all the metadata, request commands, back pressure information, etc. All operations of RSocket are communicated via frames as well.

Summary

RSocket is a powerful implementation of reactive programming. Dubbo has been widely used in enterprise Java applications, especially in China. With the support of RSocket, Dubbo will remain a great option for your distributed applications.

Topics:
java ,distributed architecture ,reactive streams ,dubbo ,dubbo 3.0 ,rsocket ,reactive programming

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}