Apache Flume: Regex Filtering
There's so much precious data out there that it can be difficult for humans to get meaning out of it sometimes. Apache Flume to the rescue!
Join the DZone community and get the full member experience.Join For Free
In today's Big Data world, applications generate huge amounts of electronic data — and these huge electronic data repositories contain valuable, precious pieces of information. It’s very difficult for a human analyst or domain expert to make an interesting discovery or find patterns that can help the decision-making process. We need to have automated processes to effectively utilize enormous, information-rich data for planning and investment decision-making. Before data is processed, it is absolutely essential to collect the data, aggregate and transform the data, and eventually move the data to a repository on which different analytics and data mining tools can operate.
One of the popular tools to carry out all these activities is Apache Flume. Such data is generally in the form of events or logs. Apache Flume has three main components:
Source: The source of data could be enterprise servers, file systems, the cloud, data repositories, and so on.
Sink: Sink is the target repository on which data can be stored. It could be a centralized place like HDFS, a processing engine like Apache Spark, or a data repository/search engine like ElasticSearch.
Channel: Channel stores the event until it is consumed by the sink. Channel acts as passive storage. Channel supports recoverability from failure and high reliability; examples of channels are file channels backed by local file systems and memory-based channel.
Flume is highly configurable and supports many sources, channels, serializers, and sinks. It also supports data streaming. Flume's powerful feature is the interceptor, which supports the capability to modify/drop an event in-flight. One of the supported interceptors is
regex_filter interprets an event body as text and matches it against supplied regular expressions, and based on the matched pattern and expression, events are included or excluded. We are going to see
regex_filter in detail.
From the source, we are getting data in the form of street number, name, city, and role. Now, the source could be real-time streaming or it could be any other source. In the example, I have used Netcat service as a source that listens on a given port and turns each line of text into an event. The requirement is to save the data into HDFS in text format. Before saving the data to HDFS, the data has to be filtered on the basis of role. Only managers’ records need to be stored in HDFS; data with other roles has to be ignored. For example, the following data is allowed:
The following data is not allowed:
How to Achieve This Requirement
This can be achieved by using
regex_filter interceptor. This interceptor will filter events on the basis of role, and only interested events are sent to the sink; meanwhile, other events are ignored.
## Describe regex_filter interceptor and configure exclude events attribute a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = regex_filter a1.sources.r1.interceptors.i1.regex = developer a1.sources.r1.interceptors.i1.excludeEvents = true
The HDFS sink allows data to be stored in HDFS in the format of text/sequence files. It can also store data in a compressed format.
a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 ## assumption is that Hadoop is CDH a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers a1.sinks.k1.hdfs.fileType= DataStream a1.sinks.k1.hdfs.writeFormat = Text
How to Run the Example
First of all, you would require Hadoop to run the example as a sink in HDFS. If you don’t have a Hadoop cluster, then change sink to log and then just set up Flume. Store
regex_filter_flume_conf.conf in some directory and run the agent using the following command.
flume-ng agent --conf conf --conf-file regex_filter_flume_conf.conf --name a1 -Dflume.root.logger=INFO,console
Note that agent name is
a1. I have used Netcat as a source.
a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444
Once the Flume agent starts, run the following command to send event to Flume.
telnet localhost 40000
It’s almost done now; just provide the input text.
1,alok,mumbai,manager 2,jatin,chennai,manager 3,yogesh,kolkata,developer 4,ragini,delhi,manager 5,jyotsana,pune,developer 6,valmiki,banglore,manager
Navigate to HDFS, and you would observe that a file would have got created in HDFS under hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers with filtered data containing only records of the manager.
The complete flume configuration —
regex_filter_flume_conf.conf — is as follows:
# Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source - netcat a1.sources.r1.type = netcat a1.sources.r1.bind = localhost a1.sources.r1.port = 44444 # Describe the HDFS sink a1.channels = c1 a1.sinks = k1 a1.sinks.k1.type = hdfs a1.sinks.k1.channel = c1 a1.sinks.k1.hdfs.path = hdfs://quickstart.cloudera:8020/user/hive/warehouse/managers a1.sinks.k1.hdfs.fileType= DataStream a1.sinks.k1.hdfs.writeFormat = Text ## Describe regex_filter interceptor and configure exclude events attribute a1.sources.r1.interceptors = i1 a1.sources.r1.interceptors.i1.type = regex_filter a1.sources.r1.interceptors.i1.regex = developer a1.sources.r1.interceptors.i1.excludeEvents = true # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
You can find the complete project here.
Opinions expressed by DZone contributors are their own.