Kafka to HDFS/S3 Batch Ingestion Through Spark

Learn the basics of batch and data integration using Apache Spark and Spark jobs. We just know it'll spark your interest.

By Swapnil Chougule · Updated Mar. 12, 19 · Tutorial

There are multiple use cases where we need to consume data from Kafka and write it to HDFS/S3, or any other sink, in batch mode, mostly for historical data analytics. At first glance, this topic seems pretty straightforward, but it is important in data platforms driven by live data (e-commerce, AdTech, cab-aggregation platforms, etc.).

Need for Batch Consumption From Kafka

Consider the data platform architectures that some companies have published:

Uber (cab-aggregation platform): https://eng.uber.com/uber-big-data-platform/

Flipkart (e-commerce): https://tech.flipkart.com/overview-of-flipkart-data-platform-20c6d3e9a196

Lambda architecture: separate real-time and batch processing pipelines

We can see that such data platforms rely on both stream processing systems for real-time analytics and batch processing for historical analysis. These platforms generate data at very high speed, as thousands of users use their services at the same time. Their data ingestion systems are built around Kafka, followed by a lambda architecture with separate pipelines for real-time stream processing and batch processing. Real-time stream processing pipelines are built with Spark Streaming, Flink, Samza, Storm, etc.

Available Options for Batch Consumption

LinkedIn has contributed products for Kafka batch ingestion to the open source community: Camus (now deprecated) and Gobblin. Confluent's Kafka HDFS connector, based on the Kafka Connect framework, is another option.

Is Spark an Option?

Spark as a compute engine is very widely accepted across most industries. Most of the old data platforms based on MapReduce jobs have been migrated to Spark-based jobs, and others are in the process of migrating. In short, batch computation is being done using Spark. As a result, organizations' infrastructure and expertise have been developed around Spark.

So, the question now is: can Spark solve the problem of batch consumption of data from Kafka? The answer is yes.

The advantages of doing this are a unified batch computation platform and the reuse of existing infrastructure, expertise, monitoring, and alerting.

Execution Flow of Spark Job

Assumptions:

Kafka: 0.10.1 onward

Spark: 2.x.x

  1. Get the earliest offsets of the Kafka topics using the Kafka consumer client (org.apache.kafka.clients.consumer.KafkaConsumer) via the beginningOffsets API. If available, instead use the last saved/committed offsets from the location where Step 8 stores them; these are the offsets where the previous run left off.
  2. Find the latest offsets of the Kafka topics to be read, using the same Kafka consumer client's endOffsets API for the respective topics (a sketch of both lookups follows the code block below).
  3. The Spark job will read data from the Kafka topic starting from the offsets derived in Step 1 up to the offsets retrieved in Step 2.
  4. Create a Kafka source in Spark for batch consumption. We need to supply the startingOffsets and endingOffsets options to the Spark read API, as shown below:
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":0,"1":0}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":50}""")
  .load()
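
For Steps 1 and 2, a minimal offset-fetch sketch is shown below, assuming the same topic1 and bootstrap servers as above; the group id is a placeholder, and on later runs the last-saved offsets from Step 8 should replace the beginning offsets:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// Build a plain Kafka consumer only for offset lookups (no records are polled).
val props = new Properties()
props.put("bootstrap.servers", "host1:port1,host2:port2")
props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
props.put("group.id", "batch-ingestion")  // placeholder group id

val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
val partitions = consumer.partitionsFor("topic1").asScala
  .map(p => new TopicPartition(p.topic, p.partition))

val beginning = consumer.beginningOffsets(partitions.asJava).asScala  // Step 1
val end       = consumer.endOffsets(partitions.asJava).asScala        // Step 2
// Call consumer.close() once all offset lookups are done.

// Render {"topic1":{"0":<offset>,"1":<offset>,...}} for the startingOffsets/endingOffsets options.
def toJson(offsets: scala.collection.Map[TopicPartition, java.lang.Long]): String = {
  val parts = offsets.map { case (tp, off) => s""""${tp.partition}":$off""" }.mkString(",")
  s"""{"topic1":{$parts}}"""
}
val startingOffsets = toJson(beginning)  // on later runs, use the offsets saved in Step 8 instead
val endingOffsets   = toJson(end)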

5. Once that's done, we will get a Spark DataFrame, and we can extend this further as a Spark batch job.

6. Further data operations might include: data parsing, integration with external systems (like schema registry or lookup reference data), filtering of data, partitioning of data, etc.
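
As an illustration of this step, the sketch below parses the Kafka value column as JSON against a hypothetical event schema (eventType, userId, eventTime), drops unparseable records, and derives a date column for partitioning; adjust it to the actual payload:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Hypothetical payload schema; replace with the real one (or fetch it from a schema registry).
val eventSchema = new StructType()
  .add("eventType", StringType)
  .add("userId", StringType)
  .add("eventTime", TimestampType)

val parsed = df
  .selectExpr("CAST(value AS STRING) AS json")             // Kafka value arrives as binary
  .select(from_json(col("json"), eventSchema).as("event"))
  .select("event.*")
  .filter(col("eventType").isNotNull)                      // drop records that failed to parse
  .withColumn("dt", to_date(col("eventTime")))             // partition column for the write step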

7. Upon successful completion of all operations, use the Spark Write API to write data to HDFS/S3. Spark supports different file formats, including Parquet, Avro, JSON, and CSV, out-of-the-box through the Write APIs.
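
A minimal write sketch, reusing the parsed DataFrame and dt column from the sketch above; the output path is a placeholder:

// Write the processed data as Parquet, partitioned by date; use "s3a://bucket/path/" for S3.
parsed.write
  .mode("append")
  .partitionBy("dt")
  .parquet("hdfs:///data/topic1/")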

8. Finally, save these Kafka topic endOffsets to the file system, either local or HDFS (or commit them to ZooKeeper). They will be used as the starting offsets for the next run, making sure the job's next run reads from the offsets where the previous run left off.
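
One simple way to do this, assuming the offsets are checkpointed as a small JSON file on HDFS (the path is a placeholder), is sketched below; the next run reads this file and passes its contents as startingOffsets:

import java.nio.charset.StandardCharsets
import org.apache.hadoop.fs.{FileSystem, Path}

// Persist the endingOffsets JSON produced earlier so the next run can start from it.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val offsetPath = new Path("hdfs:///checkpoints/topic1/offsets.json")  // placeholder location

val out = fs.create(offsetPath, true)  // overwrite the previous checkpoint
out.write(endingOffsets.getBytes(StandardCharsets.UTF_8))
out.close()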

Challenges and Solutions

1. A single instance of a job at a given time

Make sure only a single instance of the job runs at any given time. Multiple jobs running at the same time will result in inconsistent data.

This can be resolved by using any scheduler, such as Airflow, Oozie, or Azkaban. Alternatively, you can write your own logic for this if you are using a custom scheduler.

2. Time-based consumption:

Some use cases need batch consumption of data based on time. Here we can use the Kafka consumer client's offsetsForTimes API to get the offsets corresponding to a given time.

public java.util.Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(java.util.Map<TopicPartition,java.lang.Long> timestampsToSearch)
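
A minimal sketch of such a lookup is shown below, reusing the consumer and partitions from the offset-fetch sketch above (run it before closing the consumer); the timestamp here, midnight UTC of the current day, is only illustrative:

import scala.collection.JavaConverters._

// Resolve, for every partition, the earliest offset whose record timestamp is >= startTimestamp.
val startTimestamp: java.lang.Long = java.time.LocalDate.now()
  .atStartOfDay(java.time.ZoneOffset.UTC).toInstant.toEpochMilli

val timestamps = partitions.map(tp => tp -> startTimestamp).toMap.asJava
val offsetsForStart = consumer.offsetsForTimes(timestamps).asScala
  .collect { case (tp, oat) if oat != null => tp -> oat.offset }  // null when no record exists at/after the time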

3. Small Files Problem:

There is a good chance we will hit small-file problems due to the high number of Kafka partitions and a non-optimal job scheduling frequency.

One way around this is to tune the job scheduling frequency optimally, or to reduce the number of output partitions in the Spark job (repartition/coalesce). Note that repartitioning shuffles the data, which is a costly operation; coalesce avoids a full shuffle but gives less control over data skew.
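
For example, under the assumption that the job writes the parsed DataFrame from the earlier sketch, the output file count can be reduced before writing (16 is an illustrative target, not a recommendation):

// Coalesce narrows the number of output partitions without a full shuffle; use repartition
// instead if a shuffle is acceptable and better balance across files is needed.
parsed
  .coalesce(16)
  .write
  .mode("append")
  .partitionBy("dt")
  .parquet("hdfs:///data/topic1/")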

4. Advanced: Handle sudden high loads from Kafka:

We tune the job scheduling frequency and job resource allocation to handle the expected load from Kafka, but we might occasionally face unexpectedly high data volumes due to heavy traffic. This can result in Spark job failures, as the job does not have enough resources for the volume of data to be read.

Constraints should be applied to the Spark read: limit the maximum number of messages to be read from Kafka in a single run of the job. Cap the endOffsets accordingly so that the job reads at most that many messages, and save these newly calculated endOffsets for the next run. The remaining data will be caught up in subsequent runs of the job.
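
A minimal sketch of this capping, reusing partitions, beginning (or the last-saved offsets from Step 8), and end from the offset-fetch sketch above; maxMessagesPerPartition is an illustrative limit:

import org.apache.kafka.common.TopicPartition

val maxMessagesPerPartition = 1000000L  // tune to the resources allocated to the job

// Cap each partition's end offset so one run never reads more than the limit per partition.
val cappedEnd: Map[TopicPartition, Long] = partitions.map { tp =>
  val start  = beginning(tp).longValue   // or the last-saved offset for this partition
  val latest = end(tp).longValue
  tp -> math.min(latest, start + maxMessagesPerPartition)
}.toMap

// Render the capped offsets as the endingOffsets JSON and persist them for the next run (Step 8).
val cappedParts = cappedEnd.map { case (tp, off) => s""""${tp.partition}":$off""" }.mkString(",")
val cappedEndingOffsets = s"""{"topic1":{$cappedParts}}"""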

Scheduling

Scheduler tools such as Airflow, Oozie, and Azkaban are good options. One can also go for cron-based scheduling or a custom scheduler. Make sure a single instance of the job runs at any given time.

Monitoring and Alerting

If you need to monitor Kafka clusters and Spark jobs in a 24x7 production environment, there are a few good tools/frameworks available, such as Cruise Control for Kafka, and Dr. Elephant and SparkLint for Spark jobs.

One important metric to monitor here is Kafka consumer lag. It is the difference between the Kafka topic's latest offsets and the offsets up to which the Spark job consumed data in its last run. A growing consumer lag indicates that the Spark job's data consumption rate is lagging behind the data production rate in the Kafka topic, and action needs to be taken. It also gives key insights for tuning job frequency and increasing resources for the Spark job.
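
As a rough sketch of the lag calculation, assuming end holds the current end offsets (as in the offset-fetch sketch) and lastConsumed holds the offsets saved by the previous run in Step 8 (loading them from the checkpoint is left out here):

import org.apache.kafka.common.TopicPartition

// Lag per partition = current end offset minus the offset the last run stopped at.
val lastConsumed: Map[TopicPartition, Long] = Map.empty  // hypothetical: parse the Step 8 checkpoint
val lagPerPartition = end.map { case (tp, latest) =>
  tp -> (latest.longValue - lastConsumed.getOrElse(tp, 0L))
}
val totalLag = lagPerPartition.values.sum  // report this to your metrics system and alert on growth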

Improvements

The above-mentioned architecture ensures at-least-once delivery semantics in case of failures. It can be extended further to support exactly-once delivery semantics.

Opinions expressed by DZone contributors are their own.
