DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workkloads.

Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • FastAPI Got Me an OpenAPI Spec Really... Fast
  • Building a Dynamic Chat Application: Setting up ChatGPT in FastAPI and Displaying Conversations in ReactJS
  • Own ChatGPT Bot in Telegram
  • Techniques You Should Know as a Kafka Streams Developer

Trending

  • How to Configure and Customize the Go SDK for Azure Cosmos DB
  • Why Documentation Matters More Than You Think
  • Subtitles: The Good, the Bad, and the Resource-Heavy
  • Scaling Mobile App Performance: How We Cut Screen Load Time From 8s to 2s
  1. DZone
  2. Software Design and Architecture
  3. Integration
  4. Kafka Event Streaming AI and Automation

Kafka Event Streaming AI and Automation

Explore how to use ChatGPT to create an IoT Kafka event consumer and API Logic Server to logic to produce temperature reading events outside a defined range.

By 
Tyler Band user avatar
Tyler Band
·
Oct. 17, 23 · Tutorial
Likes (8)
Comment
Save
Tweet
Share
21.1K Views

Join the DZone community and get the full member experience.

Join For Free

Apache Kafka has emerged as a clear leader in corporate architecture for moving from data at rest (DB transactions) to event streaming. There are many presentations that explain how Kafka works and how to scale this technology stack (either on-premise or cloud). Building a microservice using ChatGPT to consume messages and enrich, transform, and persist is the next phase of this project. In this example, we will be consuming input from an IoT device (RaspberryPi) which sends a JSON temperature reading every few seconds.

Flow from Raspberry Pi Temp sensor to Temperature Readings

Consume a Message

As each Kafka event message is produced (and logged), a Kafka microservice consumer is ready to handle each message. I asked ChatGPT to generate some Python code, and it gave me the basics to poll and read from the named "topic." What I got was a pretty good start to consume a topic, key, and JSON payload. The ChatGPT created code to persist this to a database using SQLAlchemy. I then wanted to transform the JSON payload and use API Logic Server (ALS - an open source project on GitHub) rules to unwarp the JSON, validate, calculate, and produce a new set of message payloads based on the source temperature outside a given range.

Shell
 
ChatGPT: “design a Python Event Streaming Kafka Consumer interface”


Note: ChatGPT selected Confluent Kafka libraries (and using their Docker Kafka container)- you can modify your code to use other Python Kafka libraries. 

ChatGPT prompt: Designing a Python Event Streaming Kafka interface


Install SQLAlchemy


Create an SQLAlchemy Model


Initializing the database


Persist Kafka messages

SQLAlchemy Model

Using API Logic Server (ALS: a Python open-source platform), we connect to a MySQL database. ALS will read the tables and create an SQLAlchemy ORM model, a react-admin user interface, safrs-JSON Open API (Swagger), and a running REST web service for each ORM endpoint. The new Temperature table will hold the timestamp, the IoT device ID, and the temperature reading. Here we use the ALS command line utility to create the ORM model:

Shell
 
ApiLogicServer create --project_name=iot --db_url=mysql+pymysql://root:password@127.0.0.1:3308/iot


The API Logic Server generated class used to hold our Temperature values.

Python
 
class Temperature(SAFRSBase, Base):
    __tablename__ = 'Temperature'
    _s_collection_name = 'Temperature' # type: ignore
    __bind_key__ = 'None'

    Id = Column(Integer, primary_key=True)
    DeviceId = Column(Integer, nullable=False)
    TempReading = Column(Integer, nullable=False)
    CreateDT = Column(TIMESTAMP, server_default=text("CURRENT_TIMESTAMP"), nullable=False)
    KafkaMessageSent = Column(Booelan, default=text("False"))


Changes

So instead of saving the Kafka JSON consumer message again in a SQL database (and firing rules to do the work), we unwrap the JSON payload (util.row_to_entity) and insert it into the Temperature table instead of saving the JSON payload.  We let the declarative rules handle each temperature reading.

Python
 
    entity = models.Temperature()
    util.row_to_entity(message_data, entity) 
    session.add(entity)


When the consumer receives the message, it will add it to the session which will trigger the commit_event rule (below).

Declarative Logic: Produce a Message

