Using Kylo for Self-Service Data Ingestion, Cleansing, and Validation
Learn about performing data ingestion, cleansing, and validation without *any* coding using the Kylo data lake platform.
Kylo is a feature-rich data lake platform built on Apache Hadoop and Apache Spark. Kylo provides a business-friendly data lake solution and enables self-service data ingestion, data wrangling, data profiling, data validation, data cleansing/standardization, and data discovery. Its intuitive user interface allows IT professionals to access the data lake (without having to code).
Many tools ingest either batch data or streaming (real-time) data; Kylo supports both. It provides a plug-in architecture with a variety of extensions, and its Apache NiFi templates offer a great deal of flexibility for batch and streaming use cases.
In this post, let's discuss ingesting data from Apache Kafka, performing data cleansing and validation in real time, and persisting the data into an Apache Hive table.
- Install Kafka
- Deploy Kylo. The deployment requires knowledge of several components and technologies:
  - AngularJS for the Kylo UI
  - Apache Spark for data wrangling, data profiling, data validation, data cleansing, and schema detection
  - JBoss ModeShape and MySQL for the Kylo Metadata Server
  - Apache NiFi for pipeline orchestration
  - Apache ActiveMQ for interprocess communication
  - Elasticsearch for search-based data discovery
  - Hadoop technologies, preferably HDFS, YARN, and Hive
To learn more about basics and installation of Kylo in an AWS EC2 instance, refer to our previous blog on Kylo setup for data lake management.
A user transaction dataset with 68K rows, generated by the Treselle team, is used as the source file. The input dataset has time, UUID, user, business, address, amount, and disputed columns.
- Publish user transaction dataset into Kafka.
- Ingest data from Kafka using Kylo data ingestion template and standardize and validate data.
- Customize data ingest pipeline template.
- Define categories for feeds.
- Define feeds with source and destination.
- Cleanse and validate data.
- Schedule feeds.
- Monitor feeds.
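The first step above, publishing the dataset into Kafka, happens outside Kylo. A minimal sketch of one way to do it is shown here. It assumes the source file is a CSV with a header row, that each data row is sent as one CSV-encoded message, and that the `kafka-python` package is installed; none of these details come from the original setup, and the function names are hypothetical.

```python
import csv
import io
from typing import Callable, Iterator


def rows_to_messages(csv_text: str, has_header: bool = True) -> Iterator[bytes]:
    """Yield one UTF-8 encoded CSV line per data row (assumed message format)."""
    reader = csv.reader(io.StringIO(csv_text))
    if has_header:
        next(reader, None)  # skip the header row
    for row in reader:
        if not row:
            continue  # ignore blank lines
        buffer = io.StringIO()
        csv.writer(buffer).writerow(row)
        yield buffer.getvalue().rstrip("\r\n").encode("utf-8")


def publish(csv_text: str, send: Callable[[bytes], object]) -> int:
    """Pass every message to `send` and return how many were sent."""
    count = 0
    for message in rows_to_messages(csv_text):
        send(message)
        count += 1
    return count


def publish_to_kafka(csv_path: str, topic: str, servers: str) -> int:
    """Publish a CSV file to a Kafka topic (requires kafka-python)."""
    # Imported lazily so the helpers above can be used without Kafka.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=servers)
    with open(csv_path, newline="", encoding="utf-8") as handle:
        sent = publish(handle.read(), lambda msg: producer.send(topic, value=msg))
    producer.flush()  # block until buffered messages are delivered
    return sent
```

A call such as `publish_to_kafka("transactions.csv", "transactionRawTopic", "localhost:9092")` would then feed the topic used later in this post; the file name and broker address are placeholders.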
Self-Service Data Ingest, Data Cleansing, and Data Validation
Kylo utilizes Spark to provide a pre-defined pipeline template, which implements multiple best practices around data ingestion. By default, it comes with file system and database sources. It also helps business users by simplifying the configuration of ingestion from new sources such as JMS, Kafka, HDFS, HBase, FTP, SFTP, REST, HTTP, TCP, IMAP, AMQP, POP3, MQTT, WebSocket, Flume, Elasticsearch and Solr, Microsoft Azure Event Hub, Microsoft Exchange using Exchange Web Services (EWS), Couchbase, MongoDB, Amazon S3, SQS, DynamoDB, and Splunk.
Apache NiFi, a scheduler and orchestration engine, provides an integrated framework for designing new types of pipelines with 250+ processors (data connectors and transforms). For this use case, the pre-defined data ingest template is modified by adding Kafka, S3, HDFS, and FTP as shown in the screenshot below:
Get, Consume, and Fetch processors are used to ingest the data. The Get and Consume versions of the Kafka processors in NiFi are as follows:
GetKafka 1.3.0: Fetches messages from earlier versions of Apache Kafka (specifically 0.8.x). The complementary NiFi processor used to send messages is PutKafka.
ConsumeKafka_0_10 1.3.0: Consumes messages from newer versions of Apache Kafka; it is built specifically against the Kafka 0.10.x Consumer API.
Based on need, a custom processor or other custom extension for NiFi can be written and packaged as a NAR file and deployed into NiFi.
Customizing Data Ingest Pipeline Template
Upon updating and saving the data ingest template in NiFi, the same template can be customized in Kylo UI. The customization steps involve:
- Customizing feed destination table
- Adding input properties
- Adding additional properties
- Performing access control
- Registering the template
Defining Categories for Feeds
Defining Feeds With Source and Destination
The Kylo UI makes creating and scheduling feeds largely self-explanatory. To define feeds, perform the following:
- Choose the data ingest template.
- Provide feed name, category, and description.
- Choose input Data Source to ingest data.
- Customize the configuration parameter related to that source; for example, transactionRawTopic in Kafka and batch size 10000.
- Define the output feed table using either of the following methods:
  - Manually define the table columns and their data types.
  - Upload a sample file and update the data types to match the data in each column.
- Preview the data under the Feed Details section in the top right corner:
- Define partitioning for the output table by choosing a Source Field and a Partition Formula; for example, time as the source field and year as the partition formula.
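To illustrate what a year partition formula on the time field amounts to, here is a small sketch. Kylo does this for you through the UI; the timestamp format, the `year` partition column name, and both function names are assumptions made for the example. The `column=value` directory layout is the usual Hive convention.

```python
from datetime import datetime


def partition_year(time_value: str, fmt: str = "%Y-%m-%d %H:%M:%S") -> int:
    """Derive the partition value (the year) from a timestamp string.

    The default format is an assumption about how the time column is stored.
    """
    return datetime.strptime(time_value, fmt).year


def partition_path(table_root: str, time_value: str) -> str:
    """Build a Hive-style partition directory for a row's timestamp."""
    return f"{table_root}/year={partition_year(time_value)}"
```

Rows whose time values fall in the same year end up under the same partition directory, which lets queries that filter on the year skip the other partitions.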
Cleansing and Validating Data
The feed creation wizard UI allows end-users to configure cleansing and standardization functions to manipulate data into conventional or canonical formats (for example, simple data type conversion such as dates or stripping special characters) or data protection (for example, masking credit cards, PII, and so on).
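The kinds of standardization named above can be pictured as small functions applied to one column at a time. The sketch below is only an illustration in Python, not Kylo's implementation (Kylo applies its rules in Spark); the function names, the source date format, and the choice to keep the last four card digits are assumptions.

```python
import re
from datetime import datetime
from typing import Callable, Dict, List


def strip_special_characters(value: str) -> str:
    """Keep only letters, digits, and spaces."""
    return re.sub(r"[^A-Za-z0-9 ]", "", value)


def to_iso_date(value: str, source_format: str = "%m/%d/%Y") -> str:
    """Convert a date string to ISO 8601 (YYYY-MM-DD)."""
    return datetime.strptime(value, source_format).date().isoformat()


def mask_credit_card(value: str) -> str:
    """Mask all but the last four digits of a card number."""
    digits = re.sub(r"\D", "", value)
    if len(digits) <= 4:
        return "*" * len(digits)
    return "*" * (len(digits) - 4) + digits[-4:]


def standardize(row: Dict[str, str],
                rules: Dict[str, List[Callable[[str], str]]]) -> Dict[str, str]:
    """Apply each column's functions in order and return a cleaned copy."""
    cleaned = dict(row)
    for column, functions in rules.items():
        if column in cleaned:
            for function in functions:
                cleaned[column] = function(cleaned[column])
    return cleaned
```

For example, `standardize(row, {"address": [strip_special_characters]})` cleans only the address column and leaves the other fields untouched.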
It allows users to define field-level validation to protect data against quality issues and provides schema validation automatically. It provides an extensible Java API to develop custom validation, custom cleansing, and standardization routines as per needs. It provides predefined rules for standardization and validation of different data types.
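Field-level validation can be thought of as a set of checks per column, with rows that fail any check set aside together with a reason. The following Python sketch shows that idea only; Kylo's own validators are configured in the UI or written against its Java API, and the validator names, the `reject_reason` field, and the valid/invalid split shown here are assumptions of the example.

```python
from typing import Callable, Dict, List, Tuple

Row = Dict[str, str]
Validator = Callable[[str], bool]


def not_empty(value: str) -> bool:
    """Pass when the value contains something other than whitespace."""
    return value.strip() != ""


def is_number(value: str) -> bool:
    """Pass when the value parses as a floating-point number."""
    try:
        float(value)
    except ValueError:
        return False
    return True


def validate(rows: List[Row],
             rules: Dict[str, List[Validator]]) -> Tuple[List[Row], List[Row]]:
    """Split rows into (valid, invalid); invalid rows carry a reject reason."""
    valid: List[Row] = []
    invalid: List[Row] = []
    for row in rows:
        failures = [
            f"{column}: {check.__name__}"
            for column, checks in rules.items()
            for check in checks
            if not check(row.get(column, ""))
        ]
        if failures:
            invalid.append({**row, "reject_reason": "; ".join(failures)})
        else:
            valid.append(row)
    return valid, invalid
```

Keeping rejected rows with their reasons, rather than dropping them, makes it possible to inspect quality problems after a feed has run.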
To clean and validate data, perform the following:
- Apply predefined standardization rules to the time, user, address, and amount columns as shown below:
- Apply standardization and validation to different columns as shown in the screenshot below:
- Define the data ingestion merge strategy in the output table.
- Choose Dedupe and merge to ignore duplicated batch data and insert it into the desired output table.
- Use the Target Format section to define data storage and compression options.
- Supported storage formats: ORC, Parquet, Avro, TextFile, and RCFile
- Compression options: Snappy and Zlib
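The effect of the Dedupe and merge strategy mentioned above can be sketched as follows. This is a conceptual illustration, not Kylo's code: it assumes that a duplicate means a row identical in every column, and the function name is hypothetical.

```python
from typing import Dict, List

Row = Dict[str, str]


def dedupe_and_merge(target: List[Row], batch: List[Row]) -> List[Row]:
    """Append batch rows to the target, skipping rows already present.

    Duplicates inside the batch itself are also inserted only once.
    """
    seen = {tuple(sorted(row.items())) for row in target}
    merged = list(target)
    for row in batch:
        key = tuple(sorted(row.items()))
        if key not in seen:
            seen.add(key)
            merged.append(row)
    return merged
```

Under this reading, re-running a feed over the same batch leaves the output table unchanged, which is the property that makes the strategy useful for repeated ingests.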
Once feeds are scheduled, the actual execution is performed in NiFi. Feed status can be edited and monitored. The feed details can be changed at any time, and the feeds can be re-scheduled.
An overview of the status of created feed jobs can be seen under Jobs in the Operations section. By drilling down into the jobs, you can see the details of each job and debug a feed job whose execution failed.
The Job Activity section shows the state (completed, running, and so on) of a specific feed's recurring activity.
The Operational Job Statistics section shows statistics for a specific job, such as success rate, flow rate per second, flow duration, and step duration.
In this blog, we discussed data ingestion, cleansing, and validation without any coding in the Kylo data lake platform. The data ingested from Kafka, viewed as a Hive table in Ambari, looks as follows:
In my next blog, we'll discuss data profiling and search-based data discovery.
Published at DZone with permission of Rathnadevi Manivannan. See the original article here.