
Apache Flume to Multiplex or Replicate Big Data


The solution from Apache Flume for multiplexing and replicating requirements is really elegant and very easy to set up.


In the Big Data world, irrespective of the type and domain of the application, one of the most commonly required services is ETL: collecting, aggregating, and moving data from one or many sources to a centralized data store or to multiple destinations. Apache Flume is one of the prominent tools in this space. An important aspect of ETL is fanning data out, either to selected channels, which is known as multiplexing, or to all configured channels, which is data replication.

This movement of data is simply the copying of a Flume event from an external source to a destination. A Flume event is a unit of data flow consisting of a payload (the body) and optional attributes (the headers). In the case of multiplexing, an event is routed to a selected channel when one of its attributes matches a preconfigured value. In the case of replication, the event is copied to all channels.

Let's look at some common scenarios that involve multiplexing or replicating, using a clickstream analytics or web server log processing application as an example.

  • Consider a requirement to store or process logs differently based on the status code. For example, all logs with successful status codes should be stored in Cassandra, while the rest should be fed to Apache Spark to discover interesting facts about the failing requests. This is a typical case of multiplexing.
  • You might be required to move every log event to Elasticsearch, Cassandra, and Spark. Here, each event is sent to all channels, which is replicating.

Apache Flume is a very mature tool in this space and supports both patterns superbly. Let's see how to configure Flume to perform multiplexing and replicating.

Requirements

From the source, we receive employee data in the following comma-separated format:

employee_role, employee_id, employee_name, employee_city

The source could be a real-time stream or anything else. In this example, I have used the Netcat source, which listens on a given port and turns each line of text into an event. We want to address the following requirements.

Multiplexing

We have to store the data in HDFS in text format, separated by role: managers' records go to flume_data_multiplexing/manager and developers' records go to flume_data_multiplexing/developer. Accordingly, we have two HDFS sinks and two corresponding channels.

For example, the following records, having employee_role = 1, should be stored under flume_data_multiplexing/manager:

Role,ID,Name,City
1, E1,Viren,mumbai
1, E3,Ofer,kolkata

And the following records, having employee_role = 2, should be stored under flume_data_multiplexing/developer:

2, E4,Sanjeev,delhi
2, E6,Amruta,banglore

Replicating

In this scenario, we need to move data to all configured channels. Suppose we have three sinks: HDFS, Hive, and Avro. The requirement is to copy every event from the source to all configured sinks. Plainly speaking, multiplexing and replicating are two sides of the same coin, differing only in the additional filtering step.

Configuring Apache Flume for Multiplexing

It's essential to understand the crux of multiplexing: the event attribute. The attribute has to be extracted while fetching or streaming the data, and Flume provides the regex_extractor interceptor for exactly this purpose. You supply Flume a regular expression; the interceptor extracts the regex match groups and appends them to the event as headers. It also supports pluggable serializers for formatting the match groups before adding them as headers. Let's start with the Flume configuration; I will keep appending to it as we progress.
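
One note before the snippets: the excerpts below omit the top-level component declarations that a complete, runnable configuration file needs. Assuming the component names used throughout this article (agent a1, source r1, channels c1 and c2, sinks k1 and k2), they would be:

# Declare the agent's components (a1 is the agent name passed to flume-ng)
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2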

Configure source as Netcat; provide port and host details. 

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

Define two channels, c1 and c2, which buffer events in memory.

# Use a channel c1 which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100

# Use a channel c2 which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 100

The next step is to define an interceptor of type regex_extractor to extract the pattern. For the interceptor, we define a serializer, t, which appends the extracted value to the event as a header named role. The regular expression ^(\d) matches a single leading digit; for a different pattern, you just have to change the regular expression.

# Describe regex_extractor to extract different patterns
# Capture event attribute and add it as event header with attribute name "role"
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = ^(\\d)
a1.sources.r1.interceptors.i1.serializers = t
a1.sources.r1.interceptors.i1.serializers.t.name = role
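
To make the extraction concrete, here is an illustration, in comments only, of what the interceptor does to one of our sample records:

# Input line:           1, E1,Viren,mumbai
# Regex ^(\d) matches:  "1" (the leading digit)
# Resulting event:      headers = { role : "1" }, body unchanged
# The multiplexing selector below routes on this "role" header.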

With multiplexing, Flume can fan an event flow out to one or more destinations; here, the managers' data gets routed to channel c1 and the developers' data to channel c2. For both multiplexing and replicating, the common configuration is to state a list of channels for a source and the policy for fanning events out. This is done by adding a channel selector, which can be replicating or multiplexing. For multiplexing, selection rules map the extracted header value to a channel. In this example, the two kinds of events (manager and developer records) are mapped to two different channels, each with its own HDFS sink, and the sinks use different file paths so that managers' and developers' data are stored separately. Note that the selector.header value below must match the header name set by the interceptor (role).

# Define channel selector and define mapping
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = role
a1.sources.r1.selector.mapping.1 = c1
a1.sources.r1.selector.mapping.2 = c2
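
The multiplexing selector also supports a default channel for events whose header value matches none of the mappings. As an optional addition (routing unmatched records to c2 is an arbitrary choice, for illustration only):

# Optionally route events with an unmapped role value to c2
a1.sources.r1.selector.default = c2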

Define HDFS sinks as follows:

# Describe the first HDFS sink k1, which stores managers' data only; it's associated with channel c1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = flume_data_multiplexing/manager
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Text

# Describe HDFS sink k2, which stores developers' data only; it's associated with channel c2
a1.sinks.k2.channel = c2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = flume_data_multiplexing/developer
a1.sinks.k2.hdfs.fileType=DataStream
a1.sinks.k2.hdfs.writeFormat=Text
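
A practical note: by default, the HDFS sink rolls a new file every 30 seconds, which produces many small files for a trickle of test events. If you want fewer, larger files, the standard roll properties can be set explicitly (the values below are arbitrary; the same settings would apply to k2):

# Roll output files by event count instead of by time or size
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.rollSize = 0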

Finally, bind the source to both channels. The sink-to-channel bindings were already set above and are repeated here for completeness.

# Bind the source and sinks to the channels
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

Configuring Apache Flume for Replicating

Compared to multiplexing, replicating is very simple and straightforward. Again, a channel selector is used, this time to simply send events to all of the configured channels.

Neither the regex_extractor interceptor nor the mapping of event attribute values is required; the rest of the configuration remains the same.

# Configure channel selector
a1.sources.r1.selector.type = replicating
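
For completeness, a minimal replicating layout consistent with the result paths shown later might look like the sketch below (the employee_records1 and employee_records2 paths are taken from the output section; everything else mirrors the multiplexing setup, minus the interceptor and mappings):

# Every event is copied to both channels; each channel feeds its own HDFS sink
a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = flume_data_replicating/employee_records1
a1.sinks.k2.channel = c2
a1.sinks.k2.hdfs.path = flume_data_replicating/employee_records2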

How to Run the Example(s)

First of all, you need Apache Flume and HDFS installed to run the examples.

Store the Flume configuration in a directory and run the agent using the following command:

## For the multiplexing example
flume-ng agent --conf conf --conf-file flume_multiplexing_conf.conf --name a1 -Dflume.root.logger=INFO,console

## For the replicating example
flume-ng agent --conf conf --conf-file flume_replicating_conf.conf --name a1 -Dflume.root.logger=INFO,console

Run the following command to send events to Flume.

telnet localhost 44444
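
Alternatively, to run non-interactively, you can pipe the records in with nc (employee_records.txt is a hypothetical file containing the sample lines shown below):

nc localhost 44444 < employee_records.txt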

Provide input as follows:

1, E1,Viren,mumbai
2, E2,John,chennai
1, E3,Ofer,kolkata
2, E4,Sanjeev,delhi
1, E5,Ramesh,pune
2, E6,Amruta,banglore

To view the multiplexing results, navigate to HDFS; you will find two directories: one at hdfs://<HADOOP_HOME>/flume_data_multiplexing/manager containing only the managers' records, and another at hdfs://<HADOOP_HOME>/flume_data_multiplexing/developer containing only the developers' records.

To view the replicating results, navigate to HDFS; you'll find two directories: one at hdfs://<HADOOP_HOME>/flume_data_replicating/employee_records1 and the other at hdfs://<HADOOP_HOME>/flume_data_replicating/employee_records2, each containing the entire data set.
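
To inspect the output from the command line, the standard HDFS shell works (paths as configured above):

# List and print the files written by the multiplexing example
hdfs dfs -ls flume_data_multiplexing/manager
hdfs dfs -cat flume_data_multiplexing/manager/*
hdfs dfs -cat flume_data_multiplexing/developer/*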

Apache Flume's solution to the multiplexing and replicating requirements is really elegant. You just need to understand the different components and configuration attributes and set them up; that's it. You can play with the examples by changing the regular expression, adding different sinks, enabling compression, and so on.

You can find complete projects on my GitHub: multiplexing and replicating.