Using API Logic Server (an automation framework built using SQLAlchemy, Flask, and LogicBank spreadsheet-like rules engine: formula, sum, count, copy, constraint, event, etc), we add a declarative commit_event rule on the ORM entity Temperature. As each message is persisted to the Temperature table, the commit_event rule is called.  If the temperature reading exceeds the MAX_TEMP or less than MIN_TEMP, we will send a Kafka message on the topic “TempRangeAlert”. We also add a constraint to make sure we receive data within a normal range (32 -132). We will let another event consumer handle the alert message.

Python
 
from confluent_kafka import Producer
conf = {'bootstrap.servers': 'localhostd:9092'}
producer = Producer(conf)
MAX_TEMP = arg.MAX_TEMP or 102
MIN_TEMP = arg.MIN_TTEMP or 78
    
def produce_message(
    row: models.KafkaMessage, 
    old_row: models.KafkaMessage, 
    logic_row: LogicRow):
  
    if logic_row.isInserted() and row.TempReading > MAX_TEMP:
        produce(topic="TempRangeAlert", 
               key=row.Id,
               value=f"The temperature {row.TempReading}F exceeds {MAX_TEMP}F on Device {row.DeviceId}")
    row.KafkaMessageSent = True

  if logic_row.isInserted() and row.TempReading < MIN_TEMP:
        produce(topic="TempRangeAlert", 
               key=row.Id,
               value=f"The temperature {row.TempReading}F less than {MIN_TEMP}F on Device {row.DeviceId}")

    row.KafkaMessageSent = True
    
   Rules.constraint(models.Temperature, 
                    as_expression= lambda row: row.TempReading < 32 or row.TempReading > 132, 
                    error_message= "Temperature {row.TempReading} is out of range"
   Rules.commit_event(models.Temperature, calling=produce_message)


Only produce an alert message if the temperature reading is greater than MAX_TEMP  or less than MIN_TEMP.  Constraint will check the temperature range before calling the commit event (note that rules are always unordered and can be introduced as specifications change).

TDD Behave Testing

Using TDD (Test Driven Development), we can write a Behave test to insert records directly into the Temperature table and then check the return value KafkaMessageSent. Behave begins with a Feature/Scenario (.feature file). For each scenario, we write a corresponding Python class using Behave decorators.

Feature Definition

Plain Text
 
Feature: TDD Temperature Example

Scenario: Temperature Processing
  Given A Kafka Message Normal (Temperature)
  When Transactions normal temperature is submitted
  Then Check KafkaMessageSent Flag is False

Scenario: Temperature Processing
  Given A Kafka Message Abnormal (Temperature)
  When Transactions abnormal temperature is submitted
  Then Check KafkaMessageSent Flag is True


TDD Python Class

Python
 
from behave import *
import safrs

db = safrs.DB 
session = db.session

def insertTemperature(temp:int) -> bool:
    entity = model.Temperature()
    entity.TempReading = temp
    entity.DeviceId = 'local_behave_test'
    session.add(entity) 
    return entity.KafkaMessageSent 

@given('A Kafka Message Normal (Temperature)')
def step_impl(context):
    context.temp = 76
    assert True

@when('Transactions normal temperature is submitted')
def step_impl(context):
    context.response_text = insertTemperature(context.temp)

@then('Check KafkaMessageSent Flag is False')
def step_impl(context):
    assert context.response_text  == False


Summary

Using ChatGPT to generate the Kafka message code for both the Consumer and Producer seems like a good starting point. Install Confluent Docker for Kafka. Using API Logic Server for the declarative logic rules allows us to add formulas, constraints, and events to the normal flow of transactions into our SQL database and produce (and transform) new Kafka messages is a great combination. ChatGPT and declarative logic is the next level of "paired programming."

API kafka Python (language) ChatGPT raspberry pi microservice

Opinions expressed by DZone contributors are their own.

Related

  • FastAPI Got Me an OpenAPI Spec Really... Fast
  • Building a Dynamic Chat Application: Setting up ChatGPT in FastAPI and Displaying Conversations in ReactJS
  • Own ChatGPT Bot in Telegram
  • Techniques You Should Know as a Kafka Streams Developer

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!