Reactive Streams: AMQP 1.0 Is a Really Good ''Reactive'' Protocol

Enjoying the Reactive Streams API? If you're setting up communications between microservices in a distributed system, take a look at how AMQP might fit in.

During the recent Christmas holidays, I decided to spend some of my spare time digging into the reactive programming paradigm, the “Reactive streams” manifesto, and the related ReactiveX implementation (more specifically, RxJava).

This blog post doesn’t intend to be a discussion about what reactive streams are or what reactive programming is; you can find plenty of useful resources on those points on the Internet. But because I’m a messaging and IoT guy, in this article I’ll try to describe some (really trivial) thoughts I had while “discovering” the reactive streams API and comparing it to the AMQP 1.0 protocol.

On December 30th, I tweeted …

[Image: screenshot of the tweet]

As you can see, I defined AMQP 1.0 as a “reactive” protocol because I found all the semantics and the related definitions from the reactive streams API in the AMQP 1.0 specification.

What I’m going to describe is a mapping at 20,000 feet, without digging into all the problems we could encounter along the way. The mapping seemed rather straightforward to me, so I’d like to open a discussion on it or give some input to other people who are thinking about the same thing.

This article could also be useful when it comes to using a reactive programming model in a microservices-based system, where a “good” messaging protocol for supporting such a model is needed.

The Reactive Streams API

We know that AMQP 1.0 is really a peer-to-peer protocol, so we can establish communication between two clients directly or through one or more intermediaries, such as a broker (for store-and-forward) or a router (for direct messaging as well). In all these use cases, there is always a “sender” and a “receiver”, which map directly to a “publisher” and a “subscriber” in reactive streams API terms (if you think about ReactiveX, you know them as “observable” and “observer”).

The reactive streams API is defined with four main interfaces and some methods that can be mapped in terms of specific AMQP 1.0 “performatives”.

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

public interface Subscription {
    public void request(long n);
    public void cancel();
}


The above interfaces describe how a “subscriber” can subscribe in order to receive a stream of events published by a “publisher” and how the publisher “pushes” events to the subscriber. The API also defines how the subscriber can avoid being overwhelmed by events if it’s slower than the publisher, using a “request for events” mechanism on the subscription. Finally, the subscriber can be notified when the stream is completed (if it’s not infinite) or if an error occurs.
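To make that contract concrete, here is a minimal, single-threaded sketch. It uses the JDK's java.util.concurrent.Flow interfaces (Java 9+), which mirror the four reactive streams interfaces, rather than the org.reactivestreams package itself. RangePublisher, BatchingSubscriber, and RangeDemo are hypothetical names made up for illustration, and the code ignores threading and demand overflow, so take it as a sketch and not as a compliant implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical finite publisher: emits 1..count, but only as fast as the subscriber requests. */
class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;      // requested but not yet delivered
            private boolean emitting; // guards against re-entrant request() calls from onNext
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                demand += n;
                if (emitting) return; // the loop already running below will see the new demand
                emitting = true;
                while (!done) {
                    if (next > count) {        // finite stream exhausted
                        done = true;
                        subscriber.onComplete();
                    } else if (demand > 0) {
                        demand--;
                        subscriber.onNext(next++);
                    } else {
                        break;                 // no demand left: wait for the next request()
                    }
                }
                emitting = false;
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

/** Hypothetical slow subscriber: asks for `batch` events at a time. */
class BatchingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    Throwable error;
    private final int batch;
    private int remaining;
    private Flow.Subscription subscription;

    BatchingSubscriber(int batch) {
        this.batch = batch;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        remaining = batch;
        s.request(batch);
    }

    @Override
    public void onNext(Integer item) {
        received.add(item);
        if (--remaining == 0) {   // batch consumed: signal that we can take more
            remaining = batch;
            subscription.request(batch);
        }
    }

    @Override
    public void onError(Throwable t) {
        error = t;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}

class RangeDemo {
    public static void main(String[] args) {
        BatchingSubscriber subscriber = new BatchingSubscriber(2);
        new RangePublisher(5).subscribe(subscriber);
        System.out.println(subscriber.received + " completed=" + subscriber.completed);
    }
}
```

The publisher never calls onNext more often than the subscriber has requested, which is the whole point of the “request for events” mechanism.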

In this post, I won’t consider the Processor interface, which enables an entity to be both a “publisher” and “subscriber” and is mainly used for implementing “operators” in the stream processing chain.

Attaching as… Subscribing

The Publisher interface provides a subscribe method that is called by the subscriber when it wants to start receiving all the published events on the related stream (in a “push” fashion).

If we assign a name to the stream, which could be an “address” in AMQP terms, then such an operation could be an “attach” performative sent by the subscriber, which acts as a receiver on the established link. In the opposite direction, the publisher can reply with an “attach” performative (on the same address), acting as a sender. This reply could be mapped to the onSubscribe method call on the Subscriber interface.

FIG.1 – attach as Publisher.subscribe, Subscriber.onSubscribe
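The handshake in FIG.1 can be sketched with a toy in-memory model. Everything here is hypothetical: Attach is a heavily simplified stand-in for the real “attach” frame (which carries more fields, such as a handle and source/target termini), and AttachHandshake just records the frames that would go on the wire. No real AMQP library is involved.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified stand-in for an AMQP 1.0 "attach" frame. */
final class Attach {
    enum Role { SENDER, RECEIVER }

    final String linkName;
    final Role role;
    final String address; // the name given to the stream

    Attach(String linkName, Role role, String address) {
        this.linkName = linkName;
        this.role = role;
        this.address = address;
    }

    @Override
    public String toString() {
        return "attach(name=" + linkName + ", role=" + role + ", address=" + address + ")";
    }
}

/** Hypothetical in-memory "wire" that records the attach handshake for one stream. */
class AttachHandshake {
    final List<Attach> wire = new ArrayList<>();
    boolean onSubscribeCalled;

    /** Subscriber side of Publisher.subscribe(...): open a link as the receiver. */
    Attach subscribe(String address) {
        Attach request = new Attach("sub-" + address, Attach.Role.RECEIVER, address);
        wire.add(request);
        return request;
    }

    /** Publisher side: answer on the same link and address, acting as the sender. */
    Attach accept(Attach request) {
        Attach reply = new Attach(request.linkName, Attach.Role.SENDER, request.address);
        wire.add(reply);
        onSubscribeCalled = true; // the point where Subscriber.onSubscribe(...) would fire
        return reply;
    }

    public static void main(String[] args) {
        AttachHandshake handshake = new AttachHandshake();
        handshake.accept(handshake.subscribe("telemetry"));
        handshake.wire.forEach(System.out::println);
    }
}
```

The address “telemetry” and the “sub-” link-name prefix are arbitrary choices for the example.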

Credits-Based Flow Control and Transfer... for Back-Pressure and Pushing Events

One of the main reactive streams concepts that convinced me that AMQP is really a “reactive” protocol was back-pressure. It handles scenarios where a subscriber is slower than the publisher: to avoid being overwhelmed by events it can’t handle (and losing them), the subscriber can notify the publisher of the maximum number of events it can receive. In my mind, it’s something that AMQP provides out of the box with credit-based flow control (something that isn’t available in the MQTT 3.1.1 protocol, for example).

In terms of the reactive streams API, this feature is provided by the request method on the Subscription interface; by calling it, the subscriber tells the publisher the maximum number of events it is ready to receive. In AMQP terms, the receiver sends a “flow” performative specifying the link credit, that is, the maximum number of messages it can handle at that moment.

At this point, the publisher can start to push events to the subscriber through the onNext method call on the Subscriber interface. In AMQP terms, the sender starts to send one or more “transfer” performatives to the receiver, each carrying the message payload (representing the event).

FIG.2 – flow as Subscription.request(n) and transfer as Subscriber.onNext
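The interplay of credit and transfers can be sketched as follows. CreditLink is a hypothetical, in-memory model of both ends of one link, not a real AMQP client. One detail worth noting: request(n) is additive in reactive streams, while a “flow” frame advertises the receiver's current link credit as a single number, so this sketch adds n to the credit still outstanding before “sending” the flow. It also assumes nothing is in flight, which lets it skip the delivery-count bookkeeping a real implementation would need.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical in-memory model of one link with credit-based flow control. */
class CreditLink {
    private final Queue<String> outbox = new ArrayDeque<>(); // sender side: events waiting for credit
    private long linkCredit;                                   // sender's view of the remaining credit
    final List<String> delivered = new ArrayList<>();         // receiver side: one entry per onNext

    /** Receiver side, the Subscription.request(n) equivalent. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("request must be positive");
        }
        flow(linkCredit + n); // advertise outstanding credit plus the new demand
    }

    /** A "flow" performative arriving at the sender. */
    private void flow(long credit) {
        linkCredit = credit;
        pump();
    }

    /** Sender side: a new event is available on the stream. */
    void publish(String event) {
        outbox.add(event);
        pump();
    }

    /** Send one "transfer" per unit of credit; stop as soon as the credit is used up. */
    private void pump() {
        while (linkCredit > 0 && !outbox.isEmpty()) {
            linkCredit--;
            delivered.add(outbox.poll()); // the transfer, surfacing as Subscriber.onNext
        }
    }

    long credit() {
        return linkCredit;
    }

    public static void main(String[] args) {
        CreditLink link = new CreditLink();
        link.publish("a");
        link.publish("b");
        link.publish("c");
        System.out.println("before any credit: " + link.delivered);
        link.request(2);
        System.out.println("after request(2): " + link.delivered);
    }
}
```

With no credit, published events simply wait in the sender's outbox; they are never pushed to a receiver that hasn't asked for them.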

Detaching… for Canceling, Completed Streams, or Errors

In order to complete this 20,000-foot mapping, there are a few other methods provided by the reactive streams API that I haven’t covered yet.

First of all, the subscriber can decide to stop receiving events by calling the cancel method on the Subscription interface. In AMQP terms, this could be a simple “detach” performative sent by the receiver during the “normal” message (event) exchanges.

FIG.3 – detach from receiver as Subscription.cancel
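On the receiver side, this could look like the sketch below: a Subscription (here the JDK's java.util.concurrent.Flow.Subscription, Java 9+) whose cancel method emits a “detach”. ReceiverLinkSubscription is a hypothetical name, and the frames are plain strings recorded in a list instead of being written to a connection. The credit value is simplified too: a real mapping would also account for credit that is already outstanding.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical receiver-side Subscription: cancel() is translated into a "detach". */
class ReceiverLinkSubscription implements Flow.Subscription {
    final List<String> sentFrames = new ArrayList<>(); // stand-in for the wire
    private boolean detached;

    @Override
    public void request(long n) {
        if (detached) {
            return; // requests after cancel are ignored
        }
        // Simplified: ignores credit that is already outstanding.
        sentFrames.add("flow(link-credit=" + n + ")");
    }

    @Override
    public void cancel() {
        if (detached) {
            return; // cancel is idempotent: only one detach is ever sent
        }
        detached = true;
        sentFrames.add("detach");
    }

    public static void main(String[] args) {
        ReceiverLinkSubscription subscription = new ReceiverLinkSubscription();
        subscription.request(3);
        subscription.cancel();
        System.out.println(subscription.sentFrames);
    }
}
```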

Finally, it’s important to remember that the reactive streams API takes into account finite streams of events and errors as well.

Regarding finite streams of events, the Subscriber interface exposes the onComplete method, which is called when the publisher has no more events to push, so the stream is completed. In AMQP, this could be a “detach” performative sent by the sender without any error condition.

FIG.4 – detach from sender as Subscriber.onComplete

At the same time, the reactive streams API defines a way to handle errors without catching exceptions, treating them as special events instead. The Subscriber interface provides the onError method, which is called to notify the subscriber when an error happens (the error is always represented by a specific Throwable implementation). In AMQP, this could be a “detach” performative sent by the sender (as for a completed stream), but this time with an error condition providing specific error information.

FIG.5 – detach from sender as Subscriber.onError
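Both sender-side cases (FIG.4 and FIG.5) come down to one decision on the receiving end: does the detach carry an error condition or not? A minimal sketch, again with the JDK's java.util.concurrent.Flow interfaces (Java 9+): Detach, TerminalRecorder, and DetachMapper are hypothetical names, and wrapping the error condition in a RuntimeException is just one possible choice of Throwable.

```java
import java.util.concurrent.Flow;

/** Hypothetical, simplified "detach" frame; a null errorCondition means a clean detach. */
final class Detach {
    final String errorCondition; // e.g. "amqp:internal-error", or null when there is no error
    final String description;

    Detach(String errorCondition, String description) {
        this.errorCondition = errorCondition;
        this.description = description;
    }
}

/** Hypothetical subscriber that only records the terminal signal it receives. */
class TerminalRecorder implements Flow.Subscriber<String> {
    boolean completed;
    Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription s) { }

    @Override
    public void onNext(String item) { }

    @Override
    public void onError(Throwable t) {
        error = t;
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}

class DetachMapper {
    /** Translate a detach received from the sender into the matching terminal signal. */
    static void onDetach(Detach detach, Flow.Subscriber<?> subscriber) {
        if (detach.errorCondition == null) {
            subscriber.onComplete(); // clean detach: the finite stream has ended
        } else {
            subscriber.onError(
                new RuntimeException(detach.errorCondition + ": " + detach.description));
        }
    }

    public static void main(String[] args) {
        TerminalRecorder clean = new TerminalRecorder();
        onDetach(new Detach(null, null), clean);
        System.out.println("completed=" + clean.completed);

        TerminalRecorder failed = new TerminalRecorder();
        onDetach(new Detach("amqp:internal-error", "disk full"), failed);
        System.out.println("error=" + failed.error.getMessage());
    }
}
```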

Conclusion

Maybe you have a different opinion (and I’d like to hear about it) but, at first glance, it seems to me that AMQP 1.0 is really THE protocol suited for implementing the reactive streams API, and the related reactive programming paradigm, when it comes to designing the communication between microservices in a distributed system in a reactive way. It provides the main communication patterns (request/reply, but mainly publish/subscribe for this specific use case), and it provides flow control for back-pressure as well. It’s a really “push”-oriented protocol compared to the “pull” nature of HTTP, for example. MQTT could be another protocol used in a reactive fashion, but it lacks flow control (at least in the current 3.1.1 specification).

Published at DZone with permission of
