Reactive Queue with Akka Reactive Streams
Reactive Streams is a recently announced initiative to create a standard for asynchronous stream processing with built-in back-pressure on the JVM. The working group is formed by companies such as Typesafe, Red Hat, Oracle, Netflix, and others.
One of the early, experimental implementations is based on Akka. Preview version 0.3 includes actor producers and consumers, which opens up some new integration possibilities.
To test the new technology, I implemented a very simple reactive message queue. The code is at a proof-of-concept stage and lacks error handling and such, but if used properly – it works ;).
The queue is reactive, meaning that messages will be delivered to interested parties whenever there’s demand, without polling. Back-pressure is applied both when sending messages (so that senders do not overwhelm the broker) and when receiving messages (so that the broker sends only as many messages as the receivers can consume).
Let’s see how it works!
The Queue
First, the queue itself is an actor, and doesn’t know anything about (reactive) streams. The code is in the `com.reactmq.queue` package. The actor accepts the following actor-messages (the term “message” is overloaded here, so I’ll use plain “message” to mean the messages we send to and receive from the queue, and “actor-message” to mean the Scala class instances sent to actors):
- `SendMessage(content)` – sends a message with the specified `String` content. A reply (`SentMessage(id)`) is sent back to the sender with the id of the message.
- `ReceiveMessages(count)` – signals that the sender (actor) would like to receive up to `count` messages. The count is cumulated with previously signalled demand.
- `DeleteMessage(id)` – unsurprisingly, deletes a message.
The queue implementation is a simplified version of what’s in ElasticMQ. After a message is received, if it is not deleted (acknowledged) within 10 seconds, it becomes available for receiving again.
When an actor signals demand for messages (by sending `ReceiveMessages` to the queue actor), it should expect any number of `ReceivedMessages(msgs)` actor-messages in reply, containing the received data.
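To make that contract concrete, here is a minimal, plain-Scala sketch of these semantics (no actors; `SimpleQueueModel` and its method names are hypothetical, and a logical clock stands in for real time – the real implementation is the actor in `com.reactmq.queue`):

```scala
// A minimal, non-actor model of the queue's semantics. All names here
// are hypothetical illustrations, not the real implementation.
class SimpleQueueModel(visibilityTimeout: Long = 10000L) {
  private case class Msg(id: String, content: String, var invisibleUntil: Long)

  private var messages = Vector.empty[Msg]
  private var nextId = 0

  // SendMessage: store the message and reply with its id.
  def send(content: String): String = {
    nextId += 1
    val id = nextId.toString
    messages :+= Msg(id, content, invisibleUntil = 0L)
    id
  }

  // ReceiveMessages: deliver up to `count` visible messages; each one
  // becomes invisible for `visibilityTimeout`, after which it is
  // redelivered unless deleted in the meantime.
  def receive(count: Int, now: Long): List[(String, String)] = {
    val delivered = messages.filter(_.invisibleUntil <= now).take(count)
    delivered.foreach(_.invisibleUntil = now + visibilityTimeout)
    delivered.map(m => (m.id, m.content)).toList
  }

  // DeleteMessage: acknowledge, removing the message for good.
  def delete(id: String): Unit = messages = messages.filterNot(_.id == id)
}
```

For example, a message received at logical time 0 and never deleted becomes receivable again at time 10000; deleting it within the timeout prevents redelivery.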
Going Reactive
To create and test our reactive queue, we need three applications: the `Sender`, the `Receiver`, and the `Broker`. We can run any number of `Sender`s and `Receiver`s, but of course we should run only one `Broker`.
The first thing that we need to do is to connect the `Sender` with the `Broker`, and the `Receiver` with the `Broker`, over a network. We can do that with the Akka IO extension and the reactive TCP extension. Using a `connect` & `bind` pair, we get a stream of connections on the binding side:

```scala
// Sender:
val connectFuture = IO(StreamTcp) ? StreamTcp.Connect(settings, sendServerAddress)
connectFuture.onSuccess {
  case binding: StreamTcp.OutgoingTcpConnection =>
    logger.info("Sender: connected to broker")
    // per-connection logic
}

// Broker:
val bindSendFuture = IO(StreamTcp) ? StreamTcp.Bind(settings, sendServerAddress)
bindSendFuture.onSuccess {
  case serverBinding: StreamTcp.TcpServerBinding =>
    logger.info("Broker: send bound")
    Flow(serverBinding.connectionStream).foreach { conn =>
      // per-connection logic
    }.consume(materializer)
}
```
There’s a different address for sending and receiving messages.
The Sender
Let’s look at the per-connection logic of the `Sender` first.

```scala
Flow(1.second, () => { idx += 1; s"Message $idx from $senderName" })
  .map { msg =>
    logger.debug(s"Sender: sending $msg")
    createFrame(msg)
  }
  .toProducer(materializer)
  .produceTo(binding.outputStream)
```
We are creating a tick-flow which produces a new message every second (very convenient for testing). Using the `map` stream transformer, we create a byte-frame with the message (more on that later). But that’s only a description of what our (very simple) stream should look like; it needs to be materialized using the `toProducer` method, which will provide concrete implementations of the stream transformation nodes. Currently there’s only one `FlowMaterializer`, which – again unsurprisingly – uses Akka actors under the hood to actually create the stream and the flow.
Finally, we connect the producer we have just created to the TCP binding’s `outputStream`, which happens to be a consumer. We now have a reactive over-the-network stream of messages, meaning that messages will be sent only when the `Broker` can accept them; otherwise back-pressure will be applied all the way up to the tick producer.
The Broker: Sending Messages
On the other side of the network sits the `Broker`. Let’s see what happens when a message arrives.

```scala
Flow(serverBinding.connectionStream).foreach { conn =>
  logger.info(s"Broker: send client connected (${conn.remoteAddress})")

  val sendToQueueConsumer = ActorConsumer[String](
    system.actorOf(Props(new SendToQueueConsumer(queueActor))))

  // sending messages to the queue, receiving from the client
  val reconcileFrames = new ReconcileFrames()
  Flow(conn.inputStream)
    .mapConcat(reconcileFrames.apply)
    .produceTo(materializer, sendToQueueConsumer)
}.consume(materializer)
```
First, we create a `Flow` from the connection’s input stream – that’s going to be the incoming stream of bytes. Next, we re-construct the `String` instances that were sent using our framing, and finally we direct that stream to a send-to-queue consumer.
The `SendToQueueConsumer` is a per-connection bridge to the main queue actor. It uses the `ActorConsumer` trait from Akka’s Reactive Streams implementation to automatically manage the demand that should be signalled upstream. Using that trait we can create a reactive-stream `Consumer[_]` backed by an actor – a fully customisable sink.

```scala
class SendToQueueConsumer(queueActor: ActorRef) extends ActorConsumer {
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(10) {
    override def inFlightInternally = inFlight
  }

  override def receive = {
    case OnNext(msg: String) =>
      queueActor ! SendMessage(msg)
      inFlight += 1

    case SentMessage(_) =>
      inFlight -= 1
  }
}
```
What needs to be provided to an `ActorConsumer` is a way of measuring how many stream items are currently being processed. Here, we count the number of messages that have been sent to the queue but for which we have not yet received an id (so they are still being processed by the queue).
The consumer receives new messages wrapped in the `OnNext` actor-message; so `OnNext` is sent to the actor by the stream, and `SentMessage` is sent by the queue actor in reply to a `SendMessage`.
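The accounting behind a max-in-flight strategy can be sketched as a pure function (a simplified model of what such a strategy does, not Akka’s actual code): more elements are requested from upstream only while the in-flight count plus the already-signalled demand stays below the cap.

```scala
// Simplified model of max-in-flight demand accounting (an assumed
// sketch, not Akka's implementation): given the cap, the number of
// items currently in flight, and the demand already signalled
// upstream, compute how many more items to request.
def requestDemand(max: Int, inFlight: Int, alreadyRequested: Int): Int =
  math.max(0, max - (inFlight + alreadyRequested))
```

With a cap of 10, 4 messages awaiting their ids, and 3 items already requested, the strategy would signal demand for 3 more; once the cap is reached, no further demand is signalled until `SentMessage` replies reduce the in-flight count.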
Receiving
The receiving part is done in a similar way, though it requires some extra steps. First, if you take a look at the `Receiver`, you’ll see that we are reading bytes from the input stream, re-constructing messages from frames, and sending back the ids, hence acknowledging the messages. In reality, we would run some message-processing logic between receiving a message and sending back the id.
On the `Broker` side, we create two streams for each connection. One is a stream of messages sent to receivers; the other is a stream of acknowledged message ids from the receivers, which are simply transformed into `DeleteMessage` actor-messages sent to the queue actor.
Similarly to the consumer, we need a per-connection bridge from the queue actor to the stream; that’s implemented in `ReceiveFromQueueProducer`. Here we extend the `ActorProducer` trait, which lets you fully control the process of actually creating the messages which go into the stream.
In this actor, the `Request` actor-message is sent by the stream to signal demand. When there’s demand, we request messages from the queue. The queue will eventually respond with one or more `ReceivedMessages` actor-messages (when there are any messages in the queue); as the number of messages will never exceed the signalled demand, we can safely call the `ActorProducer.onNext` method, which sends the given items downstream.
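The contract the producer relies on can be modelled in a few lines of plain Scala (hypothetical names, not the `ActorProducer` API): demand accumulates with each `request(n)`, and an item may be pushed downstream only while the outstanding demand is positive.

```scala
// Pure model of a demand-respecting producer (illustrative names
// only): items may only be pushed downstream while there is
// outstanding demand, as signalled via request(n).
class DemandedProducer[T] {
  private var demand = 0L
  private var delivered = List.empty[T]

  def request(n: Long): Unit = demand += n // demand accumulates

  // Returns true if the item could be sent downstream.
  def tryOnNext(item: T): Boolean =
    if (demand > 0) { demand -= 1; delivered :+= item; true }
    else false

  def downstream: List[T] = delivered
}
```

Since the queue never sends more messages than were asked for via `ReceiveMessages`, the "push only while demand is positive" invariant holds and `onNext` is always safe to call.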
Framing
One small detail is that we need a custom framing protocol (thanks to Roland Kuhn for the clarification), as the TCP stream is just a stream of bytes, so we can get arbitrary fragments of the data, which need to be recombined later. Luckily, implementing such framing is quite simple – see the `Framing` class. Each frame consists of the size of the message, followed by the message itself.
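As an illustration, size-prefixed framing can be sketched in plain Scala (the names and the exact encoding – a 4-byte big-endian length before each UTF-8 payload – are assumptions for this sketch; the repository’s `Framing`/`ReconcileFrames` classes are the real thing):

```scala
import java.nio.ByteBuffer

// A sketch of size-prefixed framing: a 4-byte big-endian length
// followed by the UTF-8 payload. Names and encoding are illustrative.
object FramingSketch {
  def createFrame(msg: String): Array[Byte] = {
    val payload = msg.getBytes("UTF-8")
    ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array()
  }

  // Accumulates arbitrary byte chunks and emits every complete message.
  class FrameReassembler {
    private var buffer = Array.empty[Byte]

    def apply(chunk: Array[Byte]): List[String] = {
      buffer = buffer ++ chunk
      var result = List.empty[String]
      var done = false
      while (!done) {
        if (buffer.length >= 4) {
          val size = ByteBuffer.wrap(buffer, 0, 4).getInt
          if (buffer.length >= 4 + size) {
            result :+= new String(buffer.slice(4, 4 + size), "UTF-8")
            buffer = buffer.drop(4 + size)
          } else done = true
        } else done = true
      }
      result
    }
  }
}
```

A frame split across two TCP chunks yields nothing on the first chunk and the full message on the second; two frames arriving in one chunk yield both messages at once.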
Summing Up
Using Reactive Streams and the Akka implementation, it is very easy to create reactive applications with end-to-end back-pressure. The queue above, while missing a lot of features and hardening, won’t allow the `Broker` to be overloaded by the `Sender`s, nor, on the other side, the `Receiver`s to be overloaded by the `Broker`. And all that without the need to actually write any of the back-pressure-handling code!
Published at DZone with permission of Adam Warski, DZone MVB. See the original article here.