DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Refcards Trend Reports Events Over 2 million developers have joined DZone. Join Today! Thanks for visiting DZone today,
Edit Profile Manage Email Subscriptions Moderation Admin Console How to Post to DZone Article Submission Guidelines
View Profile
Sign Out
Refcards
Trend Reports
Events
Zones
Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Join us tomorrow at 1 PM EST: "3-Step Approach to Comprehensive Runtime Application Security"
Save your seat
  1. DZone
  2. Coding
  3. Java
  4. Akka Client, C++ Server Through RabbitMQ

Akka Client, C++ Server Through RabbitMQ

Jan Machacek user avatar by
Jan Machacek
·
Jan. 31, 13 · Interview
Like (0)
Save
Tweet
Share
8.87K Views

Join the DZone community and get the full member experience.

Join For Free

Over the next few weeks, I will tell you about all the discoveries I made in the project storm that made me skip blogging for three weeks. And today, we begin with some Akka AMQP client talking to RabbitMQ and C++ on the other end.

The motivation

I have some image processing code in a static library that I needed to use in my Akka application. First, I grabbed JNI. That turned out to be a mistake, mainly because of my bad C++ code, which loved SIG_SEGV. You can have as much actor supervision as you like, when the JVM is dead, it takes your actors with it. So, out with JNI, and in with AMQP. This allowed me to use treat the C++ application as if it were an actor. Even better, the data I was sending to the C++ code was binary in nature, so AMQP was an excellent fit again.

The Akka code

The Akka code is trivial. You just need to grab the AMQP client (my clone at https://github.com/janm399/amqp-client, build it and use it in your project.

object Main extends Application {
  // boot up Akka
  val actorSystem = ActorSystem()
  // prepare the AMQP connection factory
  val connectionFactory = new ConnectionFactory()
  connectionFactory.setHost("localhost")
  // connect to the AMQP exchange
  val amqpExchange = ExchangeParameters(name = "amq.direct", 
                                        exchangeType = "", passive = true)

  // create a "connection owner" actor, which will try and 
  // reconnect automatically if the connection ins lost
  val connection = actorSystem.actorOf(
                     Props(new ConnectionOwner(connectionFactory)))
  // make a RPC client
  val client = ConnectionOwner.createChildActor(
                     connection, Props(new RpcClient()))

  // mechanics
  implicit val timeout = Timeout(1000, TimeUnit.MILLISECONDS)

  (client ? Request(Publish(...) :: Nil))) onComplete {
    case response => ...
  }
}

All that we need to fill in is the message that we are sending and what we’re doing with the response. I shall leave the second to your imagination, but let’s for now focus on the message.

We will be sending a direct message–we wish to establish point-to-point-to-point communication. (When you ask for a response, your client will get its own private queue, where the server will place the response.) We will use RabbitMQ’s default direct exchange, amq.direct. In addition to the exchange, you need the routing key, so that RabbitMQ knows which queue it needs to place the request to. But what value do we use?

Let’s create the queue and give call it cppdemo. Then we need to bind the queue to some exchange and routing key. In our case, the message will reach the cppdemo queue when sent to the amq.direct exchange with the cppdemo.basic routing key.

Back to our Scala code, then

...
(client ? Request(Publish("amq.direct", "cppdemo.basic", ...) :: Nil)))
  onComplete {
    case response => ...
  }
...

The only thing that remains are the bytes that make up the message…

C++

And down the rabbit hole we go. Before we jump into the C++ code, and before we complete the Akka code, we need to do some setup.

Tooling

We shall use Boost and the RabbitMQ C and C++ clients at https://github.com/alanxz/rabbitmq-c and https://github.com/alanxz/SimpleAmqpClient. You will also need cmake. We will be needing static libraries for both the RabbitMQ clients, so we build rabbitmq-c by

cmake . -DBUILD_STATIC_LIBS=true
cmake --build .
sudo cmake --build . --target install

And follow on with the C++ SimpleAmqpClient

cmake . -DBUILD_SHARED_LIBS=false 
cmake --build .
sudo cmake --build . --target install

The main code

