Over a million developers have joined DZone.
{{announcement.body}}
{{announcement.title}}

Kafka to HDFS/S3 Batch Ingestion Through Spark

DZone's Guide to

Kafka to HDFS/S3 Batch Ingestion Through Spark

Learn the basics of batch and data integration using Apache Spark and Spark jobs. We just know it'll spark your interest.

· Big Data Zone ·
Free Resource

The Architect’s Guide to Big Data Application Performance. Get the Guide.

There are multiple use cases where we need the consumption of data from Kafka to HDFS/S3 or any other sink in batch mode, mostly for historical data analytics purposes. At first glance, this topic seems pretty straight forward. But it is important in data platforms driven by live data (E-commerce, AdTech, Cab-aggregating platforms, etc.).

Need for Batch Consumption From Kafka

If we look at the architecture of some data platforms of some companies as published by them:

Uber(Cab-aggregating platform): https://eng.uber.com/uber-big-data-platform/

Flipkart(E-Commerce): https://tech.flipkart.com/overview-of-flipkart-data-platform-20c6d3e9a196

Lambda Architecture — Separate realtime & batch processing pipeline

We can understand such data platforms rely on both stream processing systems for real-time analytics and batch processing for historical analysis. They generate data at very high speeds, as thousands of user use their services at the same time. Data ingestion system are built around Kafka. They are followed by lambda architectures with separate pipelines for real-time stream processing and batch processing. Real-time stream processing pipelines are facilitated by Spark Streaming, Flink, Samza, Storm, etc.

Available Options for Batch Consumption

LinkedIn has contributed some products to the open source community for Kafka batch ingestion – Camus (Deprecated) and Gobblin. Confluent's Kafka HDFS connector is also another option based on the Kafka Connect framework.

Is Spark an Option?

Spark as a compute engine is very widely accepted by most industries. Most of the old data platforms based on MapReduce jobs have been migrated to Spark-based jobs, and some are in the phase of migration. In short, batch computation is being done using Spark. As a result, organizations' infrastructure and expertise have been developed around Spark.

So, the now question is: can Spark solve the problem of batch consumption of data inherited from Kafka? The answer is yes.

The advantages of doing this are: having a unified batch computation platform, reusing existing infrastructure, expertise, monitoring, and alerting.

Execution Flow of Spark Job

Assumptions:

Kafka: 0.10.1 onward

Spark. 2.x.x

  1. Get the earliest offset of Kafka topics using the Kafka consumer client (org.apache.kafka.clients.consumer.KafkaConsumer) – beginningOffests API (if available, get lastsaved/committed offsets from the location where Step 8 saves. This is the offset where the previous run left off – Step 8).
  2. Find the latest offset of the Kafka topic to be read. Read the latest offsets using the Kafka consumer client (org.apache.kafka.clients.consumer.KafkaConsumer) – the endOffests API of respective topics.
  3. The Spark job will read data from the Kafka topic starting from offset derived from Step 1 until the offsets are retrieved in Step 2.
  4. Create a Kafka source in Spark for batch consumption. We need to generate values for the startingOffsets and endingOffsets options for the Spark read API, as shown below:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":0,"1":0}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":50}""")
  .load()

5. Once that's done, we will get a Spark DataFrame, and we can extend this further as a Spark batch job.

6. Further data operations might include: data parsing, integration with external systems (like schema registry or lookup reference data), filtering of data, partitioning of data, etc.

7. Upon successful completion of all operations, use the Spark Write API to write data to HDFS/S3. Spark supports different file formats, including Parquet, Avro, JSON, and CSV, out-of-the-box through the Write APIs.

8. And, finally, save these Kafka topic endOffsets to file system – local or HDFS (or commit them to ZooKeeper). This will be used for the next run of starting the offset for a Kafka topic. Here we are making sure the job's next run will read from the offset where the previous run left off.

Challenges and Solutions:

1. A single instance of a job at a given time

Make sure only a single instance of the job runs for any given time. Multiple jobs running at the same time will result in inconsistent data.

This can be resolved by using any scheduler – Airflow, Oozie, Azkaban, etc. Alternately, you can write your logic for this if you are using your custom scheduler.

2. Time-based consumption:

Some use cases need batch consumption of data based on time. Here we can use the Kafka consumer client's offsetForTimes API to get offsets corresponding to given time.

Public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)

3. Small Files Problem:

There is a good chance we can hit small file problems due to the high number of Kafka partitions and non-optimal frequency of jobs being scheduling.

One way around this is optimally tuning the frequency in job scheduling or repartitioning the data in our Spark jobs (coalesce). But one thing to note here is repartitioning/coalescing in Spark jobs will result in the shuffle of data and it is a costly operation.

4. Advanced: Handle sudden high loads from Kafka:

We will tune job scheduling frequency and job resource allocations optimally to avoid load from Kafka, but we might face unexpected high loads of data from Kafka due to heavy traffic sometimes. It might result in Spark job failures, as the job doesn’t have enough resources as compared to the volume of data to be read.

Constraints should be applied to the Spark Read API. Limit the maximum number of messages to be read from Kafka through a single run of a job. Tweak endoffsets accordingly and read messages (read messages should equal the max number messages to be read) in the same job. Save these newly calculated endoffsets for the next run of the job. Additional data will be caught up in subsequent runs of the job.

Scheduling

Scheduler tools: Airflow, Oozie, and Azkaban are good options. One can go go for cron-based scheduling or custom schedulers. Make surea single instance of the job runs at a given time.

Monitoring and Alerting

If you need to monitor Kafka Clusters and Spark Jobs for 24x7 production environment, there are a few good tools/frameworks available, like Cruise Control for Kafka and Dr. Elephant and SparkLint for Spark jobs.

Here one important metric to be monitored is Kafka consumer lag. It is different between Kafka topics' latest offsets and the offsets until the Spark job has consumed data in the last run. Increasing the consumer lag indicates the Spark job's data consumption rate is lagging behind data production rate in a Kafka topic. Action needs to be taken here. It will give key insights into tuning job frequency and increasing resources for Spark jobs.

Improvements

The above-mentioned architecture ensures at least once delivery semantics in case of failures. It can be extended further to support exactly once delivery semantics in case of failures.

Learn how taking a DataOps approach will help you speed up processes and increase data quality by providing streamlined analytics pipelines via automation and testing. Learn More.

Topics:
kafka ,hdfs ,batch processing ,big data ,data ingestion ,apache spark

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}