
Using Kylo for Self-Service Data Ingestion, Cleansing, and Validation


Learn about performing data ingestion, cleansing, and validation without *any* coding using the Kylo data lake platform.


Kylo is a feature-rich data lake platform built on Apache Hadoop and Apache Spark. Kylo provides a business-friendly data lake solution and enables self-service data ingestion, data wrangling, data profiling, data validation, data cleansing/standardization, and data discovery. Its intuitive user interface allows IT professionals to access the data lake (without having to code).

While many tools ingest either batch data or streaming/real-time data, Kylo supports both. It provides a plug-in architecture with a variety of extensions, and Apache NiFi templates provide incredible flexibility for batch and streaming use cases.

In this post, let's discuss ingesting data from Apache Kafka, performing data cleansing and validation in real time, and persisting the data into an Apache Hive table.

Prerequisites

  • Install Kafka (a topic-creation sketch follows this list).
  • Deploy Kylo. Deployment requires familiarity with several components/technologies:
    • AngularJS for Kylo UI
    • Apache Spark for data wrangling, data profiling, data validation, data cleansing, and schema detection
    • JBoss ModeShape and MySQL for Kylo Metadata Server
    • Apache NiFi for pipeline orchestration
    • Apache ActiveMQ for interprocess communication
    • Elasticsearch for search-based data discovery
    • Core Hadoop technologies, most importantly HDFS, YARN, and Hive
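As a minimal sketch of the Kafka prerequisite (assuming kafka-python and a broker at localhost:9092), the topic that the feed later consumes from, transactionRawTopic, could be created ahead of time; the partition and replication settings below are illustrative:

```python
from kafka.admin import KafkaAdminClient, NewTopic

# Assumed broker address; adjust to match your Kafka installation.
admin = KafkaAdminClient(bootstrap_servers="localhost:9092")

# transactionRawTopic is the topic configured later in the Kylo feed wizard.
# Partition count and replication factor here are illustrative defaults.
admin.create_topics([
    NewTopic(name="transactionRawTopic", num_partitions=3, replication_factor=1)
])
admin.close()
```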

To learn more about the basics of Kylo and its installation on an AWS EC2 instance, refer to our previous blog on Kylo setup for data lake management.

Data Description

A user transaction dataset with 68K rows, generated by the Treselle team, is used as the source file. The input dataset has time, UUID, user, business, address, amount, and disputed columns.

Sample dataset:

Examples of invalid and missing values in the dataset:

Use Case

  • Publish the user transaction dataset into Kafka (a producer sketch follows this list).
  • Ingest data from Kafka using the Kylo data ingestion template, then standardize and validate the data.
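As a hedged sketch of the first step (assuming kafka-python, a broker at localhost:9092, and a local CSV file named user_transactions.csv), each transaction row could be published to transactionRawTopic as a JSON message:

```python
import csv
import json

from kafka import KafkaProducer

# Assumed broker address; the CSV path is a placeholder for the 68K-row dataset.
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

with open("user_transactions.csv", newline="") as f:
    # Columns: time, UUID, user, business, address, amount, disputed.
    for row in csv.DictReader(f):
        producer.send("transactionRawTopic", value=row)

producer.flush()
producer.close()
```

The rows could just as well be sent as raw delimited lines; the JSON form simply keeps the column names attached to each record.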

Synopsis

  • Customize data ingest pipeline template.
  • Define categories for feeds.
  • Define feeds with source and destination.
  • Cleanse and validate data.
  • Schedule feeds.
  • Monitor feeds.

Self-Service Data Ingest, Data Cleansing, and Data Validation

Kylo utilizes Spark to provide a pre-defined pipeline template that implements multiple best practices around data ingestion. By default, it ships with file system and database sources, and it simplifies, for business users, the configuration of ingesting data from new sources such as JMS, Kafka, HDFS, HBase, FTP, SFTP, REST, HTTP, TCP, IMAP, AMQP, POP3, MQTT, WebSocket, Flume, Elasticsearch and Solr, Microsoft Azure Event Hub, Microsoft Exchange using Exchange Web Services (EWS), Couchbase, MongoDB, Amazon S3, SQS, DynamoDB, and Splunk.

Apache NiFi, a scheduler and orchestration engine, provides an integrated framework for designing new types of pipelines with 250+ processors (data connectors and transforms). The pre-defined data ingest template is modified by adding Kafka, S3, HDFS, and FTP, as shown in the screenshot below:

Get, Consume, and Fetch processors are used to ingest the data. The Get and Consume versions of the Kafka processors in NiFi are as follows:

  • GetKafka 1.3.0: Fetches messages from earlier versions of Apache Kafka (specifically the 0.8.x releases). The complementary NiFi processor used to send messages is PutKafka.

  • ConsumeKafka_0_10 1.3.0: Consumes messages from the newer version of Apache Kafka specifically built against the Kafka 0.10.x Consumer API.
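Outside of NiFi, the same topic can be spot-checked with a minimal kafka-python consumer, which, like ConsumeKafka_0_10, uses the newer consumer API; the broker address and consumer group below are assumptions:

```python
import json

from kafka import KafkaConsumer

# Assumed broker and consumer group; auto_offset_reset="earliest" replays the
# topic from the beginning so previously published rows can be inspected.
consumer = KafkaConsumer(
    "transactionRawTopic",
    bootstrap_servers="localhost:9092",
    group_id="kylo-debug",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    consumer_timeout_ms=10000,  # stop iterating after 10 s without new messages
)

for message in consumer:
    print(message.value)  # one transaction row per message

consumer.close()
```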

If needed, a custom processor or other custom extension for NiFi can be written, packaged as a NAR file, and deployed into NiFi.

Customizing Data Ingest Pipeline Template

After updating and saving the data ingest template in NiFi, the same template can be customized in the Kylo UI. The customization steps involve:

  • Customizing feed destination table
  • Adding input properties
  • Adding additional properties
  • Performing access control
  • Registering the template


Defining Categories for Feeds

All feeds created in Kylo must be assigned to a category; a corresponding process group is launched in NiFi to execute the feeds. Here, a “Transaction raw data” category is created for the transaction feeds.

Defining Feeds With Source and Destination

The Kylo UI is self-explanatory for creating and scheduling feeds. To define a feed, perform the following:

  • Choose the data ingest template.
  • Provide feed name, category, and description.


  • Choose the input data source to ingest data from.
  • Customize the configuration parameters for that source; for example, the Kafka topic transactionRawTopic and a batch size of 10,000.


  • Define the output feed table using either of the following methods:
    • Manually define the table columns and their data types.
    • Upload a sample file and adjust the data types to match the data in each column.
  • Preview the data under the Feed Details section in the top-right corner:


  • Define partitioning for the output table by choosing a Source Field and a Partition Formula; for example, time as the source field and year as the partition formula to partition the data.


Cleansing and Validating Data

The feed creation wizard UI allows end-users to configure cleansing and standardization functions to manipulate data into conventional or canonical formats (for example, simple data type conversions such as dates, or stripping special characters) or to protect data (for example, masking credit cards and other PII).

It allows users to define field-level validation to protect data against quality issues, and it performs schema validation automatically. An extensible Java API is available for developing custom validation, cleansing, and standardization routines as needed, and predefined rules cover standardization and validation for different data types.

To clean and validate data, perform the following:

  • Apply different predefined standardization rules for time, user, address, and amount columns as shown below:


  • Apply standardization and validation rules to different columns, as shown in the screenshot below:


  • Define the data ingestion merge strategy for the output table.
  • Choose Dedupe and merge to discard duplicated batch data and insert the remaining records into the desired output table.


  • Use the Target Format section to define data storage and compression options.
    • Supported storage formats: ORC, Parquet, Avro, TextFile, and RCFile
    • Compression options: Snappy and Zlib


Scheduling Feeds

Feeds can be scheduled using a cron or timer-based mechanism. Alternatively, turn on the Enable feed immediately option to start the feed right away without waiting for the cron or timer criteria to be met.

Monitoring Feeds

After the feeds are scheduled, the actual execution is performed in NiFi. Feed status can be monitored, feed details can be edited at any time, and feeds can be rescheduled.


An overview of the created feed's job status can be seen under Jobs in the Operations section. By drilling down into the jobs, you can view the details of each job and debug feed job execution failures.


The Job Activity section provides details, such as completed and running jobs, for a specific feed's recurring activity.


The Operational Job Statistics section provides statistics for a specific job, such as success rate, flow rate per second, flow duration, and step duration.


Conclusion

In this blog, we discussed data ingestion, cleansing, and validation without any coding in the Kylo data lake platform. The data ingested from Kafka can be viewed as a Hive table in Ambari.
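As a rough alternative to browsing the table in Ambari (a sketch assuming PyHive, HiveServer2 on localhost:10000, and a schema/table name derived from the category and feed, which may differ in your environment), the ingested rows can be queried directly:

```python
from pyhive import hive

# Assumed HiveServer2 connection details and feed table name; verify the
# schema and table that Kylo created for the feed in your environment.
conn = hive.Connection(host="localhost", port=10000, username="hive")
cursor = conn.cursor()

cursor.execute("SELECT * FROM transaction_raw_data.user_transactions LIMIT 10")
for row in cursor.fetchall():
    print(row)

cursor.close()
conn.close()
```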

In our next blog, we'll discuss data profiling and search-based data discovery.