We’re now ready to write our C++ code. We will establish the connection to the RabbitMQ server and set up RCP server. We bind our server to the same queue and listen for a message that conforms to the following structure:

const int32_t message_signature = 0x1000acca;
// sorry, no k in hex!

typedef struct {
  int32_t signature;
  int32_t size1;
  int32_t size2;
  
} message_header_t;

In addition to the messages, we define some error type that is returned from our processing function.

struct ProcessingError: virtual boost::exception { };
typedef 
  boost::error_info<struct errinfo_message_, std::string const> errinfo_message;

Now, onwards to our main code, which I shall leave without the careful dissection. It is very easy and you should be able to follow it.

std::string process(BasicMessage::ptr_t request) {
  const amqp_bytes_t& bytes = request->getAmqpBody();
  if (bytes.len < sizeof(message_header_t)) 
    throw ProcessingError() << errinfo_message("message too small");
  const message_header_t* header = 
    static_cast<message_header_t*>(bytes.bytes);
  if (header->signature != message_signature) 
    throw ProcessingError() << errinfo_message("bad signature");
  
  // we're good.
  size_t totalSize = sizeof(message_header_t) + 
                     header->size1 + header->size2;
  if (bytes.len != totalSize) 
    throw ProcessingError() << errinfo_message("bad message size");

  return "it worked!";
}

int main() {
  try {
    Channel::ptr_t channel = Channel::Create();
    
    channel->BindQueue("cppdemo", "amq.direct", "cppdemo.basic");
    
    std::string tag;
    tag = channel->BasicConsume("cppdemo", "", true, true, false, 2);
    
    while (true) {
      // consume the message
      Envelope::ptr_t env = channel->BasicConsumeMessage(tag);
      BasicMessage::ptr_t request = env->Message();
      try {
        std::string body = process(request);
        BasicMessage::ptr_t response = BasicMessage::Create();
        channel->BasicPublish("amq.direct", request->ReplyTo(), body);
      } catch (ProcessingError &e) {
        const std::string* msg = 
          boost::get_error_info<errinfo_message>(e);
        std::cerr << (*msg) << std::endl;
      }      
    }
  } catch (std::runtime_error &e) {
    std::cout << "Error " << e.what() << std::endl;
  }
  
}

n short, we wait for messages; when one arrives, we check that its structure is OK and then we do some processing and send back a response. The brilliant thing is that we can start as many separate processes of this program as we like; if one dies (perhaps with my faviourite SIG_SEGV, no messages are lost and the other programs are still OK.) Simples!

Back to Scala

Back in the comfortable Scala world, we just have to construct the message correctly. Remember: we have the signature 0x1000acca, followed by two sizes and then bytes that add up to the both sizes. And so, we will do just that:

val os = new ByteArrayOutputStream()
os.write(0x1000face)
os.write(0x1)
os.write(0x1)

os.write(0xa)
os.write(0xb)
val request = Publish("amq.direct", "cppdemo.basic", os.toByteArray)
(client ? Request(request :: Nil)) onComplete {
  case Success(r: Response) => println("*** " + r.deliveries.head.body)
}

As I said before, I will leave how you process the response to you. You may wish to extract the body from the deliveries, or you may want to do some other funky processing–over to you!

Summary

So, the good news is that having to use native code in your JVM-based applications does not have to immediately mean JNI; and Akka is particularly good fit for messaging infrastructures. Drop in the necessary AMQP client code, build the C++ code and you’re all up and running.

Akka (toolkit)

Published at DZone with permission of Jan Machacek, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Popular on DZone

  • The Role of Data Governance in Data Strategy: Part II
  • Mr. Over, the Engineer [Comic]
  • Distributed Stateful Edge Platforms
  • Last Chance To Take the DZone 2023 DevOps Survey and Win $250! [Closes on 1/25 at 8 AM]

Comments

Partner Resources

X

ABOUT US

  • About DZone
  • Send feedback
  • Careers
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 600 Park Offices Drive
  • Suite 300
  • Durham, NC 27709
  • support@dzone.com
  • +1 (919) 678-0300

Let's be friends: