
Kafka Python Tutorial for Fast Data Architecture


In this Kafka Python tutorial, we will create a Python application that will publish data to a Kafka topic and another app that will consume the messages.


Fast Data Series Articles

  1. Installing Apache Mesos 1.6.0 on Ubuntu 18.04
  2. Kafka Tutorial for Fast Data Architecture
  3. Kafka Python Tutorial for Fast Data Architecture

This is the third article in my Fast Data Architecture series, which walks you through implementing Big Data using a SMACK stack. This article builds on the previous ones, so if you have not read through those, I highly suggest you do so first; you will need the infrastructure they set up in order to follow along in this tutorial.

This article will walk you through pulling website metrics from Clicky.com. I have another article where we pull metrics from Google Analytics and publish them to Apache Kafka: Kafka Python and Google Analytics.

In order to demonstrate how to analyze your big data, we will be configuring a big data pipeline that will pull site metrics from Clicky.com and push those metrics to a Kafka topic on our Kafka Cluster.

This is just one pipeline you might want in your big data implementation. Website statistics can be a valuable part of your data, since they tell you about site visitors, pages visited, and so on. Combine this data with other sources, such as social media shares, when you perform your analytics, and you can make some pretty neat business decisions, for example, when the best time is to post site updates to social media in order to attract the most visitors. That is the main benefit of implementing big data: not the raw data itself, but the knowledge you can extract from it in order to make more informed decisions.

[Diagram: the pipeline from the Clicky.com API to the admintome-pages topic on our Kafka cluster]

In this example, we will pull the 'pages' statistics from the Clicky.com API and push them to the admintome-pages Kafka topic. This will give us JSON data from AdminTome's top pages.

Clicky Web Analytics

In order to fully follow along in this article, you will need to have a website linked to Clicky.com. It's free, so why not? Register your site at clicky.com. I personally use it because it has better metrics reporting for blogs (like abandon rate) than Google Analytics does. You will need to add some code to your page so that Clicky can start collecting metrics.

Once your page is sending metrics to Clicky, you will need a couple of values in order to use the Clicky API and pull metrics from our Python application. Go to the preferences for your site and you will see the two numbers we need:

  • Site ID

  • Site key

Don't publish these anywhere because they could give anyone access to your website data. We will need these numbers later when we connect to the API and pull our site statistics.

Preparing Kafka

First, we need to prepare our Kafka cluster by adding the topic that we will send messages to. As you can see from the diagram above, our topic is going to be admintome-pages.

Log in to the Mesos master you ran kafka-mesos from. If you followed the previous article, the master I used was mesos1.admintome.lab. Next, we will create the topic using the kafka-mesos.sh script:

$ cd kafka/
$ ./kafka-mesos.sh topic add admintome-pages --broker=0 --api=http://mslave2.admintome.lab:7000

Notice that the API parameter points to the Kafka scheduler we created using kafka-mesos in the last article. You can verify that you now have the correct topics:

$ ./kafka-mesos.sh topic list --api=http://mslave2.admintome.lab:7000
topics:
name: __consumer_offsets
partitions: 0:[0], 1:[0], 2:[0], 3:[0], 4:[0], 5:[0], 6:[0], 7:[0], 8:[0], 9:[0], 10:[0], 11:[0], 12:[0], 13:[0], 14:[0], 15:[0], 16:[0], 17:[0], 18:[0], 19:[0], 20:[0], 21:[0], 22:[0], 23:[0], 24:[0], 25:[0], 26:[0], 27:[0], 28:[0], 29:[0], 30:[0], 31:[0], 32:[0], 33:[0], 34:[0], 35:[0], 36:[0], 37:[0], 38:[0], 39:[0], 40:[0], 41:[0], 42:[0], 43:[0], 44:[0], 45:[0], 46:[0], 47:[0], 48:[0], 49:[0]
options: segment.bytes=104857600,cleanup.policy=compact,compression.type=producer

name: admintome
partitions: 0:[0]

name: admintome-pages
partitions: 0:[0]

And there is our new topic ready to go! Now it's time to get to the fun stuff and start developing our Python application.

Now that we have Kafka ready to go we will start to develop our Kafka producer. The producer will get page metrics from the Clicky API and push those metrics in JSON form to our topic that we created earlier.

I assume that you have Python 3 installed on your system and virtualenv installed as well.

To get started, we will need to set up our environment.

$ mkdir -p ~/Development/python/venvs
$ mkdir -p ~/Development/python/site-stats-intake
$ cd ~/Development/python/site-stats-intake
$ virtualenv ../venvs/intake
$ source ../venvs/intake/bin/activate
(intake) $ pip install kafka-python requests
(intake) $ pip freeze > requirements.txt

Next, we need to create our classes.

Clicky Class

We will create a new Python class called Clicky that we will use to interact with the Clicky API. Create a new file called clicky.py and add the following content:

import requests
import json


class Clicky(object):

    def __init__(self, site_id, sitekey):
        self.site_id = site_id
        self.sitekey = sitekey
        self.output = "json"

    def get_data(self, data_type):
        click_api_url = "https://api.clicky.com/api/stats/4"
        payload = {"site_id": self.site_id,
                   "sitekey": self.sitekey,
                   "type": data_type,
                   "output": self.output}
        response = requests.get(click_api_url, params=payload)
        raw_stats = response.text
        return raw_stats

    def get_pages_data(self):
        data = self.get_data("pages")
        return json.loads(data)

Save the file and exit.

In order to get our metrics, we need to send an HTTP GET request to the Clicky API URL which is

https://api.clicky.com/api/stats/4

We also need to include several parameters:

  • site_id: This is the Site ID we got earlier.
  • sitekey: This is the Site key we also got earlier.
  • type: To get our top pages, we set the type to "pages".
  • output: We set this to "json" so that the API will return JSON data.
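Under the hood, requests.get simply appends these parameters to the URL as a query string. A quick way to preview the final URL without touching the network, using only the standard library (the site_id and sitekey below are made-up placeholders):

```python
from urllib.parse import urlencode

# Placeholder credentials for illustration only; use your own Site ID and key.
payload = {"site_id": "12345", "sitekey": "abcdef",
           "type": "pages", "output": "json"}

# This is the URL that requests.get(click_api_url, params=payload) will fetch.
url = "https://api.clicky.com/api/stats/4?" + urlencode(payload)
print(url)
# → https://api.clicky.com/api/stats/4?site_id=12345&sitekey=abcdef&type=pages&output=json
```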

Finally, we call the requests module to perform an HTTP GET against the API URL with the parameters we specified. In the get_pages_data method, we parse the response and return a dict that represents our JSON data. Next, we will code our Kafka class implementation.

MyKafka Class

This class will interact with our Kafka cluster and push website metrics to our topic for us. Create a new file called mykafka.py and add the following content:

from kafka import KafkaProducer
import json


class MyKafka(object):

    def __init__(self, kafka_brokers):
        self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )

    def send_page_data(self, json_data):
        self.producer.send('admintome-pages', json_data)

First, we import the kafka-python library, specifically the KafkaProducer class, that will let us code a Kafka producer and publish messages to our Kafka Topic.

from kafka import KafkaProducer

We now define our MyKafka class and create the constructor for it:

class MyKafka(object):
    def __init__(self, kafka_brokers):

This takes an argument that represents the Kafka brokers that will be used to connect to our Kafka cluster. It is a list of strings, each in host:port form:

[ "host:port", "host:port" ]

We will use only one broker: the one we created in the last article at mslave1.admintome.lab:31000:

[ "mslave1.admintome.lab:31000" ]

We next instantiate a new KafkaProducer object named producer. Since we will be sending data to Kafka in the form of JSON, we tell the KafkaProducer to serialize each value with json.dumps and encode it as UTF-8 bytes, using the value_serializer parameter. We also tell it to use our brokers with the bootstrap_servers parameter.

self.producer = KafkaProducer(
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            bootstrap_servers=kafka_brokers
        )
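The serializer is nothing more than json.dumps followed by UTF-8 encoding; you can verify what it produces for a sample payload without a broker (the payload here is made up):

```python
import json

# The same lambda we pass to KafkaProducer as value_serializer.
value_serializer = lambda v: json.dumps(v).encode('utf-8')

# A made-up payload; any JSON-serializable object works.
message = value_serializer({"type": "pages", "visitors": 145})
print(message)
# → b'{"type": "pages", "visitors": 145}'
```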

Finally, we create a new method that we will use to send the messages to our admintome-pages topic:

def send_page_data(self, json_data):
    self.producer.send('admintome-pages', json_data)

That's all there is to it. Note that send() is asynchronous: it queues the message and returns immediately while the producer delivers it in the background. Now we will write our Main class that will control everything.

Main Class

Create a new file called main.py and add the following content:

from clicky import Clicky
from mykafka import MyKafka
import logging
import time
import os
from logging.config import dictConfig


class Main(object):

    def __init__(self):
        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')

        logging_config = dict(
            version=1,
            formatters={
                'f': {'format':
                      '%(asctime)s %(name)-12s %(levelname)-8s %(message)s'}
            },
            handlers={
                'h': {'class': 'logging.StreamHandler',
                      'formatter': 'f',
                      'level': logging.DEBUG}
            },
            root={
                'handlers': ['h'],
                'level': logging.DEBUG,
            },
        )
        self.logger = logging.getLogger()

        dictConfig(logging_config)
        self.logger.info("Initializing Kafka Producer")
        self.logger.info("KAFKA_BROKERS={0}".format(kafka_brokers))
        self.mykafka = MyKafka(kafka_brokers)

    def init_clicky(self):
        self.clicky = Clicky(self.site_id, self.sitekey)
        self.logger.info("Clicky Stats Polling Initialized")

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))


if __name__ == "__main__":
    logging.info("Initializing Clicky Stats Polling")
    main = Main()
    main.run()

The end state of this example is to build a Docker container that we will then run on Marathon. With that in mind, we don't want to hardcode sensitive information (like our Clicky Site ID and Site key) in our code. We want to pull those from environment variables instead. If they are not set, we raise an exception and exit.

        if 'KAFKA_BROKERS' in os.environ:
            kafka_brokers = os.environ['KAFKA_BROKERS'].split(',')
        else:
            raise ValueError('KAFKA_BROKERS environment variable not set')

        if 'SITE_ID' in os.environ:
            self.site_id = os.environ['SITE_ID']
        else:
            raise ValueError('SITE_ID environment variable not set')

        if 'SITEKEY' in os.environ:
            self.sitekey = os.environ['SITEKEY']
        else:
            raise ValueError('SITEKEY environment variable not set')

We also configure logging so that we can see what is going on with our application. The run() method is an infinite loop that polls Clicky and pushes the metrics to our Kafka topic every five minutes.

    def run(self):
        self.init_clicky()
        starttime = time.time()
        while True:
            data = self.clicky.get_pages_data()
            self.logger.info("Successfully polled Clicky pages data")
            self.mykafka.send_page_data(data)
            self.logger.info("Published page data to Kafka")
            time.sleep(300.0 - ((time.time() - starttime) % 300.0))
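The sleep expression deserves a closer look. Sleeping a flat 300 seconds would drift by however long each poll and publish takes; instead, the loop sleeps until the next 300-second boundary measured from starttime. A standalone sketch with simulated timestamps (the helper name is ours, not part of the app):

```python
INTERVAL = 300.0

def seconds_until_next_tick(starttime, now, interval=INTERVAL):
    # Same arithmetic as in run(): time remaining until the next
    # interval boundary measured from starttime.
    return interval - ((now - starttime) % interval)

# If polling and publishing took 12.5 seconds this cycle, we sleep the
# remaining 287.5, so every poll still fires on a clean 300-second schedule.
print(seconds_until_next_tick(starttime=1000.0, now=1012.5))
# → 287.5
```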

Save the file and exit.

Running Our Application

To test that everything works you can try running the application after you set your environment variables:

(intake) $ export KAFKA_BROKERS="mslave1.admintome.lab:31000"
(intake) $ export SITE_ID="{your site id}"
(intake) $ export SITEKEY="{your sitekey}"
(intake) $ python main.py
2018-06-25 15:34:32,259 root INFO Initializing Kafka Producer
2018-06-25 15:34:32,259 root INFO KAFKA_BROKERS=['mslave1.admintome.lab:31000']
2018-06-25 15:34:32,374 root INFO Clicky Stats Polling Initialized
2018-06-25 15:34:32,754 root INFO Successfully polled Clicky pages data
2018-06-25 15:34:32,755 root INFO Published page data to Kafka

We are now sending messages to our Kafka Topic! We will build our Docker container next and deploy it to Marathon. Finally, we will wrap up by writing a test consumer that will get our messages from our topic.

I have created a GitHub repository for all the code used in this article: https://github.com/admintome/clicky-state-intake

Now that we have our application code written, we can create a Docker container so that we can deploy it to Marathon. Create a Dockerfile file in your application directory with the following contents:

FROM python:3

WORKDIR /usr/src/app

COPY requirements.txt ./
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD [ "python", "./main.py" ]

Build the container:

$ docker build -t {your docker hub username}/site-stats-intake .

After the Docker build completes, push the image to a Docker repository that your Mesos slaves can access. For me, this is Docker Hub:

$ docker push admintome/site-stats-intake

Then log in to each of your Mesos slaves and pull the image down:

$ docker pull admintome/site-stats-intake

We are now ready to create a Marathon application deployment for our application.

Go to your Marathon GUI.

http://mesos1.admintome.lab:8080

Click on the Create Application Button. Then click the JSON mode button:

[Screenshot: Marathon's Create Application dialog with the JSON mode toggle]

Paste in the following JSON code:

{
  "id": "site-stats-intake",
  "cmd": null,
  "cpus": 1,
  "mem": 128,
  "disk": 0,
  "instances": 1,
  "container": {
    "docker": {
      "image": "admintome/site-stats-intake"
    },
    "type": "DOCKER"
  },
  "networks": [
    {
      "mode": "host"
    }
  ],
  "env": {
    "KAFKA_BROKERS": "192.168.1.x:port",
    "SITE_ID": "{your site_id}",
    "SITEKEY": "{your sitekey}"
  }
}

Be sure to substitute the correct values for KAFKA_BROKERS, SITE_ID, and SITEKEY in the env section for your environment.

Finally, click on the Create Application button to deploy the application. After a few seconds, you should see that the application is running.

[Screenshot: the site-stats-intake application running in Marathon]

To see the logs, click on the site-stats-intake application, then click on the stderr link to download a text file containing the logs.

Now that we have our application deployed to Marathon we will write a short consumer that we will run on our development system to show us what messages have been received.

This will be a simple Kafka consumer that subscribes to the topic and displays every message on it. It's not terribly useful by itself, but it lets us know that our little polling application is working correctly.

Create a new file called consumer.py and add the following contents:

import sys
from kafka import KafkaConsumer

consumer = KafkaConsumer('admintome-pages', bootstrap_servers="mslave1.admintome.lab:31000",
                         auto_offset_reset='earliest')

try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()

Save and exit the file. The Kafka broker is hardcoded here because we are simply using this to test everything. Make sure to update the bootstrap_servers parameter with your own broker name and port.
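Each message.value arrives as raw bytes (hence the b'...' prefix in the output below). Because the producer serialized with json.dumps, you can turn a value back into Python objects with json.loads; here is a sketch using a fabricated value shaped like Clicky's response:

```python
import json

# A fabricated message value, shaped like what our producer publishes.
raw = b'[{"type": "pages", "dates": [{"date": "2018-06-25", "items": []}]}]'

data = json.loads(raw)  # json.loads accepts bytes directly on Python 3.6+
print(data[0]["type"])
# → pages
```

You could also pass value_deserializer=lambda v: json.loads(v) when constructing the KafkaConsumer so that messages arrive already parsed.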

Now run the command and you should see a ton of JSON that represents your most visited pages:

(intake) $ python consumer.py
b'[{"type": "pages", "dates": [{"date": "2018-06-25", "items": [{"value": "145", "value_percent": "43.2", "title": "Kafka Tutorial for Fast Data Architecture - AdminTome Blog", "stats_url": "http://clicky.com/stats/visitors?site_id=101045340&date=2018-06-25&href=%2Fblog%2Fkafka-tutorial-for-fast-data-architecture%2F", "url": "http://www.admintome.com/blog/kafka-tutorial-for-fast-data-architecture/"},...

We now have a data pipeline carrying data that we can use. The next step is to analyze that data. In the next article, we will install and configure the next part of our SMACK stack, Apache Spark, and configure it to analyze our data and give us something meaningful.


Topics:
big data ,python ,kafka ,data ingestion

Published at DZone with permission of
