Akka Client, C++ Server Through RabbitMQ
Join the DZone community and get the full member experience.
Join For FreeOver the next few weeks, I will tell you about all the discoveries I made in the project storm that made me skip blogging for three weeks. And today, we begin with some Akka AMQP client talking to RabbitMQ and C++ on the other end.
The motivation
I have some image processing code in a static library that I needed to use in my Akka application. First, I grabbed JNI. That turned out to be a mistake, mainly because of my bad C++ code, which loved SIG_SEGV
. You can have as much actor supervision as you like, when the JVM is dead, it takes your actors with it. So, out with JNI, and in with AMQP. This allowed me to use treat the C++ application as if it were an actor. Even better, the data I was sending to the C++ code was binary in nature, so AMQP was an excellent fit again.
The Akka code
The Akka code is trivial. You just need to grab the AMQP client (my clone at https://github.com/janm399/amqp-client, build it and use it in your project.
object Main extends Application { // boot up Akka val actorSystem = ActorSystem() // prepare the AMQP connection factory val connectionFactory = new ConnectionFactory() connectionFactory.setHost("localhost") // connect to the AMQP exchange val amqpExchange = ExchangeParameters(name = "amq.direct", exchangeType = "", passive = true) // create a "connection owner" actor, which will try and // reconnect automatically if the connection ins lost val connection = actorSystem.actorOf( Props(new ConnectionOwner(connectionFactory))) // make a RPC client val client = ConnectionOwner.createChildActor( connection, Props(new RpcClient())) // mechanics implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS) (client ? Request(Publish(...) :: Nil))) onComplete { case response => ... } }
All that we need to fill in is the message that we are sending and what we’re doing with the response. I shall leave the second to your imagination, but let’s for now focus on the message.
We will be sending a direct message–we wish to establish point-to-point-to-point communication. (When you ask for a response, your client will get its own private queue, where the server will place the response.) We will use RabbitMQ’s default direct exchange, amq.direct
. In addition to the exchange, you need the routing key, so that RabbitMQ knows which queue it needs to place the request to. But what value do we use?
Let’s create the queue and give call it cppdemo
. Then we need to bind the queue to some exchange and routing key. In our case, the message will reach the cppdemo
queue when sent to the amq.direct
exchange with the cppdemo.basic
routing key.
Back to our Scala code, then
... (client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil))) onComplete { case response => ... } ...
The only thing that remains are the bytes that make up the message…
C++
And down the rabbit hole we go. Before we jump into the C++ code, and before we complete the Akka code, we need to do some setup.
Tooling
We shall use Boost and the RabbitMQ C and C++ clients at https://github.com/alanxz/rabbitmq-c and https://github.com/alanxz/SimpleAmqpClient. You will also need cmake. We will be needing static libraries for both the RabbitMQ clients, so we build rabbitmq-c
by
cmake . -DBUILD_STATIC_LIBS=true cmake --build . sudo cmake --build . --target install
And follow on with the C++ SimpleAmqpClient
cmake . -DBUILD_SHARED_LIBS=false cmake --build . sudo cmake --build . --target install
The main code
We’re now ready to write our C++ code. We will establish the connection to the RabbitMQ server and set up RCP server. We bind our server to the same queue and listen for a message that conforms to the following structure:
const int32_t message_signature = 0x1000acca; // sorry, no k in hex! typedef struct { int32_t signature; int32_t size1; int32_t size2; } message_header_t;
In addition to the messages, we define some error type that is returned from our processing function.
struct ProcessingError: virtual boost::exception { }; typedef boost::error_info<struct errinfo_message_, std::string const> errinfo_message;
Now, onwards to our main code, which I shall leave without the careful dissection. It is very easy and you should be able to follow it.
std::string process(BasicMessage::ptr_t request) { const amqp_bytes_t& bytes = request->getAmqpBody(); if (bytes.len < sizeof(message_header_t)) throw ProcessingError() << errinfo_message("message too small"); const message_header_t* header = static_cast<message_header_t*>(bytes.bytes); if (header->signature != message_signature) throw ProcessingError() << errinfo_message("bad signature"); // we're good. size_t totalSize = sizeof(message_header_t) + header->size1 + header->size2; if (bytes.len != totalSize) throw ProcessingError() << errinfo_message("bad message size"); return "it worked!"; } int main() { try { Channel::ptr_t channel = Channel::Create(); channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic"); std::string tag; tag = channel->BasicConsume("cppdemo", "", true, true, false, 2); while (true) { // consume the message Envelope::ptr_t env = channel->BasicConsumeMessage(tag); BasicMessage::ptr_t request = env->Message(); try { std::string body = process(request); BasicMessage::ptr_t response = BasicMessage::Create(); channel->BasicPublish("amq.direct", request->ReplyTo(), body); } catch (ProcessingError &e) { const std::string* msg = boost::get_error_info<errinfo_message>(e); std::cerr << (*msg) << std::endl; } } } catch (std::runtime_error &e) { std::cout << "Error " << e.what() << std::endl; } }
n short, we wait for messages; when one arrives, we check that its structure is OK and then we do some processing and send back a response. The brilliant thing is that we can start as many separate processes of this program as we like; if one dies (perhaps with my faviourite SIG_SEGV
, no messages are lost and the other programs are still OK.) Simples!
Back to Scala
Back in the comfortable Scala world, we just have to construct the message correctly. Remember: we have the signature 0x1000acca
, followed by two sizes and then bytes that add up to the both sizes. And so, we will do just that:
val os = new ByteArrayOutputStream() os.write(0x1000face) os.write(0x1) os.write(0x1) os.write(0xa) os.write(0xb) val request = Publish("amq.direct", "cppdemo.basic", os.toByteArray) (client ? Request(request :: Nil)) onComplete { case Success(r: Response) => println("*** " + r.deliveries.head.body) }
As I said before, I will leave how you process the response to you. You may wish to extract the body from the deliveries, or you may want to do some other funky processing–over to you!
Summary
So, the good news is that having to use native code in your JVM-based applications does not have to immediately mean JNI; and Akka is particularly good fit for messaging infrastructures. Drop in the necessary AMQP client code, build the C++ code and you’re all up and running.
Published at DZone with permission of Jan Machacek, DZone MVB. See the original article here.
Opinions expressed by DZone contributors are their own.
Trending
-
4 Expert Tips for High Availability and Disaster Recovery of Your Cloud Deployment
-
Best Practices for Securing Infrastructure as Code (Iac) In the DevOps SDLC
-
Using OpenAI Embeddings Search With SingleStoreDB
-
Explainable AI: Making the Black Box Transparent
Comments