Running Apache Kafka in a Docker Container and Using It with Python
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies. Here's how to use Apache Kafka with Python.
According to its website, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
In this post, I am going to share a basic way to start using Apache Kafka with Python.
What Are the Tools Needed?
- docker-compose
Docker Compose will be used to run Kafka and its dependencies. The compose file defines three services:
- zookeeper
- kafka
- kafka-ui
The compose file:
version: "2"

services:
  zookeeper:
    container_name: zookeeper
    image: docker.io/bitnami/zookeeper:3.7
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper_data:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    container_name: kafka
    image: docker.io/bitnami/kafka:2
    ports:
      - "9092:9092"
      - "9093:9093"
    volumes:
      - "kafka_data:/bitnami"
    environment:
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
  kafka-ui:
    image: provectuslabs/kafka-ui
    container_name: kafka-ui
    ports:
      - "18080:8080"
    restart: always
    environment:
      - KAFKA_CLUSTERS_0_NAME=local
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
      - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
    depends_on:
      - kafka
      - zookeeper

volumes:
  zookeeper_data:
    driver: local
  kafka_data:
    driver: local
As you can see, I have used the zookeeper and kafka images from Bitnami. To get a general view of the Kafka instance, I have used an open-source project from GitHub called kafka-ui, which is maintained by Provectus. It is a web UI that can be accessed on localhost at port 18080, as mapped in the compose file.
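The compose file declares two listeners, which is why containers on the compose network (such as kafka-ui) connect to kafka:9092 while scripts on the host connect to localhost:9093. As a rough illustration only, the sketch below parses the KAFKA_CFG_ADVERTISED_LISTENERS value from the compose file into a mapping. The helper name parse_listeners is hypothetical and is not part of Kafka or any client library.

```python
# Sketch: split an advertised-listeners string into {name: (host, port)}.
# parse_listeners is a hypothetical helper written only for this illustration.

def parse_listeners(value):
    listeners = {}
    for entry in value.split(","):
        # Each entry looks like NAME://host:port
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from the compose file.
ADVERTISED = "CLIENT://kafka:9092,EXTERNAL://localhost:9093"

listeners = parse_listeners(ADVERTISED)
# Containers on the compose network reach the broker through CLIENT.
print("inside Docker:", listeners["CLIENT"])
# Scripts running on the host machine use EXTERNAL.
print("from the host:", listeners["EXTERNAL"])
```

The Python scripts in this post run on the host, so they use the EXTERNAL address.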
Let's say you want to produce some messages and send them to Kafka.
# kproducer.py
import datetime

from kafka import KafkaProducer

# Port 9093 is the EXTERNAL listener, advertised as localhost:9093 in the compose file.
producer = KafkaProducer(bootstrap_servers='localhost:9093')
try:
    for i in range(100):
        the_dt = str(datetime.datetime.utcnow())
        val = f"Count: {i} at {the_dt}".encode(encoding='utf8')
        producer.send(topic="KafkaExplored", value=val)
    # Close the producer once all 100 messages have been handed to it.
    producer.close()
except Exception as ex:
    print(ex)
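The producer above sends messages without checking whether the broker accepted them. A minimal sketch of one way to confirm delivery follows, assuming the same topic and port: send() in kafka-python returns a future, and calling get() on it blocks until the broker acknowledges the record. The names build_message, send_counts, and main are hypothetical helpers introduced for this example.

```python
# Sketch: send a few messages and wait for the broker to acknowledge each one.
import datetime


def build_message(count, now):
    """Return the UTF-8 payload for one message, in the format used by kproducer.py."""
    return f"Count: {count} at {now}".encode("utf8")


def send_counts(producer, topic, n, timeout=10):
    """Send n messages and return the offsets reported back for them."""
    offsets = []
    for i in range(n):
        now = datetime.datetime.now(datetime.timezone.utc)
        future = producer.send(topic, value=build_message(i, now))
        # Block until the record is acknowledged, or raise after `timeout` seconds.
        metadata = future.get(timeout=timeout)
        offsets.append(metadata.offset)
    return offsets


def main():
    # Imported here so the helpers above can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="localhost:9093")
    try:
        print(send_counts(producer, "KafkaExplored", 3))
    finally:
        producer.close()
```

Call main() once the containers are running. Waiting on every future is slower than fire-and-forget sending, so this suits small scripts and debugging more than high-throughput producers.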
For consuming:
# kconsumer.py
from kafka import KafkaConsumer

consumer = KafkaConsumer('KafkaExplored', bootstrap_servers='localhost:9093')
for msg in consumer:
    # ConsumerRecord is a named tuple: msg.topic is msg[0] and msg.value is msg[6].
    topic = msg.topic
    value = msg.value
    print(msg)
    print(f"{topic}:{value.decode()}")
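With its default settings, a kafka-python consumer starts at the latest offset, so it only sees messages produced after it starts. As a hedged sketch, the variant below sets auto_offset_reset to "earliest" so that messages already in the topic are read too, and uses consumer_timeout_ms so the loop ends when no message arrives for five seconds. The names format_record and main are hypothetical helpers for this example.

```python
# Sketch: read the topic from the beginning and stop when it goes quiet.

def format_record(topic, value):
    """Render one record the same way kconsumer.py prints it."""
    return f"{topic}:{value.decode('utf8')}"


def main():
    # Imported here so format_record can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        "KafkaExplored",
        bootstrap_servers="localhost:9093",
        auto_offset_reset="earliest",  # start from the oldest retained message
        consumer_timeout_ms=5000,      # end iteration after 5 s without messages
    )
    try:
        for msg in consumer:
            print(format_record(msg.topic, msg.value))
    finally:
        consumer.close()
```

Call main() once the containers are running and the topic has some messages in it.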
There are several Kafka clients for Python, but I have used kafka-python.
First, run kconsumer.py, and then from another terminal run kproducer.py to produce the messages. In the kconsumer.py terminal, you will see the messages:
ConsumerRecord(topic='KafkaExplored', partition=0, offset=98, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 98 at 2021-07-01 17:30:33.441293', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 98 at 2021-07-01 17:30:33.441293
ConsumerRecord(topic='KafkaExplored', partition=0, offset=99, timestamp=1625160633441, timestamp_type=0, key=None, value=b'Count: 99 at 2021-07-01 17:30:33.441448', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=39, serialized_header_size=-1)
KafkaExplored:Count: 99 at 2021-07-01 17:30:33.441448
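Each printed record carries more than the payload. As an illustration, the sketch below rebuilds one of the records above as a plain named tuple (a stand-in called Record, not the real kafka-python class) and shows that fields can be read by name, and that the millisecond timestamp converts back to the UTC time embedded in the message. The helper timestamp_to_utc is hypothetical.

```python
# Sketch: inspect the fields of a record like the ones printed by kconsumer.py.
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Stand-in with the same field order as the printed ConsumerRecord.
Record = namedtuple("Record", [
    "topic", "partition", "offset", "timestamp", "timestamp_type", "key",
    "value", "headers", "checksum", "serialized_key_size",
    "serialized_value_size", "serialized_header_size",
])

# Values copied from the consumer output.
record = Record(
    topic="KafkaExplored", partition=0, offset=98, timestamp=1625160633441,
    timestamp_type=0, key=None,
    value=b"Count: 98 at 2021-07-01 17:30:33.441293", headers=[],
    checksum=None, serialized_key_size=-1, serialized_value_size=39,
    serialized_header_size=-1,
)


def timestamp_to_utc(ms):
    """Convert a millisecond epoch timestamp to an aware UTC datetime."""
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=ms)


# Access by name is equivalent to access by position.
print(record.topic == record[0], record.value == record[6])
# The serialized value size matches the payload length in bytes.
print(len(record.value) == record.serialized_value_size)
# The record timestamp lines up with the time written into the message.
print(timestamp_to_utc(record.timestamp).isoformat())  # 2021-07-01T17:30:33.441000+00:00
```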
If you want to explore the kafka-python API for accessing Kafka, you can look into it here.
Published at DZone with permission of Fahad Ahammed. See the original article here.