Apache Flume to Multiplex or Replicate Big Data
The solution from Apache Flume to address multiplexing and replicating requirements is really elegant and very easy to set up.
In the Big Data world, irrespective of the type and domain of the application, one of the commonly required services is ETL to collect, aggregate, and move data from one or many sources to a centralized data store or multiple destinations. Apache Flume is one of the prominent tools in this space. One of the important aspects of ETL is fanning out data to selected channels, which is commonly known as multiplexing, or fanning out data to all configured channels, which is data replication.
This moving of data is essentially the copying of a Flume event from an external source to a destination. A Flume event is a unit of data flow. In the case of multiplexing, an event is routed to a selected channel based on whether one of the event’s attributes matches a preconfigured value. In the case of replication, events are copied to all channels.
Let’s look at some common scenarios that involve multiplexing or replicating with the help of clickstream analytics or web server log processing application.
- Consider a requirement to store or process logs based on their status code. For example, all logs with successful status codes should be stored in Cassandra, and the rest of the logs should be fed to Apache Spark to discover interesting facts about such requests. This is a typical case of multiplexing.
- You might be required to move logs to Elasticsearch, Cassandra, and a Spark engine. Here, the event is sent to all channels, which is replicating.
Apache Flume is a very mature tool in this space and supports multiplexing and replicating superbly. Let’s see how to configure Flume to perform multiplexing and replicating.
Requirements
From the source, we are getting employee data in the following format:
employee_role, employee_id, employee_name, employee_city
The source could be real-time streaming or any other source. In this example, I have used the Netcat service as a source; it listens on a given port and turns each line of text into an event. Now, we want to address the following requirements.
Multiplexing
We have to store data in HDFS in text format, and the data has to be stored separately based on role. The manager’s data is to be stored at flume_data_multiplexing/manager and the developer’s data at flume_data_multiplexing/developer. Here, we have two HDFS sinks and two corresponding channels.
For example, the following data, with employee_role = 1, should be stored under flume_data_multiplexing/manager:
Role,ID,Name,City
1, E1,Viren,mumbai
1, E3,Ofer,kolkata
And the following data, with employee_role = 2, should be stored under flume_data_multiplexing/developer:
2, E4,Sanjeev,delhi
2, E6,Amruta,banglore
Replicating
In this scenario, we need to move data to all configured channels. Let’s say we have three sinks: HDFS, Hive, and Avro. The requirement is to copy data from the source to all configured sinks. Plainly speaking, multiplexing and replicating are two sides of the same coin; multiplexing just adds a filtering step.
Configuring Apache Flume for Multiplexing
It’s essential to understand the crux of multiplexing, which is the event attribute. The event attribute has to be extracted while fetching or streaming data, and Flume provides the regex_extractor interceptor to do exactly that. First and foremost, a regular expression has to be supplied to Flume. Based on it, the regex_extractor interceptor extracts the regex match groups and appends them to the event as headers. It also supports pluggable serializers for formatting the match groups before adding them as event headers. Let’s start with the Flume configuration; I will keep appending to it as we progress.
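Before describing individual components, a Flume agent configuration normally starts by naming its sources, channels, and sinks. That declaration is not shown in the snippets below, so here is a minimal sketch assuming the agent name a1 and the component names used throughout this article:
# Name the components on agent a1 (assumed names, matching the rest of this article)
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2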
Configure source as Netcat; provide port and host details.
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
Define details for the two channels that buffer events in memory.
# Use a channel c1 which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
# Use a channel c2 which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100
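Memory channels also accept a transactionCapacity setting, the maximum number of events a channel will take from a source or give to a sink per transaction. The lines below are an illustrative addition and not part of the original configuration:
# Optional: cap events per transaction (illustrative values, not in the original configuration)
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.transactionCapacity = 100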
The next step is to define an interceptor of type regex_extractor to extract the pattern. Then, for the interceptor, define a serializer t. The extracted event attribute is added as an event header named role. The regular expression ^(\d) matches a single leading digit; for a different pattern, you just have to change the regular expression.
# Describe regex_extractor to extract different patterns
# Capture the event attribute and add it as an event header with attribute name "role"
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(\\d)
a1.sources.r1.interceptors.i1.serializers = t
a1.sources.r1.interceptors.i1.serializers.t.name = role
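To make the effect concrete, here is a walk-through of what this interceptor does to one of the sample records (the trace is my own annotation, not Flume output):
# Input line:  1, E1,Viren,mumbai
# The regex ^(\d) captures the leading "1", and serializer t stores it
# in the event header "role" (role=1). The multiplexing selector defined
# next routes the event to a channel based on this header value.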
With multiplexing, Flume routes the event flow to one or more destinations. The manager’s data gets routed to channel c1 and the developer’s data gets routed to channel c2. For both multiplexing and replicating, the common configuration is to state a list of channels for a source and the policy for fanning events out. This is done by adding a channel selector, which can be replicating or multiplexing. For multiplexing, selection rules are specified, and events are mapped to different channels based on the extracted value. In this example, two kinds of events (developer records and manager records) are mapped to two different channels with different HDFS sinks. These sinks use different file paths so that manager and developer data are stored separately. Note that each event attribute value is mapped to a channel.
# Define channel selector and define mapping
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = role
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2
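The selector also supports a default channel for events whose role header matches none of the configured mappings. This line is an optional addition and not part of the original configuration:
# Optional: send unmatched events to c1 (illustrative addition)
a1.sources.r1.selector.default = c1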
Define HDFS sinks as follows:
# Describe first HDFS sink k1 to store manager's data only; it's associated with channel c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = flume_data_multiplexing/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text
# Describe second HDFS sink k2 to store developer's data only; it's associated with channel c2
a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = flume_data_multiplexing/developer/
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=Text
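The HDFS sink rolls files based on time, size, and event count thresholds; with the defaults you may end up with many small files. The settings below are illustrative tuning options and not part of the original configuration:
# Optional: roll a file every 10 minutes or at 128 MB, whichever comes first (illustrative values)
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0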
Bind the source and sinks to the channels.
# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k2.channel = c2
a1.sinks.k1.channel = c1
Configuring Apache Flume for Replicating
Compared to multiplexing, replicating is very simple and straightforward. Again, a channel selector is used, this time to simply send events to all of the configured channels. Neither the regex_extractor interceptor nor the mapping of event attribute values is required; the rest of the configuration remains the same.
# Configure channel selector
a1.sources.r1.selector.type = replicating
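For completeness, here is a minimal sketch of how the channels and sinks might be wired in the replicating case, assuming two HDFS sinks writing to the paths referenced in the results section below (the full configuration is linked on GitHub at the end of the article):
# Fan the source out to both channels; the replicating selector copies every event to each
a1.sources.r1.channels = c1 c2
# First HDFS sink receives the full stream
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = flume_data_replicating/employee_records1
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Second HDFS sink also receives the full stream
a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c2
a1.sinks.k2.hdfs.path = flume_data_replicating/employee_records2
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.writeFormat = Text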
How to Run the Example(s)
First of all, you need Apache Flume and HDFS to run the example.
Store Flume configuration in some directory and run the agent using the following command:
##For multiplexing example -
flume-ng agent --conf conf --conf-file flume_multiplexing_conf.conf --name a1 -Dflume.root.logger=INFO,console
##For replicating example -
flume-ng agent --conf conf --conf-file flume_replicating_conf.conf --name a1 -Dflume.root.logger=INFO,console
Run the following command to send events to Flume.
telnet localhost 44444
Provide input as follows:
1, E1,Viren,mumbai
2, E2,John,chennai
1, E3,Ofer,kolkata
2, E4,Sanjeev,delhi
1, E5,Ramesh,pune
2, E6,Amruta,banglore
To view multiplexing results, navigate to HDFS, and you will find two directories: one at hdfs://<HADOOP_HOME>/flume_data_multiplexing/manager containing only manager records and another at hdfs://<HADOOP_HOME>/flume_data_multiplexing/developer containing only developer records.
To view replicating results, navigate to HDFS and you'll find two directories: one at hdfs://<HADOOP_HOME>/flume_data_replicating/employee_records1 and another at hdfs://<HADOOP_HOME>/flume_data_replicating/employee_records2, both containing the entire data.
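A quick way to inspect the output is the standard HDFS shell; the paths below assume the multiplexing configuration above, so adjust them for your setup:
# List and print the files written for manager records (paths follow the configuration above)
hdfs dfs -ls flume_data_multiplexing/manager
hdfs dfs -cat flume_data_multiplexing/manager/*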
The solution from Apache Flume to address multiplexing and replicating requirements is really elegant. You just need to understand the different components and configuration attributes and set them up, and that’s it. You can play with the examples by changing the regular expression, adding different sinks, enabling compression, and so on.
You can find complete projects on my GitHub: multiplexing and replicating.