
How to Use Logstash Aggregations


This is how we use the aggregation plugin in Logstash to make a nonintrusive aggregation of all of our logs with almost no logs lost and with fewer computing resources.


At PicScout, we use ELK (Elasticsearch-Logstash-Kibana) for centralized logging and monitoring of our system. We log a single message for each task that our process performs (using log4net with the UDP appender). These messages are handled by Logstash and saved into the ElasticSearch DB. Then, using Kibana, we get dashboards and aggregations of these messages with a very nice UI.
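
For context, a stripped-down Logstash pipeline for this kind of setup might look like the sketch below; the port, host, and index name are assumptions for illustration, not our actual configuration:

input {
       # log4net's UdpAppender sends each log message as a UDP datagram
       udp {
              port => 5960
       }
}
output {
       # every event becomes one document in the ElasticSearch DB
       elasticsearch {
              hosts => ["localhost:9200"]
              index => "logs-%{+YYYY.MM.dd}"
       }
}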

Starting Point

So if everything works so nicely, why am I writing this post?

Well, we wanted to scale. Scaling means more processes, i.e. doing more tasks; doing more tasks means more messages; and more messages mean more data being handled by Logstash and sent to the ElasticSearch DB.

So it seems like we also need to scale our ELK system, right?

Well, we could do that, but before we go ahead and buy some hardware, let’s think of other ways to deal with this kind of scaling.

We thought of three options:

  1. Send only some of the data to ELK and extrapolate the rest (i.e. send only 50% of the messages to the ElasticSearch DB and multiply the values in the Kibana dashboards by 2). This can give us good results, assuming we pick the messages to drop randomly (see the sampling sketch right after this list).

  2. Aggregation at the application level. This requires writing some code to handle in-memory aggregation before sending a message.

  3. Aggregation at the Logstash level. This requires no changes in the application, only a change in the Logstash script, which aggregates the results before sending a message.
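
For reference, the random sampling in Option 1 could be done entirely in Logstash with the drop filter's percentage option; this is just a sketch of that idea, not what we ended up using:

filter {
       # randomly drop roughly 50% of the events; the Kibana dashboards
       # would then need to multiply their counts by 2 to compensate
       drop {
              percentage => 50
       }
}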

Eventually, we decided to go with Option 3 because it was less intrusive — we didn’t have to change our process code and we didn’t have to “lose” data.

How did we do that?

It turns out that Logstash has a nifty plugin called aggregate.

Sound simple? Not so much in our case. As you can see from the documentation, none of the supported use cases fit ours, since our events have no start and no end: the task simply runs "forever."

Then how did we manage to achieve scaling? Let's look at the final script and go over it piece by piece:

Given our grok:

match => [ "message", "%{TIMESTAMP_ISO8601:origtime}%{GREEDYDATA} %{WORD:progress}:%{WORD:action} elapsed:%{NUMBER:elapsed:int}%{GREEDYDATA}" ]
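
As an illustration, here is a hypothetical log line this pattern would match, along with the fields grok would extract from it (all values are made up):

2018-03-07T11:42:10,381 [worker-4] INFO End:ResizeImage elapsed:127 ms

# origtime = 2018-03-07T11:42:10,381
# progress = End
# action   = ResizeImage
# elapsed  = 127 (integer)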

This is the aggregation filter:

aggregate {
       task_id => "%{action}_%{progress}"
       code => "
              map['avg'] ||= 0;
              map['avg'] += event.get('elapsed');
              map['my_count'] ||= 0;
              map['my_count'] += 1;

              if (map['my_count'] == ${LogstashAggregationCount}) # environment variable
                     event.set('elapsedAvg', (map['avg'] / map['my_count']))
                     event.set('Aggregation', true)
                     map['avg'] = 0
                     map['my_count'] = 0
              end
       "
}
if [Aggregation] {
       mutate {
              remove_field => ["message", "tags", "elapsed", "type"]
       }
       aggregate {
              task_id => "%{action}_%{progress}"
              code => ""
              end_of_task => true
       }
}
if (![Aggregation]) {
       drop {}
}

Now, let's go over it:

task_id => "%{action}_%{progress}"

The line above defines our specific aggregation map. Each aggregation in our system will create its own map with its own data so that it works as expected and we don't mix different types of logs. In this case, task_id is composed of our log action and progress.
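
For example (hypothetical values), logs with different action/progress pairs end up in separate maps:

# action=ResizeImage, progress=End   -> task_id "ResizeImage_End" -> its own map
# action=Upload,      progress=Start -> task_id "Upload_Start"    -> its own map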

Next, we have our code segment:

code => "
       map['my_count'] ||= 0;
       map['my_count'] += 1;
       map['avg'] ||= 0;
       map['avg'] += event.get('elapsed');

       if (map['my_count'] == ${LogstashAggregationCount}) # environment variable
              event.set('elapsedAvg', (map['avg'] / map['my_count']))
              event.set('Aggregation', true)
              map['avg'] = 0
              map['my_count'] = 0
       end
"

Let’s go over the syntax real quick. Logstash uses Ruby as the code language.

  • map['<name>'] is our predefined map in which we can store our aggregation data.

  • event is the received log after grok, which means we can get parts of the log by name as long as we have a corresponding grok variable defined.

First, we initialize our counter variable my_count. This controls how many logs we aggregate before emitting an aggregated event.

||= assigns a value only if the variable is currently undefined, nil, or false, so this initializes the counter to 0 the first time a log arrives.
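
As a quick Ruby illustration of how ||= behaves:

count = nil
count ||= 0   # count was nil, so it is set to 0
count ||= 5   # count is already 0 (which is truthy in Ruby), so nothing happens
count += 1    # count is now 1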

Then, we can start adding our aggregation logic. In this case, we want to aggregate the elapsed time of our action by averaging it.

So, we start by summing all the elapsed times of our logs into map['avg'].

We do this by adding the elapsed data from our event variable: map['avg'] += event.get('elapsed');.

Next, we have our most important condition: 

if (map['my_count'] == ${LogstashAggregationCount})#Environment variable

This condition decides whether it's time to send the aggregated data.

Since we will probably have more than one aggregation in our Logstash, it's a good idea to keep the aggregation counter in a single place. The easiest way to do so is to define an environment variable on the Logstash machine and read it in the Logstash configuration like so: ${EnvironmentVariable}.

Note that if the variable is not defined on the machine, this will throw an exception.
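
For example, on the Logstash machine we might define the variable like this (the value 100 is just an illustration):

export LogstashAggregationCount=100

Logstash also accepts a default with the ${LogstashAggregationCount:100} syntax, which avoids the exception when the variable is missing.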

Now we can do the actual aggregation and send our aggregated log:

event.set('elapsedAvg', (map['avg'] / map['my_count']))
event.set('Aggregation', true)

map['avg'] = 0
map['my_count'] = 0

The first thing is to add the aggregated average using the event.set method. This adds a new field to our log named elapsedAvg containing our calculated average.

Next, we add a new variable named Aggregation with a true value.

This will help us remove the unaggregated logs before reaching the ElasticSearch database.

This happens in the code:

if (![Aggregation]) {
       drop {}
}

Lastly, we have the final optional “mutation” of the aggregated log: 

if [Aggregation] {
       mutate {
              remove_field => ["message", "tags", "elapsed", "type"]
       }
       aggregate {
              task_id => "%{action}_%{progress}"
              code => ""
              end_of_task => true
       }
}

This code removes irrelevant variables from our log. For example, we don't need the elapsed time variable anymore since we have our new elapsedAvg field.

Finally, we tell the aggregation framework to end this aggregation map. By default, maps older than 1,800 seconds are automatically deleted, so rather than waiting for that timeout and risking data loss, we end the task ourselves.
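
Alternatively, if you prefer to rely on the timeout instead of ending the task explicitly, the aggregate filter exposes a timeout option; this is just a sketch, not what we use:

aggregate {
       task_id => "%{action}_%{progress}"
       code => ""
       timeout => 3600   # keep maps for an hour instead of the default 1,800 seconds
}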

This is the basic idea of how we use the aggregation plugin in Logstash to make a nonintrusive aggregation of all of our logs, with next to no logs lost and with roughly one-hundredth of the computing resources.

Thanks to Idan Ambar and Jony Feldman.

Topics:
logstash ,kibana ,elasticsearch ,logging ,performance ,tutorial ,aggregation ,monitoring

Published at DZone with permission of Adi BatSheva. See the original article here.

Opinions expressed by DZone contributors are their own.
