Building a Generative AI Processor in Python

Why not create a Python Processor for Apache NiFi 2.0.0? In this tutorial, discover whether the challenge to do so is easy or difficult.

By Tim Spann · Jan. 23, 2024 · Tutorial

It was a really snowy day when I started this. I saw the IBM WatsonX Python SDK and realized I needed to wire up my generative AI model (LLM) to send my context-augmented prompt from Slack. Why not create a Python Processor for Apache NiFi 2.0.0? I figured it wouldn't be hard, and it wasn't: it was easy!


IBM WatsonX AI offers a long list of powerful foundation models to choose from. Just don't pick the v1 models, as they are scheduled to be removed in a few months.

  • GitHub, IBM/watsonxdata-python-sdk: the Python SDK for watsonx.data.

Select a foundation model

After picking a model, I tested it in WatsonX's Prompt Lab. Then I ported it to a simple Python program. Once that worked, I started adding features like properties and the transform method. That's it.

Source Code

Here is the link to the source code.

Now we can drop our new LLM-calling processor into a flow and use it like any other built-in processor. Note that the Python API requires Python 3.9+ to be available on the machine hosting NiFi.
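Because of that Python 3.9+ requirement, a quick preflight check on the NiFi host can save a confusing startup failure. A minimal sketch, assuming you run it with the same interpreter NiFi uses; the function name `meets_nifi_python_requirement` is hypothetical:

```python
import sys

# Minimum Python version the NiFi 2.0.0 Python API needs, per the text above.
MIN_PYTHON = (3, 9)


def meets_nifi_python_requirement(version_info=None):
    """Return True if the given (or current) interpreter is Python 3.9 or newer."""
    if version_info is None:
        version_info = sys.version_info
    # Compare only (major, minor) so patch levels and release tags don't matter.
    return tuple(version_info[:2]) >= MIN_PYTHON


if __name__ == "__main__":
    current = ".".join(map(str, sys.version_info[:3]))
    if meets_nifi_python_requirement():
        print(f"Python {current} satisfies the 3.9+ requirement")
    else:
        sys.exit(f"Python {current} is too old; NiFi's Python API needs 3.9+")
```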

Package-Level Dependencies

Add the processor's package-level dependencies to requirements.txt.

Basic Format for the Python Processor


First, import what you need from the nifiapi library. Then set up your class, CallWatsonXAI. Inside it, include a nested Java class definition and a ProcessorDetails class that specifies the processor version, dependencies, a description, and some tags.

class ProcessorDetails:
    version = '0.0.1-SNAPSHOT'  # no trailing comma: a comma would turn this into a tuple
    dependencies = ['pandas']
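Putting the imports, the nested Java class, and ProcessorDetails together, a minimal skeleton might look like the sketch below. It assumes NiFi 2.0's `nifiapi.flowfiletransform` module. The `try/except` fallback classes are hypothetical stand-ins added only so the file can be imported and smoke-tested outside NiFi, and the description, tags, and `ibm-watson-machine-learning` dependency are illustrative assumptions, not the author's exact values.

```python
# Minimal NiFi 2.0 Python processor skeleton (sketch).
try:
    from nifiapi.flowfiletransform import FlowFileTransform, FlowFileTransformResult
except ImportError:
    # Hypothetical stand-ins so this module can be imported outside NiFi;
    # they are NOT part of the NiFi API and are unused when nifiapi exists.
    class FlowFileTransform:
        pass

    class FlowFileTransformResult:
        def __init__(self, relationship, contents=None, attributes=None):
            self.relationship = relationship
            self.contents = contents
            self.attributes = attributes


class CallWatsonXAI(FlowFileTransform):
    # Tells NiFi which Java interface this Python class implements.
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    # Metadata NiFi reads to describe the processor and install its dependencies.
    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = 'Sends a prompt to an IBM WatsonX AI foundation model.'  # illustrative
        tags = ['watsonx', 'ibm', 'llm', 'generativeai']  # illustrative
        dependencies = ['ibm-watson-machine-learning']  # assumed SDK package

    def __init__(self, **kwargs):
        pass

    def transform(self, context, flowfile):
        # Placeholder: the real processor reads its properties and calls the model here.
        return FlowFileTransformResult(relationship='success',
                                       contents='not implemented yet',
                                       attributes={'processor': 'CallWatsonXAI'})
```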


Define All the Properties for the Processor



Set up a PropertyDescriptor for each property, specifying fields such as name, description, required, validators, expression_language_scope, and more.
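As a sketch of what those descriptors might look like, assuming NiFi 2.0's `nifiapi.properties` module: the property names (`Prompt Text`, `WatsonX API Key`, `WatsonX Project ID`, `Model ID`) are hypothetical, and the fallback classes in the `except` branch are local stand-ins for testing outside NiFi, not NiFi's implementation.

```python
try:
    from nifiapi.properties import (ExpressionLanguageScope, PropertyDescriptor,
                                    StandardValidators)
except ImportError:
    from types import SimpleNamespace

    # Hypothetical stand-ins used only outside NiFi; not the real nifiapi classes.
    class PropertyDescriptor:
        def __init__(self, name, description, required=False, sensitive=False,
                     validators=None, expression_language_scope=None, **kwargs):
            self.name = name
            self.description = description
            self.required = required
            self.sensitive = sensitive
            self.validators = validators or []
            self.expression_language_scope = expression_language_scope

    StandardValidators = SimpleNamespace(NON_EMPTY_VALIDATOR='non-empty')
    ExpressionLanguageScope = SimpleNamespace(NONE='none',
                                              FLOWFILE_ATTRIBUTES='flowfile-attributes')

# The prompt may reference FlowFile attributes via Expression Language, e.g. ${inputs}.
PROMPT_TEXT = PropertyDescriptor(
    name='Prompt Text',
    description='The prompt to send to the foundation model.',
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
    expression_language_scope=ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
)

# Marked sensitive so NiFi treats the value as a secret and masks it.
API_KEY = PropertyDescriptor(
    name='WatsonX API Key',
    description='IBM Cloud API key used to authenticate to WatsonX AI.',
    required=True,
    sensitive=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)

PROJECT_ID = PropertyDescriptor(
    name='WatsonX Project ID',
    description='The WatsonX project ID to run the model under.',
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)

MODEL_ID = PropertyDescriptor(
    name='Model ID',
    description='Identifier of the foundation model to call.',
    required=True,
    validators=[StandardValidators.NON_EMPTY_VALIDATOR],
)

# In the processor class, getPropertyDescriptors(self) would return this list.
PROPERTY_DESCRIPTORS = [PROMPT_TEXT, API_KEY, PROJECT_ID, MODEL_ID]
```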

Transform Main Method


Here we include the imports needed. You can access properties via context.getProperty. You then set output attributes by passing a dictionary as attributes, and set the contents of the output FlowFile via contents. Finally comes the relationship, which in this guide is always success. You should add something to handle errors; I still need to add that.
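Since error handling is still missing, here is one hedged way to structure the body of `transform` so failures go to the `failure` relationship that FlowFileTransform processors provide, instead of crashing the task. The model call is abstracted behind a hypothetical `generate(prompt, model_id)` callable so the logic can be exercised without IBM credentials; in a real processor it would wrap the WatsonX SDK call. The fallback result class and the `Fake*` test doubles are stand-ins, not NiFi API.

```python
try:
    from nifiapi.flowfiletransform import FlowFileTransformResult
except ImportError:
    class FlowFileTransformResult:  # hypothetical stand-in for use outside NiFi
        def __init__(self, relationship, contents=None, attributes=None):
            self.relationship = relationship
            self.contents = contents
            self.attributes = attributes


def transform_with_llm(context, flowfile, prompt_property, model_property, generate):
    """Body of a FlowFileTransform.transform() method, written as a plain function.

    `generate` is a hypothetical callable (prompt, model_id) -> str that wraps
    the actual WatsonX SDK call.
    """
    try:
        # Evaluate Expression Language (e.g. ${inputs}) against this FlowFile.
        prompt = (context.getProperty(prompt_property)
                  .evaluateAttributeExpressions(flowfile).getValue())
        model_id = context.getProperty(model_property).getValue()
        generated = generate(prompt, model_id)
    except Exception as exc:
        # Route to 'failure' and record why; contents=None leaves the content unchanged.
        return FlowFileTransformResult(relationship='failure',
                                       attributes={'watsonx.error': str(exc)})
    # NiFi attribute values must be strings.
    attributes = {'mime.type': 'text/plain', 'modelid': model_id}
    return FlowFileTransformResult(relationship='success',
                                   contents=generated,
                                   attributes=attributes)


# Minimal test doubles (hypothetical): they mimic only the calls used above.
class FakePropertyValue:
    def __init__(self, value):
        self.value = value

    def evaluateAttributeExpressions(self, flowfile):
        return self

    def getValue(self):
        return self.value


class FakeContext:
    def __init__(self, values):
        self.values = values

    def getProperty(self, name):
        return FakePropertyValue(self.values[name])
```

Keeping the model call behind a callable means the success and failure paths can be tested with stubs before the processor is ever deployed.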

If you need to redeploy, debug, or fix something, keep this note from the NiFi Python Developer's Guide in mind:

While you may delete the entire work directory while NiFi is stopped, doing so may result in NiFi taking significantly longer to start up the next time, as it must source all extensions' dependencies from PyPI, as well as expand all Java extensions' NAR files.

  • See: NiFi Python Developer's Guide

To deploy it, we just need to copy the Python file to the nifi-2.0.0/python/extensions directory and, if necessary, restart your NiFi server(s). I recommend developing locally on your laptop first, with either a local build from GitHub or Docker.
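Copying the file is easy to script during local development. A minimal sketch, assuming the extensions directory is `python/extensions` under the NiFi home as described above; the function name `deploy_processor` is hypothetical, and restarting NiFi (if needed) is left to you:

```python
import shutil
from pathlib import Path


def deploy_processor(source_file, nifi_home):
    """Copy a Python processor into <nifi_home>/python/extensions and return the new path."""
    source = Path(source_file)
    if source.suffix != '.py':
        raise ValueError(f'expected a .py file, got {source.name}')
    extensions_dir = Path(nifi_home) / 'python' / 'extensions'
    if not extensions_dir.is_dir():
        raise FileNotFoundError(f'{extensions_dir} does not exist; is nifi_home correct?')
    # copy2 preserves the file's modification time as well as its contents.
    return Path(shutil.copy2(source, extensions_dir / source.name))
```

For example, `deploy_processor('CallWatsonXAI.py', '/opt/nifi/nifi-2.0.0')` (paths hypothetical) would place the file at `/opt/nifi/nifi-2.0.0/python/extensions/CallWatsonXAI.py`.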

Now that we have written a processor, let's use it in a real-time streaming data pipeline application.

Example Application


Building on our previous application that receives Slack messages, we take those Slack queries, send them to a Pinecone or Chroma vector database, and pass the retrieved context along with our call to IBM's WatsonX AI REST API for generative AI (LLM).
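The retrieval-augmented part boils down to stitching the retrieved context and the user's question into one prompt. A minimal, library-free sketch, assuming the vector database results have already been reduced to a list of document strings ranked best-first; the prompt template and the `build_rag_prompt` name are illustrative assumptions, not the author's:

```python
PROMPT_TEMPLATE = (
    "Answer the question using only the context below. "
    "If the context does not contain the answer, say you don't know.\n\n"
    "Context:\n{context}\n\n"
    "Question: {question}\nAnswer:"
)


def build_rag_prompt(question, documents, max_docs=1, max_chars=2000):
    """Combine the top retrieved documents with the Slack question into one prompt."""
    question = question.strip()
    if not question:
        raise ValueError('question must not be empty')
    # Keep only the highest-ranked documents (the flow limits this to 1 row).
    selected = [d.strip() for d in documents[:max_docs] if d and d.strip()]
    # Character-based truncation: a rough guard, not an exact token budget.
    context = "\n---\n".join(selected)[:max_chars]
    return PROMPT_TEMPLATE.format(context=context or "(no context found)",
                                  question=question)
```

Truncating by characters is a crude proxy for the model's token limit; a tokenizer-aware budget would be more precise.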

You can find those previous details here:

  • Building a Real-Time Slackbot With Generative AI
  • Codeless Generative AI Pipelines with Chroma Vector DB & Apache NiFi
  • Streaming LLM with Apache NiFi (HuggingFace)
  • Augmenting and Enriching LLM with Real-Time Context


NiFi Flow

  1. ListenHTTP: Listen on port 9518 at /slack; NiFi acts as a universal REST endpoint.
  2. QueryRecord: JSON cleanup.
  3. SplitJson: $.*
  4. EvaluateJsonPath: Output attribute for $.inputs.
  5. QueryChroma: Call the Chroma server on port 9776 using the ONNX model; return 25 rows.
  6. QueryRecord: JSON -> JSON; LIMIT 1.
  7. SplitRecord: JSON -> JSON; into 1 row.
  8. EvaluateJsonPath: Extract the context from $.document.
  9. ReplaceText: Make the context the new FlowFile content.
  10. UpdateAttribute: Update inputs.
  11. CallWatsonX: Our Python processor to call IBM.
  12. SplitRecord: 1 record, JSON -> JSON.
  13. EvaluateJsonPath: Add attributes.
  14. AttributesToJSON: Make a new FlowFile from attributes.
  15. QueryRecord: Validate JSON.
  16. UpdateRecord: Add generated text, inputs, ts, and UUID.
  17. Kafka path, PublishKafkaRecord_2_6: Send results to Kafka.
  18. Kafka path, RetryFlowFile: If the Apache Kafka send fails, try again.
  19. Slack path, SplitRecord: Split into 1 record for display.
  20. Slack path, EvaluateJsonPath: Pull out fields to display.
  21. Slack path, PutSlack: Send a formatted message to the #chat group.
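Steps 13 through 16 turn the model response back into a JSON record. Purely to illustrate what UpdateRecord adds in step 16, here is the equivalent shaping in plain Python. The field names (`generated_text`, `inputs`, `ts`, `uuid`) and the epoch-milliseconds timestamp are assumptions based on the step description, not the flow's actual schema.

```python
import json
import time
import uuid


def enrich_result(generated_text, inputs, now=None, record_id=None):
    """Build the JSON record that gets published to Kafka and Slack (sketch)."""
    timestamp = time.time() if now is None else now
    record = {
        'generated_text': generated_text.strip(),
        'inputs': inputs,
        'ts': int(timestamp * 1000),  # epoch milliseconds (assumed format)
        'uuid': record_id or str(uuid.uuid4()),
    }
    return json.dumps(record)
```

Accepting `now` and `record_id` as optional parameters keeps the function deterministic under test while defaulting to the current time and a random UUID.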

This is a full-fledged Retrieval Augmented Generation (RAG) application utilizing ChromaDB. (The NiFi flow can also use Pinecone. I am working on Milvus, SOLR, and OpenSearch next.)


Enjoy how easy it is to add Python code to your distributed NiFi applications.


Published at DZone with permission of Tim Spann, DZone MVB. See the original article here.
