Enriching Your Data With Kapacitor
Learn how to use Kapacitor to enrich data coming from Telegraf to InfluxDB, as well as to add other types of data—potentially avoiding the need to create custom plugins.
From time to time, the community asks how to query InfluxDB by business period, such as a typical business day or individual shifts. Consider the following request: How do I summarize my data for the entire month of August for business hours only, defined as Monday through Friday between 8 AM and 5 PM? InfluxQL does not currently provide functions for filtering on parts of a timestamp, such as the day of the week or the hour of the day. We are limited to an absolute time range:
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' AND time <= '2017-08-31 17:00:00.000000';
So how do we accomplish this? The provided Telegraf plugins typically send only a timestamp and any static tags you configure, along with the metrics and tags of the plugin itself. The solution is to use Kapacitor as a "pre-processor" to "decorate" or enrich your data with a computed value that represents the time period you want to query.
For the purposes of this article, Telegraf, InfluxDB, and Kapacitor all run on localhost, but in a full-fledged environment, they would typically run on different hosts.
The first step is to configure Telegraf to write to Kapacitor instead of directly to InfluxDB. In the [[outputs.influxdb]] section of your telegraf.conf file, there are three key settings to consider:
[[outputs.influxdb]]
  urls = ["http://localhost:9092"]
  database = "kap_telegraf"
  retention_policy = "autogen"
The urls parameter must point to port 9092 (Kapacitor's default listening port) instead of port 8086 for InfluxDB. The database parameter should name a database that does not exist in InfluxDB (you can ignore Telegraf's warning about the database not being found). The retention_policy parameter should be set to autogen or to a specific retention policy that you have previously created in your instance.
Caution: Leaving retention_policy set to "" (the default) is not the same as setting it to autogen, which is the name of the default retention policy created when InfluxDB is initialized.
All other settings in telegraf.conf can be configured normally for your instance.
The next step is to create a TICKscript that will process the data coming from Telegraf. In this example, we want to create a tag whose value is true or false depending on whether the data point falls within the business hours described above.
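stream
    // Match every point written to the placeholder database
    |from()
        .database('kap_telegraf')
    // 'true' for Monday (1) through Friday (5), 08:00 through 17:00; otherwise 'false'
    |eval(lambda: if(((weekday("time") >= 1 AND weekday("time") <= 5) AND (hour("time") >= 8 AND (hour("time")*100+minute("time")) <= 1700)), 'true', 'false'))
        .as('business_hours')
        .tags('business_hours')
        .keep()
    // Drop the duplicate field; the tag of the same name remains
    |delete()
        .field('business_hours')
    // Write the enriched points to the real database
    |influxDBOut()
        .database('telegraf')
        .retentionPolicy('autogen')
        .tag('kapacitor_augmented', 'true')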
In this TICKscript, we are streaming from the non-existent database kap_telegraf that we configured above in our telegraf.conf file. The from() node chained to stream only needs the database to match. We then pass control to an eval() node that evaluates whether the point's timestamp falls within the window we have defined. In this case, we use the weekday(), hour(), and minute() time functions available in TICKscript lambda expressions to evaluate the "time" value.
The first part of the condition evaluates whether the day of the week falls between Monday (1) and Friday (5). The second part evaluates the time of day, taking care to stop at the 00 mark of hour 17 (5 PM). To do that, we multiply the hour by 100, add the result of the minute() function, and compare the sum to the ending time of 1700. If the time value falls within this range, the eval() node returns true as a field called business_hours; otherwise, it returns false. However, since we want to query on this value, we should have it as a tag, so we chain a .tags() method to convert the value to a tag called business_hours.
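To check the boundaries of the condition outside Kapacitor, the same arithmetic can be mirrored in a few lines of Python. This is only a sketch: the function name in_business_hours is made up for illustration, time zone handling is left out, and Python's isoweekday() is used because it numbers Monday as 1, which lines up with the 1 to 5 range in the TICKscript.

```python
from datetime import datetime


def in_business_hours(ts: datetime) -> bool:
    """Mirror the TICKscript condition: Monday-Friday, 08:00 through 17:00."""
    # isoweekday() numbers Monday as 1 and Sunday as 7, so 1..5 is Monday..Friday.
    is_weekday = 1 <= ts.isoweekday() <= 5
    # hour * 100 + minute turns 17:00 into 1700 and 17:01 into 1701,
    # so the upper bound stops at the top of hour 17.
    hhmm = ts.hour * 100 + ts.minute
    in_window = ts.hour >= 8 and hhmm <= 1700
    return is_weekday and in_window


# 2017-08-01 was a Tuesday and 2017-08-05 was a Saturday.
samples = [
    datetime(2017, 8, 1, 7, 59),
    datetime(2017, 8, 1, 8, 0),
    datetime(2017, 8, 1, 17, 0),
    datetime(2017, 8, 1, 17, 1),
    datetime(2017, 8, 5, 12, 0),
]
for ts in samples:
    print(ts.isoformat(), in_business_hours(ts))
```

Because seconds are not part of the comparison, a point stamped anywhere between 17:00:00 and 17:00:59 still counts as inside the window.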
Important: By default, the eval() node keeps only the fields it produces and drops all other fields from the point (tags are preserved), so we specify .keep() to retain the existing fields in the stream.
At this point, we have both a field and a tag called business_hours that contain the output of the eval() node. We remove the redundant field from the stream with a delete() node, which names the field to delete via its .field() method.
Finally, we pass control in the stream to an influxDBOut() node that specifies the destination database and retention policy to write to. For this example, we have also added a static tag called kapacitor_augmented. All other data, such as the measurement name, is carried through, so it is only necessary to provide new information. In this case, we write to the telegraf database and the autogen retention policy.
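The effect of the whole pipeline on a single point can be pictured with a small Python model. This is an illustration only, not how Kapacitor is implemented: the enrich function and the dictionary layout of a point are assumptions made for this sketch, and the sample cpu point is invented.

```python
from datetime import datetime


def enrich(point: dict) -> dict:
    """Model the eval -> tags -> delete -> influxDBOut steps on one point."""
    ts = point["time"]
    in_hours = (
        1 <= ts.isoweekday() <= 5
        and ts.hour >= 8
        and ts.hour * 100 + ts.minute <= 1700
    )
    flag = "true" if in_hours else "false"

    # Copy so the incoming point is left untouched.
    tags = dict(point["tags"])
    fields = dict(point["fields"])

    # eval() with .as() and .keep(): the new field joins the existing fields.
    fields["business_hours"] = flag
    # .tags(): the same value is also written as a tag.
    tags["business_hours"] = flag
    # delete().field(): remove the now redundant field, keep the tag.
    del fields["business_hours"]
    # influxDBOut().tag(): add the static marker tag on the way out.
    tags["kapacitor_augmented"] = "true"

    return {
        "measurement": point["measurement"],
        "tags": tags,
        "fields": fields,
        "time": ts,
    }


# Hypothetical point, as Telegraf's cpu plugin might report it.
sample = {
    "measurement": "cpu",
    "tags": {"host": "server01"},
    "fields": {"usage_idle": 97.5},
    "time": datetime(2017, 8, 1, 9, 30),
}
print(enrich(sample))
```

The point leaves with its original fields, its original tags, and two new tags, which is the shape that ends up in the telegraf database.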
Once you have created the TICKscript (saved here as business_hours.tick), we must tell Kapacitor to run it. For this example, we use the Kapacitor CLI to define the task.
$ kapacitor define business_hours -type stream -dbrp kap_telegraf.autogen -tick business_hours.tick
This command defines the task called business_hours as a stream type listening for writes to kap_telegraf.autogen, which is the database name and retention policy name that was configured in Telegraf above. Once we have successfully created the task, we need to enable it for processing by Kapacitor.
$ kapacitor enable business_hours
To show the status, we can ask Kapacitor to list the current tasks:
$ kapacitor list tasks
ID             Type   Status  Executing Databases and Retention Policies
business_hours stream enabled true      ["kap_telegraf"."autogen"]
Once data begins to flow through Kapacitor to InfluxDB, you can add the condition AND business_hours='true' to the first query we specified:
SELECT * FROM "mymeasurement" WHERE time >= '2017-08-01 08:00:00.000000' AND time <= '2017-08-31 17:00:00.000000' AND business_hours='true';
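If you would rather issue this query from a script than from the influx shell, one possible approach is to build a URL for the /query endpoint of the InfluxDB 1.x HTTP API, which takes the database in db and the statement in q. The sketch below only constructs the URL and does not send it. The helper name business_hours_query_url is hypothetical, and the host, port 8086, and telegraf database are assumed from the localhost setup in this article.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def business_hours_query_url(
    measurement: str,
    start: str,
    end: str,
    base_url: str = "http://localhost:8086",
    database: str = "telegraf",
) -> str:
    """Build an InfluxDB 1.x /query URL restricted to business-hours points."""
    # InfluxQL uses double quotes for identifiers and single quotes for strings.
    # Values are interpolated as-is, so pass trusted input only.
    query = (
        f'SELECT * FROM "{measurement}" '
        f"WHERE time >= '{start}' AND time <= '{end}' "
        "AND business_hours = 'true'"
    )
    params = urlencode({"db": database, "q": query})
    return f"{base_url}/query?{params}"


url = business_hours_query_url(
    "mymeasurement", "2017-08-01 08:00:00", "2017-08-31 17:00:00"
)
print(url)
# Decode the query string again to show the statement that would be sent.
print(parse_qs(urlsplit(url).query)["q"][0])
```

The resulting URL can be fetched with any HTTP client that is pointed at your InfluxDB instance.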
In summary, we have shown a simple example of how to use Kapacitor to enrich your data coming from Telegraf into InfluxDB. This method could be used to add other types of data, potentially obviating the need to create custom Telegraf plugins to meet your business needs.
Published at DZone with permission of Alan Caldera, DZone MVB. See the original article here.