
Message Tracking Solution Using Apache Camel, JBoss Fuse, Elastic Stack, and Syslog


Learn how Apache Camel integrated with JBoss Fuse, Syslog, and Elastic Stack can be set up to route messages, manage logs, and generate reports.


I had long planned to write a blog post on Apache Camel, and I finally came up with an idea that felt interesting to me. I hope you will enjoy it too.

Integrating Camel with JBoss Fuse, Syslog, and Elastic Stack eventually grew into a complete design. I would like to start by stating the problem and then work through solving it step by step.

The Problem

I had a use case where incoming and outgoing messages needed to be captured for tracking and analysis purposes. There was already a process in place for doing this, using the Wire Tap endpoint in Camel to route messages asynchronously and store them as files.

Example wiretap configuration:

file:/apps/esb/log/service/Rq/?fileName=$simple{date:now:yyyy}/$simple{date:now:MM}/$simple{date:now:dd}/$simple{RequestID}.xml
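In route terms, the original setup might have looked like the following Blueprint XML sketch. This is a hypothetical reconstruction, not code from the original services; the route id and placeholder property name are illustrative, assuming a properties-placeholder is configured for the context:

```xml
<!-- Hypothetical Blueprint route: wireTap sends a copy of the message to the
     file endpoint asynchronously while the main route continues unblocked. -->
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
  <camelContext xmlns="http://camel.apache.org/schema/blueprint">
    <route id="order-service">
      <from uri="direct:order-service"/>
      <!-- {{wiretap.endpoint}} resolves to the externalized file: URI above -->
      <wireTap uri="{{wiretap.endpoint}}"/>
      <to uri="log:processed"/>
    </route>
  </camelContext>
</blueprint>
```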

However, this created a problem: under high load, it drove up memory usage in the container. More than 170 services were deployed in production, and changing each of them would mean re-testing and re-deploying everything. Luckily, these endpoint properties were externalized, so I figured out a solution that only required a configuration change.
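Because the endpoint URI lives in an externalized property, the fix amounts to repointing that property at a new component. Here is a hypothetical before/after; the property name and the logmanager: scheme are illustrative, based on the camel-logmanager-component bundle built in Phase 1:

```properties
# Before: wiretap copies are written straight to disk
wiretap.endpoint=file:/apps/esb/log/service/Rq/?fileName=$simple{date:now:yyyy}/$simple{date:now:MM}/$simple{date:now:dd}/$simple{RequestID}.xml

# After: messages are handed to the custom log manager component instead
wiretap.endpoint=logmanager:orderService?messageType=REQUEST
```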

In this post, I will be explaining the solution to the problem by leveraging Apache Camel custom component features and extending further to build analytics.

Solution

We can split the problem into two phases. In the first phase, I will fix the actual problem of logging messages to files; in the second, I will enhance the solution to produce properly structured data, centralize the messages, and visualize them for analytics.

Refer to the code from the GitHub repository.

Phase 1

Just like other producer endpoints such as HTTP, log, and file, we are going to create our own custom Camel producer endpoint to which exchange messages are pushed. In our case, the LogManagerProducer class receives the exchange. MessageType is a POJO class to which we map all the necessary parameters from the exchange before converting it into a JSON payload. This format removes newline characters, which makes parsing with Logstash easier in later stages.

// Map the exchange into the MessageType POJO
MessageType messageType = new MessageType();
messageType.setRequestId(exchange.getProperty("transactionId", String.class));
messageType.setMessageType(endpoint.getMessageType());
messageType.setServiceName(endpoint.getServiceName());
messageType.setLoggingTime(endpoint.getLogTime());
messageType.setData(exchange.getIn().getBody(String.class));

// Serialize to a single-line JSON payload and forward it to syslog
Gson gson = new GsonBuilder().disableHtmlEscaping().create();

SyslogMessageTransmitter transmitter = new SyslogMessageTransmitter();
transmitter.send(gson.toJson(messageType));
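The claim that a single-line JSON payload simplifies downstream parsing can be sketched with plain JDK code. This is a minimal, stdlib-only illustration; toJsonLine is a hypothetical helper standing in for the project's Gson serialization, not the actual component code:

```java
public class JsonLineDemo {

    // Escape backslashes, quotes, and newlines so the payload fits on one line
    public static String toJsonLine(String id, String body) {
        String escaped = body.replace("\\", "\\\\")
                             .replace("\"", "\\\"")
                             .replace("\n", "\\n");
        return "{\"requestId\":\"" + id + "\",\"data\":\"" + escaped + "\"}";
    }

    public static void main(String[] args) {
        String payload = "<order>\n  <id>42</id>\n</order>";
        // The multi-line XML body becomes a single JSON line
        System.out.println(toJsonLine("tx-123", payload));
    }
}
```

Because every event is exactly one line, Logstash can treat each syslog line as one event with no multiline handling.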

Below is a code snippet from the SyslogMessageTransmitter class that configures the syslog message. Refer to this link to understand the syslog protocol features. The implementation logic is taken from the CloudBees-community library, which is simple to use; moreover, I am not interested in rewriting code that already does the same task.

// Configure a TCP syslog sender with facility local0 and severity info
TcpSyslogMessageSender messageSender = new TcpSyslogMessageSender();
messageSender.setDefaultFacility(Facility.LOCAL0);
messageSender.setDefaultSeverity(Severity.INFORMATIONAL);
// To run the JUnit test, change the IP address and port of the syslog server
messageSender.setSyslogServerHostname("localhost");
messageSender.setSyslogServerPort(port);
messageSender.setMessageFormat(MessageFormat.RFC_3164);
messageSender.setSsl(false);
messageSender.setMaxRetryCount(3);
messageSender.sendMessage(message);

Here, the send method is responsible for pushing the message over a TCP channel.
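For reference, with MessageFormat.RFC_3164, facility LOCAL0, and severity INFORMATIONAL, the frame that reaches the syslog server looks roughly like this (hostname and field values are illustrative):

```
<134>Jun 10 12:00:02 esb-node1 -: {"requestId":"tx-123","messageType":"REQUEST","serviceName":"orderService","data":"..."}
```

The priority <134> encodes facility local0 and severity informational as 16 × 8 + 6; this single-line shape is what the grok pattern in Phase 2 is written against.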

Rsyslog Configuration

Change the Rsyslog server configuration as below and restart the service:

/etc/rsyslog.conf

Uncomment:

$ModLoad imtcp
$InputTCPServerRun 514

Add:

# ESB Transaction Logs
local0.info    /var/log/esb-transaction-logs.log

I have also created a sample project, camel-log-demo, to communicate with the log manager component; you'll find it in the repo.

With the above details, we are now ready to deploy the two projects into a JBoss Fuse container, and messages can be seen in the /var/log/esb-transaction-logs.log file.

In this phase, we looked at how to develop a Camel custom component and configure Rsyslog, through which messages are logged to a single file. Rsyslog has built-in features to use log rotation policies. However, in the next phase, we can extend the current capability to centralize the messages from different servers.

Phase 2

In the design for this phase, Elastic Stack is used to visualize the messages; in our case, it also acts as a centralized server to manage logs and generate reports.

In large enterprises, multiple nodes are used to load-balance messages across servers. The main problem in such cases is identifying which server served a particular incoming request. Typically, we log into each server and check its log messages, which eventually becomes a painful task.

For instance, the request message can carry a unique ID that helps trace a particular message and its execution, but that alone is not a complete solution for centralizing logs.

Install Elastic Stack on the log server; it consists of independent components: Logstash, Elasticsearch, and Kibana.

Logstash is used to parse the message, and the configuration is listed below:

File Name: indexpattern.config

input {
 tcp {
  type => 'esbtranslog'
  port => 514
 }
}
filter {
 if [type] == "esbtranslog" {
  grok {
   match => {
    message => "<%{WORD:ignore}\>%{SYSLOGTIMESTAMP:time} %{SYSLOGHOST:clientip} -: %{GREEDYDATA:request}"
   }
  }
  json {
   source => "request"
   target => "log"
   remove_field => ["request"]
  }
 }
}
output {
 if [type] == "esbtranslog" {
  elasticsearch {
   hosts => 'http://localhost:9200'
   index => 'logstash-local-%{+YYYY.MM.dd}'
   document_type => '%{type}'
  }
 }
}
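After the grok and json filters run, the event stored in Elasticsearch should look roughly like the following. The field values are illustrative; note how the json filter places the parsed payload under the log target while remove_field drops the raw request string:

```json
{
  "type": "esbtranslog",
  "time": "Jun 10 12:00:02",
  "clientip": "esb-node1",
  "log": {
    "requestId": "tx-123",
    "messageType": "REQUEST",
    "serviceName": "orderService",
    "data": "<order>...</order>"
  }
}
```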

Add the configuration below for Rsyslog (the double @@ forwards over TCP; a single @ would use UDP):

/etc/rsyslog.d/pushtoserver.conf

local0.info  @@server-ip:514

Now, let's integrate all three configured components:

Logstash/bin> logstash -f indexpattern.config

Elasticsearch/bin> elasticsearch

Kibana/bin> kibana

Copy two bundles, camel-log-demo and camel-logmanager-component, into the Jboss-fuse/deploy directory.

Finally, open http://ip-address:5601 and you can explore the log messages in Kibana.

To conclude, I have discussed the high-level design along with some code snippets. The main objective was to convey a design for logging messages since, in today's fast-moving technology landscape, there is no single standard for logging.

This is my first post, and I am very excited to share it with you. I hope it will be helpful.



