Apache Kafka in Docker Container and Implement Its Functionalities with Python

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies. Here's how to use Apache Kafka with Python.

By Fahad Ahammed · Mar. 25, 22 · Tutorial


According to its website, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

In this post, I am going to share a basic way to start using Apache Kafka with Python.

What Are the Tools Needed?

  • docker-compose
  • kafka-python (the Kafka client library for Python, used later in this post)

Docker Compose will be used to run Kafka and its dependencies:

  1. zookeeper
  2. kafka
  3. kafka-ui

The compose file:

YAML
 
version: "2"

services:

  zookeeper:
    container_name: zookeeper
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "18080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local

As you can see, I have used the zookeeper and kafka images from Bitnami. To get a general view of the Kafka instance, I have used an open-source project from GitHub called kafka-ui, which is maintained by Provectus. It is a web UI that can be accessed at localhost on port 18080, as you can see from the compose file.
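Once the stack is up (docker-compose up -d), a quick way to confirm the listeners are reachable from the host is a plain TCP check. The helper below is my own sketch, not part of the original setup; the ports come from the compose file above.

```python
# check_broker.py - sketch: verify the host-exposed ports from the compose
# file are accepting connections before running the producer/consumer.
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 9093 is Kafka's EXTERNAL listener; 18080 is the kafka-ui web UI.
    for port in (9093, 18080):
        print(f"localhost:{port} open: {port_open('localhost', port)}")
```

This only proves the port accepts TCP connections, not that the broker is fully healthy, but it catches the most common "container not running" mistake quickly.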

Let's say you want to produce some messages and ship them to Kafka.

Python
 
# kproducer.py
import datetime

from kafka import KafkaProducer  # pip install kafka-python

# Connect via the EXTERNAL listener exposed on the host
producer = KafkaProducer(bootstrap_servers='localhost:9093')

try:
    for i in range(100):
        the_dt = str(datetime.datetime.utcnow())
        val = f"Count: {i} at {the_dt}".encode(encoding='utf8')
        producer.send(topic="KafkaExplored", value=val)
    producer.close()  # close() flushes any buffered messages first
except Exception as ex:
    print(ex)
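In practice you often send structured data rather than raw strings. kafka-python lets you attach a value_serializer (and a key_serializer, since keys determine the partition a message lands on) to the producer. The sketch below is my own variation on the producer above, not from the original article; the to_json_bytes helper is a hypothetical name.

```python
# kproducer_json.py - sketch: sending JSON-encoded messages with keys,
# using kafka-python's serializer hooks instead of encoding by hand.
import datetime
import json


def to_json_bytes(value) -> bytes:
    """Serialize a Python object to UTF-8 JSON bytes for Kafka."""
    return json.dumps(value).encode("utf-8")


if __name__ == "__main__":
    from kafka import KafkaProducer  # pip install kafka-python

    producer = KafkaProducer(
        bootstrap_servers="localhost:9093",
        value_serializer=to_json_bytes,
        key_serializer=str.encode,  # keys route messages to partitions
    )
    for i in range(10):
        event = {"count": i, "at": str(datetime.datetime.utcnow())}
        producer.send(topic="KafkaExplored", key=str(i), value=event)
    producer.flush()
    producer.close()
```

With a serializer configured, send() accepts plain Python objects, which keeps encoding concerns out of the main loop.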


For consuming:

Python
 
# kconsumer.py
from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer('KafkaExplored', bootstrap_servers='localhost:9093')

# Each message is a ConsumerRecord; use its named fields (msg.topic,
# msg.value) rather than positional indexes like msg[0] and msg[6]
for msg in consumer:
    print(msg)
    print(f"{msg.topic}:{msg.value.decode()}")
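The consumer above decodes each value by hand and, because it has no consumer group, starts from the latest offset on each run. kafka-python also supports a value_deserializer plus a group_id for committed offsets. This is my own sketch under those assumptions, not code from the original article; from_json_bytes is a hypothetical helper name.

```python
# kconsumer_json.py - sketch: consuming with a value_deserializer and a
# consumer group, so offsets are committed per group instead of lost.
import json


def from_json_bytes(raw: bytes):
    """Inverse of a JSON value_serializer: bytes -> Python object."""
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    from kafka import KafkaConsumer  # pip install kafka-python

    consumer = KafkaConsumer(
        "KafkaExplored",
        bootstrap_servers="localhost:9093",
        group_id="explorers",          # enables committed offsets
        auto_offset_reset="earliest",  # new groups start from the oldest message
        value_deserializer=from_json_bytes,
    )
    for msg in consumer:
        # msg.value is already a Python object here, not raw bytes
        print(f"{msg.topic}[{msg.partition}]@{msg.offset}: {msg.value}")
```

Restarting a consumer in the same group resumes from the last committed offset, which is usually what you want outside of quick experiments.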


There are several Kafka clients for Python, but I have used kafka-python (installable with pip install kafka-python).

First, run kconsumer.py, and then from another terminal run kproducer.py to produce the messages.

In the kconsumer.py terminal, you will see the messages:

Shell
 
ConsumerRecord(topic='KafkaExplored', partition=0, offset=98, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 98 at 2021-07-01 17:30:33.441293', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 98 at 2021-07-01 17:30:33.441293
ConsumerRecord(topic='KafkaExplored', partition=0, offset=99, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 99 at 2021-07-01 17:30:33.441448', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 99 at 2021-07-01 17:30:33.441448


If you want to explore the rest of the API for accessing Kafka with kafka-python, see the kafka-python documentation.

Published at DZone with permission of Fahad Ahammed. See the original article here.

