
How (Not) to Use Reactive Streams in Java 9+


Learn more about the proper, and improper, use of Reactive Streams in recent JDK versions.


Introduction

Reactive Streams is a standard for asynchronous data processing in a streaming fashion with non-blocking backpressure. Starting with Java 9, it has been a part of the JDK in the form of the java.util.concurrent.Flow.* interfaces.

Having the interfaces at hand may tempt you to write your own implementations. Surprising as it may seem, that’s not why they are in the JDK.

In this article, I’m going to describe the basic concepts of reactive stream processing and show how not to use the APIs included in JDK 9+. Plus, we’re going to ponder the possible directions in which JDK’s Reactive Streams support may go in the future.

Shameless Plug

If you prefer watching videos to reading, here’s a recording of one of my talks on this topic, which, by the way, was the third top-rated talk of Riga Dev Days 2016–18. Otherwise, please continue reading.

Stream Processing Recap

In a generalized stream processing architecture, which most of you have probably seen at least once, you can name a couple of main concepts:

  • the source of the data, sometimes called the producer,
  • the destination for the data, sometimes called the consumer,
  • one or more processing stages that do something with the data.

In such a pipeline, the data flows from the producer, through the processing stages, to the consumer:

Generalized stream processing architecture

Now, if you consider that the components above can have different processing speeds, there are two possible scenarios:

  1. If the downstream (i.e. the component that receives data) is faster than the upstream (the component that sends data), you’re all good, since the pipeline should be working smoothly.
  2. If, however, the upstream is faster, then the downstream becomes flooded with data and things start getting worse.

In the latter case, there are a couple of strategies to deal with the excess data:

  1. Buffer it  —  but buffers have limited capacity and you’re going to run out of memory sooner or later.
  2. Drop it  —  but then you lose data (which is usually not desired, but can make sense in certain cases — e.g. this is what networking hardware often does).
  3. Block until the consumer is done with it  —  but this may result in slowing down the entire pipeline.
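The three strategies above map quite directly onto operations on java.util.concurrent queues; here is a minimal sketch (the class name and capacity are illustrative):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class OverflowStrategies {
    public static void main(String[] args) throws InterruptedException {
        // A bounded buffer between a producer and a consumer.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(2);

        // Strategy 1: buffer — works only until the bounded capacity is reached.
        buffer.offer(1);
        buffer.offer(2);

        // Strategy 2: drop — offer() returns false on a full queue; the element is lost.
        boolean accepted = buffer.offer(3);
        System.out.println("element 3 accepted: " + accepted);  // false: dropped

        // Strategy 3: block — put() waits until the consumer makes room,
        // potentially stalling the entire upstream.
        buffer.take();  // consumer takes an element...
        buffer.put(3);  // ...so the producer's put() can proceed
        System.out.println("queue: " + buffer);
    }
}
```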

The preferred way of dealing with those different processing capabilities is a technique called backpressure — which boils down to the slower consumer requesting a given amount of data from the faster producer — but only an amount the consumer is able to process at that time.

Coming back to the streaming pipeline diagram, you can think of backpressure as a special kind of signalling data that flows in the opposite direction to the regular data being processed:

Streaming pipeline with backpressure

However, not every streaming pipeline with backpressure is necessarily a reactive one.

Reactive Streams

The crucial concept of Reactive Streams is processing infinite streams of data in an asynchronous and non-blocking manner so that the computing resources (think CPU cores or network hosts) can be used in parallel.

There are three key factors that make a stream reactive:

  • the data is processed asynchronously,
  • the backpressure mechanism is non-blocking,
  • the fact that the downstream can be slower than the upstream is somehow represented in the domain model.

Examples of the last one include the Twitter streaming API, where you can be disconnected if consuming too slow, or one of the built-in stages in Akka Streams — conflate — which lets you explicitly plan for a slow downstream.

Reactive Streams Support in the JDK

Starting from version 9, the Reactive Streams interfaces  —  formerly available as a separate library  —  have become a part of the JDK in the java.util.concurrent.Flow class.

The four interfaces seem fairly simple at first sight:

  • a Publisher<T> is responsible for publishing elements of type T and provides a subscribe method for subscribers to connect to it
  • a Subscriber<T> connects to a Publisher, receives a confirmation via onSubscribe, then receives data via the onNext callbacks and additional signals via onError and onComplete
  • a Subscription represents a link between a Publisher and a Subscriber and allows for backpressuring the publisher with request or terminating the link with cancel
  • a Processor combines the capabilities of a Publisher and a Subscriber in a single interface
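For reference, this is essentially how those interfaces are declared inside java.util.concurrent.Flow (the defaultBufferSize() helper is omitted):

```java
// A condensed reproduction of the java.util.concurrent.Flow declarations (JDK 9+).
public final class Flow {

    private Flow() {}  // uninstantiable holder class

    @FunctionalInterface
    public static interface Publisher<T> {
        public void subscribe(Subscriber<? super T> subscriber);
    }

    public static interface Subscriber<T> {
        public void onSubscribe(Subscription subscription);
        public void onNext(T item);
        public void onError(Throwable throwable);
        public void onComplete();
    }

    public static interface Subscription {
        public void request(long n);
        public void cancel();
    }

    public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
}
```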

OK, Let’s Code!

Having such simple interfaces at hand may tempt you to try to implement them. For example, you may write a trivial implementation of the Publisher that publishes elements from an arbitrary iterator of integers:

public class SimplePublisher implements Flow.Publisher<Integer> {

    private final Iterator<Integer> iterator;

    SimplePublisher(int count) {
        this.iterator = IntStream.rangeClosed(1, count).iterator();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        iterator.forEachRemaining(subscriber::onNext);
        subscriber.onComplete();
    }
}


Then, you can try to run it using some dummy subscriber that just prints out the received data:

public static void main(String[] args) {
    new SimplePublisher(10).subscribe(new Flow.Subscriber<>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {}

        @Override
        public void onNext(Integer item) {
            System.out.println("item = [" + item + "]");
        }

        @Override
        public void onError(Throwable throwable) {}

        @Override
        public void onComplete() {
            System.out.println("complete");
        }
    });
}


If you run it, the output should be:

item = [1]
item = [2]
item = [3]
item = [4]
item = [5]
item = [6]
item = [7]
item = [8]
item = [9]
item = [10]
complete


So, it works, right? It looks like it does, but you may have a gut feeling that something is missing. For example, the publisher doesn’t emit elements according to any demand but rather just sends them downstream all at once.

It turns out that there’s a way to prove that this naive implementation is far from correct: running a couple of tests from the Reactive Streams TCK. The TCK (Technology Compatibility Kit) is a test framework that verifies whether an implementation of the reactive components interacts correctly with the other components. Its goal is to ensure that all custom Reactive Streams implementations can work together smoothly, connected through the abstract interfaces, while performing all the data transfers, signaling, and backpressure correctly.
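The TCK variant that targets the JDK Flow interfaces is published as a separate artifact; with Maven, the dependency looks roughly like this (the version is illustrative, so check for the current release):

```xml
<dependency>
    <groupId>org.reactivestreams</groupId>
    <artifactId>reactive-streams-tck-flow</artifactId>
    <version>1.0.4</version>
    <scope>test</scope>
</dependency>
```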

To create a test case for SimplePublisher, you need to add a proper dependency to your build definition and extend the TCK’s FlowPublisherVerification:

public class SimplePublisherTest extends FlowPublisherVerification<Integer> {

    public SimplePublisherTest() {
        super(new TestEnvironment());
    }

    @Override
    public Flow.Publisher<Integer> createFlowPublisher(long elements) {
        return new SimplePublisher((int) elements);
    }

    @Override
    public Flow.Publisher<Integer> createFailedFlowPublisher() {
        return null;
    }
}


After running the test case for the naive publisher, you can see that it indeed has some issues:

Results of running the TCK publisher tests against SimplePublisher

Actually, there’s just a single test case that has passed; all others have problems. This clearly demonstrates that the trivial implementation is not a proper one.

The numbers in the names of the test cases refer to the respective items in the Reactive Streams specification where you can further explore the concepts behind those requirements.

It turns out that most of the problems can be eliminated by a couple of small changes, i.e.:

  • introducing an implementation of Subscription to link the publisher with its subscribers, which would emit elements according to demand
  • adding some basic error handling
  • adding some simple state within the subscription to correctly handle termination.
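Put together, those changes can look roughly like the sketch below. The class name is mine, and this is deliberately simplified: it stays synchronous and is still not fully TCK-compliant (in particular, it does not address the recursion problem discussed further down).

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.stream.IntStream;

// A demand-driven variant of SimplePublisher: elements are emitted only when
// the subscriber asks for them via Subscription.request(n).
public class DemandAwarePublisher implements Flow.Publisher<Integer> {

    private final Iterator<Integer> iterator;

    public DemandAwarePublisher(int count) {
        this.iterator = IntStream.rangeClosed(1, count).iterator();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean terminated = false;  // simple termination state

            @Override
            public void request(long n) {
                if (terminated) {
                    return;
                }
                if (n <= 0) {
                    // Spec rule 3.9: non-positive demand must signal onError.
                    terminated = true;
                    subscriber.onError(
                        new IllegalArgumentException("non-positive subscription request"));
                    return;
                }
                // Emit at most n elements, i.e. only as much as was requested.
                for (long emitted = 0; emitted < n && iterator.hasNext() && !terminated; emitted++) {
                    subscriber.onNext(iterator.next());
                }
                if (!iterator.hasNext() && !terminated) {
                    terminated = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                terminated = true;
            }
        });
    }
}
```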

For details, please have a look at the history of the commits in the repository with the example code.

However, eventually, you’re going to come to a point where the problems become less trivial and harder to solve.

Since the implementation is synchronous, there’s an issue with unbounded recursion resulting from the subscription’s request() calling the subscriber’s onNext(), where, in turn, the subscriber calls request() again, etc.

The other serious issue has to do with handling infinite demand (i.e. the subscriber requesting Long.MAX_VALUE elements, possibly a couple of times). If you’re not careful enough here, you may end up either spawning too many threads or overflowing some long value where you could possibly store the accumulated demand.
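A common defensive trick for the overflow half of that problem (used, under various names, by the existing implementations) is to saturate the accumulated demand at Long.MAX_VALUE instead of letting the addition wrap around; a sketch:

```java
public class DemandMath {

    // Adds newly requested demand to the current total, clamping at
    // Long.MAX_VALUE (treated as "effectively unbounded") instead of
    // overflowing into a negative value. Assumes both arguments are >= 0.
    static long addCap(long current, long requested) {
        long sum = current + requested;
        return sum < 0 ? Long.MAX_VALUE : sum;
    }

    public static void main(String[] args) {
        System.out.println(addCap(10, 5));                           // 15
        System.out.println(addCap(Long.MAX_VALUE, Long.MAX_VALUE));  // clamped
    }
}
```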

Don’t Try This at Home

The bottom line of the example above is that reactive components are really not trivial to implement correctly. So, unless you're authoring yet another Reactive Streams implementation, you shouldn't implement them yourself; instead, use one of the existing implementations, which are verified with the TCK.

And if you decide to write your own implementation anyway, be sure to understand all the details of the specification and remember to run the TCK against your code.

The Purpose of the New Interfaces

So what are the interfaces there for, you may ask? The actual goal of including them in the JDK is to provide a Service Provider Interface (SPI) layer. This should eventually serve as a unification layer for components that are reactive and streaming in nature but expose their own custom APIs, and thus cannot interoperate with other, similar implementations.

The other, equally important goal is to point the future development of the JDK in a proper direction, leading to a point where the existing streaming abstractions, which are already present in the JDK and widely used, make use of some common interfaces — once again to improve interoperability.

Existing Streaming Abstractions

So what streaming abstractions are already there in the JDK (with streaming meaning processing large, possibly infinite, amounts of data chunk by chunk, without reading everything into memory upfront)? Those include:

  • java.io.InputStream/OutputStream
  • java.util.Iterator
  • java.nio.channels.*
  • javax.servlet.ReadListener/WriteListener
  • java.sql.ResultSet
  • java.util.Stream
  • java.util.concurrent.Flow.*

Although all of the above abstractions expose some kind of streaming-like behavior, they lack a common API that would let you connect them easily, e.g. use a Publisher to read data from one file and a Subscriber to write it to another.

The advantage of having such a unification layer is that a single call:

publisher.subscribe(subscriber)


would handle all the hidden complexities of reactive stream processing (like backpressure and signalling).

Towards an Ideal World

What could be the possible results of making the various abstractions use the common interfaces? Let’s see a few examples.

Minimum Operation Set

The current Reactive Streams support in the JDK is limited to the four interfaces described earlier. If you have ever used some reactive library before  —  Akka Streams, RxJava, or Project Reactor  —  you’re aware that their power lies in various stream combinators (like map or filter to name the simplest ones) available out of the box. Those combinators are, however, missing from the JDK, although you’d probably expect at least a couple of them to be available.

To solve this problem, Lightbend has proposed a POC of Reactive Streams Utilities  —  a library with the basic operations built-in and with the possibility to provide the more complex ones as plug-ins delegating to an existing implementation, specified by a JVM system parameter like:

-Djava.flow.provider=akka


HTTP

How do we receive a file uploaded via HTTP and upload it somewhere else, in a reactive fashion, of course?

Since Servlet version 3.1, there's asynchronous Servlet IO. Also, starting from JDK 9, there's a new HTTP client (which lived in the jdk.incubator.httpclient incubator module in Java 9/10 but is standard, as java.net.http, from Java 11 on). Apart from a nicer API, the new client also supports Reactive Streams for input and output. Among others, it provides a POST method that accepts a body built from a Publisher<ByteBuffer>.
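The reactive request-body API is already usable in Java 11; a minimal sketch (the URL is a placeholder, and the request is only built here, not sent):

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class ReactiveBodyExample {
    public static void main(String[] args) {
        // Any Flow.Publisher<ByteBuffer> can serve as a request body;
        // SubmissionPublisher is just a convenient JDK implementation.
        Flow.Publisher<ByteBuffer> body = new SubmissionPublisher<>();

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("https://example.com/upload"))  // placeholder URL
            .POST(HttpRequest.BodyPublishers.fromPublisher(body))
            .build();

        System.out.println(request.method() + " " + request.uri());
    }
}
```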

Now, if the HttpServletRequest provided a publisher to expose the request body, uploading the received file would become:

POST(HttpRequest.BodyPublishers.fromPublisher(req.getPublisher()))


All of the reactive machinery would then be handled under the hood by that single line of code.

Database Access

When it comes to a universal way to access a relational database in a reactive way, there was some hope brought by the Asynchronous Database Access API (ADBA), which, unfortunately, hasn’t made it to the JDK so far.

There’s also R2DBC  —  an endeavor to bring a reactive programming API to relational data stores. It currently supports H2 and Postgres and plays nicely with Spring Data JPA, which may be an advantage that helps with wider adoption.

Then, there are some vendor-specific asynchronous drivers. But we’re still missing a perfect solution that would let you do something like:

Publisher<User> users = entityManager
    .createQuery("select u from users")
    .getResultPublisher();


This is basically a plain old JPA call, just with a Publisher of users instead of a List.

This Is Still Not the Reality

Just to remind you once again: the above examples are a look into the future; they are not here yet. The direction the JDK ecosystem takes is a matter of time and of effort from the community.

An Actual Use of the Unification Layer

Although the unification of HTTP and databases is not yet there, it’s already possible to actually connect the various Reactive Streams implementations using the unified interfaces found in the JDK.

In this example, I'm going to use Project Reactor's Flux as the publisher, Akka Streams' Flow as the processor, and RxJava's Flowable as the subscriber. Note: the example code below uses Java 10's var, so if you plan to try it yourself, be sure to have a proper JDK.

public class IntegrationApp {

    public static void main(String[] args) {
        var reactorPublisher = reactorPublisher();
        var akkaStreamsProcessor = akkaStreamsProcessor();

        reactorPublisher.subscribe(akkaStreamsProcessor);

        Flowable
            .fromPublisher(FlowAdapters.toProcessor(akkaStreamsProcessor))
            .subscribe(System.out::println);
    }

    private static Publisher<Long> reactorPublisher() {
        var numberFlux = Flux.interval(Duration.ofSeconds(1));
        return JdkFlowAdapter.publisherToFlowPublisher(numberFlux);
    }

    private static Processor<Long, Long> akkaStreamsProcessor() {
        var negatingFlow = Flow.of(Long.class).map(i -> -i);
        return JavaFlowSupport.Flow.toProcessor(negatingFlow).run(materializer);
    }

    private static ActorSystem actorSystem = ActorSystem.create();
    private static ActorMaterializer materializer = ActorMaterializer.create(actorSystem);
}


Looking at main, you can see that there are three components that form the pipeline: the reactorPublisher, the akkaStreamsProcessor, and the Flowable, which prints to standard output.

When you look at the return types of the factory methods, you will notice that they are nothing more than the common Reactive Streams interfaces (a Publisher<Long> and a Processor<Long, Long>), which are used to seamlessly connect the different implementations.

Also, as you can see, the various libraries don't return the unified types out of the box (i.e. they internally use a different type hierarchy), so they need some glue code to convert their internal types to the ones from java.util.concurrent.Flow.*, like the JdkFlowAdapter or the JavaFlowSupport.

Last but not least, you can spot differences in how the libraries expose the internals of their streaming engines. While Project Reactor tends to hide the internals completely, Akka Streams requires you to explicitly define a materializer, the runtime for the streaming pipeline.

Summary

Here are a couple of key takeaways from this article:

  • Reactive Streams support in the JDK is not a full implementation of the specification but only the common interfaces,
  • the interfaces are there to serve as an SPI (Service Provider Interface)  —  a unification layer for different Reactive Streams implementations,
  • implementing the interfaces yourself is not trivial and not recommended, unless you’re creating some new library; if you decide to implement them, make sure that all the tests from the TCK are green  —  this gives you a good chance that your library will work smoothly with other reactive components.

If you wish to experiment with the TCK and the SimplePublisher example, the code is available on my GitHub.

And if you’re interested in digging deeper into Reactive Streams implementation, I truly recommend the Advanced Reactive Java blog and SoftwareMill Tech blog for more posts like this one.



Published at DZone with permission of

Opinions expressed by DZone contributors are their own.
