DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

Last call! Secure your stack and shape the future! Help dev teams across the globe navigate their software supply chain security challenges.

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Releasing software shouldn't be stressful or risky. Learn how to leverage progressive delivery techniques to ensure safer deployments.

Avoid machine learning mistakes and boost model performance! Discover key ML patterns, anti-patterns, data strategies, and more.

Related

  • A Complete Guide to Modern AI Developer Tools
  • Getting Started With GenAI on BigQuery: A Step-by-Step Guide
  • Create Your Own AI-Powered Virtual Tutor: An Easy Tutorial
  • Create Your Own AI-Powered Virtual Tutor: An Easy Tutorial

Trending

  • Recurrent Workflows With Cloud Native Dapr Jobs
  • Hybrid Cloud vs Multi-Cloud: Choosing the Right Strategy for AI Scalability and Security
  • My LLM Journey as a Software Engineer Exploring a New Domain
  • While Performing Dependency Selection, I Avoid the Loss Of Sleep From Node.js Libraries' Dangers
  1. DZone
  2. Data Engineering
  3. AI/ML
  4. Building a Real-Time Slackbot With Generative AI

Building a Real-Time Slackbot With Generative AI

Learn how to build a cool Slackbot with Apache NiFi, LLM, Foundation Models, and streaming. We will cover model choices and integration.

By 
Tim Spann user avatar
Tim Spann
DZone Core CORE ·
Nov. 29, 23 · Tutorial
Likes (5)
Comment
Save
Tweet
Share
3.1K Views

Join the DZone community and get the full member experience.

Join For Free

In this article, I will show you how to use Cloudera DataFlow powered by Apache NiFi to interact with IBM WatsonX.AI foundation large language models in real time. We can work with any of the foundation models such as Google FLAN T5 XXL or IBM Granite models.

How do IBM and Cloudera collaborate on Cloudera DataFlow?

I’ll show you how easy it is to build a real-time data pipeline feeding your Slack-like and mobile applications questions directly to secure WatsonX.AI models running in IBM Cloud. We will handle all the security, management, lineage, and governance with Cloudera Data Flow. As part of decision-making, we can choose different WatsonX.AI models on the fly based on what type of prompt it is. For example, if we want to continue a sentence versus answering a question I can pick different models. For questions answering Google FLAN T5 XXL works well. If I want to continue sentences I would use one of the IBM Granite models.

You will notice how amazingly fast the WatsonX.AI models return the results we need. I do some quick enrichment and transformation and then send them out their way to Cloudera Apache Kafka to be used for continuous analytics and distribution to many other applications, systems, platforms, and downstream consumers. We will also output our answers to the original requester which could be someone in a Slack channel or someone in an application. All of this happens in real-time, with no code, full governance, lineage, data management, and security at any scale and on any platform.

The power of IBM and Cloudera together in private, public, and hybrid cloud environments for real-time data and AI is just getting started. Try it today.

Flak Federation

Step By Step Real-Time Flow

Tim Spann and timchat screenshot

First, in Slack, I type a question:

“Q: What is a good way to integrate Generative AI and Apache NiFi?”

NiFi Flow Top

NiFi Flow Top

Once that question is typed, the Slack server sends these events to our registered service. This can be hosted anywhere publicly facing. 

  • (Click here for Slack API link)

Slack API

Slack API

Once enabled, your server will start receiving JSON events for each Slack post. This is easy to receive and parse in NiFi. Cloudera DataFlow enables receiving secure HTTPS REST calls in the public cloud-hosted edition with ease, even in Designer mode.

NiFi Top Flow 2

NiFi Top Flow 2

In the first part of the flow, we received the REST JSON Post, which is as follows.

Slackbot 1.0 (+https://api.slack.com/robots)
application/json
POST
HTTP/1.1

{
  "token" : "qHvJe59yetAp1bao6wmQzH0C",
  "team_id" : "T1SD6MZMF",
  "context_team_id" : "T1SD6MZMF",
  "context_enterprise_id" : null,
  "api_app_id" : "A04U64MN9HS",
  "event" : {
    "type" : "message",
    "subtype" : "bot_message",
    "text" : "==== NiFi to IBM <http://WatsonX.AI|WatsonX.AI> LLM Answers\n\nOn Date: Wed, 15 Nov 20


This is a very rich detailed JSON file that we could push immediately raw to an Apache Iceberg Open Cloud Lakehouse, a Kafka topic, or an object store as a JSON document (Enhancement Option). I am just going to parse what I need.

EvaluateJSONPath

EvaluateJSONPath

We parse out the channel ID and plain text of the post. I only want messages from general (“C1SD6N197”). Then I copy the texts to an inputs field as is required for Hugging Face.

We check our input: if it’s stocks or weather (more to come) we avoid calling the LLM.

Check our input

SELECT * FROM FLOWFILE
WHERE upper(inputs) like '%WEATHER%'
AND not upper(inputs) like '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE upper(inputs) like '%STOCK%'
AND not upper(inputs) like '%LLM SKIPPED%'

SELECT * FROM FLOWFILE
WHERE (upper(inputs) like 'QUESTION:%'
OR upper(inputs) like 'Q:%') and not upper(inputs) like '%WEATHER%'
and not upper(inputs) like '%STOCK%'


For Stocks processing:

Stocks processing - image 1

Stocks processing - image 2

Stocks processing - image 3

Stocks processing - image 4

Stocks processing - image 5

To parse what stock we need I am using my Open NLP processor to get it.

Parse what stock we need using Open NLP processor to get it

So you will need to download the processor and the Entity extraction models.

  • GitHub - tspannhw/nifi-nlp-processor: Apache NiFi NLP Processor
  • Open NLP Example Apache NiFi Processor

Then we pass that company name to an HTTP REST endpoint from AlphaVantage that converts the Company Name to Stock symbols. In free accounts, you only get a few calls a day, so if we fail we then bypass this step and try to just use whatever you passed in.

Using RouteOnContent we filter an Error message out.

Then we use a QueryRecord processor to convert from CSV to JSON and filter.

Use a QueryRecord processor to convert from CSV to JSON and filter

SELECT name as companyName, symbol  FROM FLOWFILE
ORDER BY matchScore DESC
LIMIT 1


We do a SplitRecord to ensure we are only one record. We then run EvaluateJsonPath to get our fields as attributes.

Run EvaluateJsonPath to get our fields as attributes

In an UpdateAttribute we trim the symbol just in case.

${stockSymbol:trim()}


We then pass that stock symbol to Twelve Data via InvokeHTTP to get our stock data.

We then get a lot of stock data back.

{
  "meta" : {
    "symbol" : "IBM",
    "interval" : "1min",
    "currency" : "USD",
    "exchange_timezone" : "America/New_York",
    "exchange" : "NYSE",
    "mic_code" : "XNYS",
    "type" : "Common Stock"
  },
  "values" : [ {
    "datetime" : "2023-11-15 10:37:00",
    "open" : "152.07001",
    "high" : "152.08000",
    "low" : "151.99500",
    "close" : "152.00999",
    "volume" : "8525"
  }, {
    "datetime" : "2023-11-15 10:36:00",
    "open" : "152.08501",
    "high" : "152.12250",
    "low" : "152.08000",
    "close" : "152.08501",
    "volume" : "15204"
  } ...


We then run EvaluateJSONPath to grab the exchange information.

Run EvaluateJSONPath to grab the exchange information

We fork the record to just get one record as this is just to return to Slack. We use UpdateRecord calls to enrich the stock data with other values. We then run a QueryRecord to limit us to 1 record to send to Slack.

SELECT * FROM FLOWFILE
ORDER BY 'datetime' DESC
LIMIT 1


We run an EvaluateJsonPath to get the most value fields to display.

Run an EvaluateJsonPath to get the most value fields to display

We then run a PutSlack with our message.

LLM Skipped. Stock Value for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date} is ${closeStockValue}.  stock date ${stockdateTime}.  stock exchange ${exchange}

What is the current stock value for IBM?

We also have a separate flow that is split from Company Name.

In the first step, we call Yahoo Finance to get RSS headlines for that stock.

https://feeds.finance.yahoo.com/rss/2.0/headline?s=${stockSymbol:trim()}&region=US&lang=en-US

Call Yahoo Finance to get RSS headlines for that stock

We use QueryRecord to convert RSS/XML Records to JSON.

We then run a SplitJSON to break out the news items.

We run a SplitRecord to limit to 1 record. We use EvaluateJSONPath to get the fields we need for our Slack message.

Use EvaluateJSONPath to get the fields we need for our Slack message

We then run UpdateRecord to finalize our JSON.

Run UpdateRecord to finalize our JSON.

We then send this message to Slack.

Property: Webhook text

LLM Skipped. Stock News Information for ${companyName} [${nlp_org_1}/${stockSymbol}] on ${date}
${title} : ${description}.
${guid} article date ${pubdate}

Message to Slack

For those who selected weather, we follow a similar route (we should add caching with Redis @ Aiven) to stocks. We use my OpenNLP processor to extract locations you might want to have weather on.

Use OpenNLP processor to extract locations you might want to have weather on

The next step is taking the output of the processor and building a value to send to our Geoencoder.

weatherlocation = ${nlp_location_1:notNull():ifElse(${nlp_location_1}, "New York City")}


If we can’t find a valid location, I am going to say “New York City." We could use some other lookup. I am doing some work on loading all locations and could do some advanced PostgreSQL searches on that - or perhaps OpenSearch or a vectorized datastore.

I pass that location to Open Meteo to find the geo via InvokeHTTP.

https://geocoding-api.open-meteo.com/v1/search?name=${weatherlocation:trim():urlEncode()}&count=1&language=en&format=json


We then parse the values we need from the results.

{
  "results" : [ {
    "id" : 5128581,
    "name" : "New York",
    "latitude" : 40.71427,
    "longitude" : -74.00597,
    "elevation" : 10.0,
    "feature_code" : "PPL",
    "country_code" : "US",
    "admin1_id" : 5128638,
    "timezone" : "America/New_York",
    "population" : 8175133,
    "postcodes" : [ "10001", "10002", "10003", "10004", "10005", "10006", "10007", "10008", "10009", "10010", "10011", "10012", "10013", "10014", "10016", "10017", "10018", "10019", "10020", "10021", "10022", "10023", "10024", "10025", "10026", "10027", "10028", "10029", "10030", "10031", "10032", "10033", "10034", "10035", "10036", "10037", "10038", "10039", "10040", "10041", "10043", "10044", "10045", "10055", "10060", "10065", "10069", "10080", "10081", "10087", "10090", "10101", "10102", "10103", "10104", "10105", "10106", "10107", "10108", "10109", "10110", "10111", "10112", "10113", "10114", "10115", "10116", "10117", "10118", "10119", "10120", "10121", "10122", "10123", "10124", "10125", "10126", "10128", "10129", "10130", "10131", "10132", "10133", "10138", "10150", "10151", "10152", "10153", "10154", "10155", "10156", "10157", "10158", "10159", "10160", "10161", "10162", "10163", "10164", "10165", "10166", "10167", "10168", "10169", "10170", "10171", "10172", "10173", "10174", "10175", "10176", "10177", "10178", "10179", "10185", "10199", "10203", "10211", "10212", "10213", "10242", "10249", "10256", "10258", "10259", "10260", "10261", "10265", "10268", "10269", "10270", "10271", "10272", "10273", "10274", "10275", "10276", "10277", "10278", "10279", "10280", "10281", "10282", "10285", "10286" ],
    "country_id" : 6252001,
    "country" : "United States",
    "admin1" : "New York"
  } ],
  "generationtime_ms" : 0.92196465
}

Parse the values we need from the results

We then parse the results so we can call another API to get the current weather for that latitude and longitude via InvokeHTTP.

https://api.weather.gov/points/${latitude:trim()},${longitude:trim()}


The results are geo-json.

{
    "@context": [
        "https://geojson.org/geojson-ld/geojson-context.jsonld",
        {
            "@version": "1.1",
            "wx": "https://api.weather.gov/ontology#",
            "s": "https://schema.org/",
            "geo": "http://www.opengis.net/ont/geosparql#",
            "unit": "http://codes.wmo.int/common/unit/",
            "@vocab": "https://api.weather.gov/ontology#",
            "geometry": {
                "@id": "s:GeoCoordinates",
                "@type": "geo:wktLiteral"
            },
            "city": "s:addressLocality",
            "state": "s:addressRegion",
            "distance": {
                "@id": "s:Distance",
                "@type": "s:QuantitativeValue"
            },
            "bearing": {
                "@type": "s:QuantitativeValue"
            },
            "value": {
                "@id": "s:value"
            },
            "unitCode": {
                "@id": "s:unitCode",
                "@type": "@id"
            },
            "forecastOffice": {
                "@type": "@id"
            },
            "forecastGridData": {
                "@type": "@id"
            },
            "publicZone": {
                "@type": "@id"
            },
            "county": {
                "@type": "@id"
            }
        }
    ],
    "id": "https://api.weather.gov/points/40.7143,-74.006",
    "type": "Feature",
    "geometry": {
        "type": "Point",
        "coordinates": [
            -74.006,
            40.714300000000001
        ]
    },
    "properties": {
        "@id": "https://api.weather.gov/points/40.7143,-74.006",
        "@type": "wx:Point",
        "cwa": "OKX",
        "forecastOffice": "https://api.weather.gov/offices/OKX",
        "gridId": "OKX",
        "gridX": 33,
        "gridY": 35,
        "forecast": "https://api.weather.gov/gridpoints/OKX/33,35/forecast",
        "forecastHourly": "https://api.weather.gov/gridpoints/OKX/33,35/forecast/hourly",
        "forecastGridData": "https://api.weather.gov/gridpoints/OKX/33,35",
        "observationStations": "https://api.weather.gov/gridpoints/OKX/33,35/stations",
        "relativeLocation": {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [
                    -74.0279259,
                    40.745251000000003
                ]
            },
            "properties": {
                "city": "Hoboken",
                "state": "NJ",
                "distance": {
                    "unitCode": "wmoUnit:m",
                    "value": 3906.1522008034999
                },
                "bearing": {
                    "unitCode": "wmoUnit:degree_(angle)",
                    "value": 151
                }
            }
        },
        "forecastZone": "https://api.weather.gov/zones/forecast/NYZ072",
        "county": "https://api.weather.gov/zones/county/NYC061",
        "fireWeatherZone": "https://api.weather.gov/zones/fire/NYZ212",
        "timeZone": "America/New_York",
        "radarStation": "KDIX"
    }
}


We use EvaluateJSONPath to grab a forecast URL.

Use EvaluateJSONPath to grab a forecast URL

Then we call that forecast URL via invokeHTTP.

That produces a larger JSON output that we will parse for the results we want to return to Slack.

 
{
    "@context": [
        "https://geojson.org/geojson-ld/geojson-context.jsonld",
        {
            "@version": "1.1",
            "wx": "https://api.weather.gov/ontology#",
            "geo": "http://www.opengis.net/ont/geosparql#",
            "unit": "http://codes.wmo.int/common/unit/",
            "@vocab": "https://api.weather.gov/ontology#"
        }
    ],
    "type": "Feature",
    "geometry": {
        "type": "Polygon",
        "coordinates": [
            [
                [
                    -74.025095199999996,
                    40.727052399999998
                ],
                [
                    -74.0295579,
                    40.705361699999997
                ],
                [
                    -74.000948300000005,
                    40.701977499999998
                ],
                [
                    -73.996479800000003,
                    40.723667899999995
                ],
                [
                    -74.025095199999996,
                    40.727052399999998
                ]
            ]
        ]
    },
    "properties": {
        "updated": "2023-11-15T14:34:46+00:00",
        "units": "us",
        "forecastGenerator": "BaselineForecastGenerator",
        "generatedAt": "2023-11-15T15:11:39+00:00",
        "updateTime": "2023-11-15T14:34:46+00:00",
        "validTimes": "2023-11-15T08:00:00+00:00/P7DT17H",
        "elevation": {
            "unitCode": "wmoUnit:m",
            "value": 2.1335999999999999
        },
        "periods": [
            {
                "number": 1,
                "name": "Today",
                "startTime": "2023-11-15T10:00:00-05:00",
                "endTime": "2023-11-15T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 51,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 2.2222222222222223
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 68
                },
                "windSpeed": "1 to 7 mph",
                "windDirection": "SW",
                "icon": "https://api.weather.gov/icons/land/day/bkn?size=medium",
                "shortForecast": "Partly Sunny",
                "detailedForecast": "Partly sunny, with a high near 51. Southwest wind 1 to 7 mph."
            },
            {
                "number": 2,
                "name": "Tonight",
                "startTime": "2023-11-15T18:00:00-05:00",
                "endTime": "2023-11-16T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 44,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 3.8888888888888888
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 82
                },
                "windSpeed": "8 mph",
                "windDirection": "SW",
                "icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
                "shortForecast": "Partly Cloudy",
                "detailedForecast": "Partly cloudy, with a low around 44. Southwest wind around 8 mph."
            },
            {
                "number": 3,
                "name": "Thursday",
                "startTime": "2023-11-16T06:00:00-05:00",
                "endTime": "2023-11-16T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 60,
                "temperatureUnit": "F",
                "temperatureTrend": "falling",
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 5.5555555555555554
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 82
                },
                "windSpeed": "6 mph",
                "windDirection": "SW",
                "icon": "https://api.weather.gov/icons/land/day/few?size=medium",
                "shortForecast": "Sunny",
                "detailedForecast": "Sunny. High near 60, with temperatures falling to around 58 in the afternoon. Southwest wind around 6 mph."
            },
            {
                "number": 4,
                "name": "Thursday Night",
                "startTime": "2023-11-16T18:00:00-05:00",
                "endTime": "2023-11-17T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 47,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 6.1111111111111107
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 80
                },
                "windSpeed": "3 mph",
                "windDirection": "SW",
                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",
                "shortForecast": "Mostly Clear",
                "detailedForecast": "Mostly clear, with a low around 47. Southwest wind around 3 mph."
            },
            {
                "number": 5,
                "name": "Friday",
                "startTime": "2023-11-17T06:00:00-05:00",
                "endTime": "2023-11-17T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 63,
                "temperatureUnit": "F",
                "temperatureTrend": "falling",
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": 20
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 12.222222222222221
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 86
                },
                "windSpeed": "2 to 10 mph",
                "windDirection": "S",
                "icon": "https://api.weather.gov/icons/land/day/bkn/rain,20?size=medium",
                "shortForecast": "Partly Sunny then Slight Chance Light Rain",
                "detailedForecast": "A slight chance of rain after 1pm. Partly sunny. High near 63, with temperatures falling to around 61 in the afternoon. South wind 2 to 10 mph. Chance of precipitation is 20%."
            },
            {
                "number": 6,
                "name": "Friday Night",
                "startTime": "2023-11-17T18:00:00-05:00",
                "endTime": "2023-11-18T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 51,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": 70
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 12.777777777777779
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 100
                },
                "windSpeed": "6 to 10 mph",
                "windDirection": "SW",
                "icon": "https://api.weather.gov/icons/land/night/rain,60/rain,70?size=medium",
                "shortForecast": "Light Rain Likely",
                "detailedForecast": "Rain likely. Cloudy, with a low around 51. Chance of precipitation is 70%. New rainfall amounts between a quarter and half of an inch possible."
            },
            {
                "number": 7,
                "name": "Saturday",
                "startTime": "2023-11-18T06:00:00-05:00",
                "endTime": "2023-11-18T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 55,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": 70
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 11.111111111111111
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 100
                },
                "windSpeed": "8 to 18 mph",
                "windDirection": "NW",
                "icon": "https://api.weather.gov/icons/land/day/rain,70/rain,30?size=medium",
                "shortForecast": "Light Rain Likely",
                "detailedForecast": "Rain likely before 1pm. Partly sunny, with a high near 55. Chance of precipitation is 70%."
            },
            {
                "number": 8,
                "name": "Saturday Night",
                "startTime": "2023-11-18T18:00:00-05:00",
                "endTime": "2023-11-19T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 40,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 1.1111111111111112
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 65
                },
                "windSpeed": "12 to 17 mph",
                "windDirection": "NW",
                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",
                "shortForecast": "Mostly Clear",
                "detailedForecast": "Mostly clear, with a low around 40."
            },
            {
                "number": 9,
                "name": "Sunday",
                "startTime": "2023-11-19T06:00:00-05:00",
                "endTime": "2023-11-19T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 50,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": -0.55555555555555558
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 65
                },
                "windSpeed": "10 to 14 mph",
                "windDirection": "W",
                "icon": "https://api.weather.gov/icons/land/day/few?size=medium",
                "shortForecast": "Sunny",
                "detailedForecast": "Sunny, with a high near 50."
            },
            {
                "number": 10,
                "name": "Sunday Night",
                "startTime": "2023-11-19T18:00:00-05:00",
                "endTime": "2023-11-20T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 38,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": -0.55555555555555558
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 67
                },
                "windSpeed": "13 mph",
                "windDirection": "NW",
                "icon": "https://api.weather.gov/icons/land/night/few?size=medium",
                "shortForecast": "Mostly Clear",
                "detailedForecast": "Mostly clear, with a low around 38."
            },
            {
                "number": 11,
                "name": "Monday",
                "startTime": "2023-11-20T06:00:00-05:00",
                "endTime": "2023-11-20T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 46,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": -1.6666666666666667
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 70
                },
                "windSpeed": "13 mph",
                "windDirection": "NW",
                "icon": "https://api.weather.gov/icons/land/day/sct?size=medium",
                "shortForecast": "Mostly Sunny",
                "detailedForecast": "Mostly sunny, with a high near 46."
            },
            {
                "number": 12,
                "name": "Monday Night",
                "startTime": "2023-11-20T18:00:00-05:00",
                "endTime": "2023-11-21T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 38,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": null
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": -1.1111111111111112
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 70
                },
                "windSpeed": "10 mph",
                "windDirection": "N",
                "icon": "https://api.weather.gov/icons/land/night/sct?size=medium",
                "shortForecast": "Partly Cloudy",
                "detailedForecast": "Partly cloudy, with a low around 38."
            },
            {
                "number": 13,
                "name": "Tuesday",
                "startTime": "2023-11-21T06:00:00-05:00",
                "endTime": "2023-11-21T18:00:00-05:00",
                "isDaytime": true,
                "temperature": 49,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": 30
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 2.7777777777777777
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 73
                },
                "windSpeed": "9 to 13 mph",
                "windDirection": "E",
                "icon": "https://api.weather.gov/icons/land/day/bkn/rain,30?size=medium",
                "shortForecast": "Partly Sunny then Chance Light Rain",
                "detailedForecast": "A chance of rain after 1pm. Partly sunny, with a high near 49. Chance of precipitation is 30%."
            },
            {
                "number": 14,
                "name": "Tuesday Night",
                "startTime": "2023-11-21T18:00:00-05:00",
                "endTime": "2023-11-22T06:00:00-05:00",
                "isDaytime": false,
                "temperature": 46,
                "temperatureUnit": "F",
                "temperatureTrend": null,
                "probabilityOfPrecipitation": {
                    "unitCode": "wmoUnit:percent",
                    "value": 50
                },
                "dewpoint": {
                    "unitCode": "wmoUnit:degC",
                    "value": 7.7777777777777777
                },
                "relativeHumidity": {
                    "unitCode": "wmoUnit:percent",
                    "value": 86
                },
                "windSpeed": "13 to 18 mph",
                "windDirection": "S",
                "icon": "https://api.weather.gov/icons/land/night/rain,50?size=medium",
                "shortForecast": "Chance Light Rain",
                "detailedForecast": "A chance of rain. Mostly cloudy, with a low around 46. Chance of precipitation is 50%."
            }
        ]
    }
}


Larger JSON output that we will parse for the results we want to return to Slack

We parse the data with EvaluateJSONPath to get primary fields for the weather.

We then format those fields to PutSlack.

LLM Skipped. Read forecast on ${date} for ${weatherlocation} @ ${latitude},${longitude}
Used ${forecasturl} ${icon} Temp: ${temperature} ${temperatureunit} - ${temperaturetrend}
There is a wind ${winddirection} at ${windspeed}. ${detailedforecast}

Slack Output to question "What is the weather in Highstown?"

Slack Output

If we do have an LLM question, let’s make sure it’s just one record.

If we do have an LLM question, let’s make sure it’s just one record

We use a few different models that are available at IBM WatsonX.AI on IBM Cloud to quickly be accessed by our REST prompts.

Log in to IBM Cloud

I tested and built the prompts initially at IBM’s Prompt Lab and then copied the initial curl statement from there.

IBM Prompt Lab

Click here for supported foundation models available with IBM watsonx.ai.

ibm/mpt-7b-instruct2
meta-llama/llama-2–70b-chat
ibm/granite-13b-chat-v1

Property: Replacement Value

We have to send our unique secure key to IBM and they will give us a token to use in our next call.

We parse out the question and then send it to WatsonX via REST API.

Parse out the question and then send it to WatsonX via REST API

We build a prompt to send to IBM as follows.

{
  "model_id": "meta-llama/llama-2-70b-chat",
  "input": "${inputs:urlEncode()}",
  "parameters": {
    "decoding_method": "greedy",
    "max_new_tokens": 200,
    "min_new_tokens": 50,
    "stop_sequences": [],
    "repetition_penalty": 1
  },
  "project_id": "0ead8ec4-d137-4f9c-8956-50b0da4a7068" }

Build a prompt to send to IBM

We parse the generated text which is our Generative AI results plus some helpful metadata on timings.

The result posted to Slack is as follows:

“You can use Apache NiFi to integrate Generative AI models in several ways:

  1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
  2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi’s PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
  3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
  4. Real-time Inference: You can use NiFi’s StreamingJobs” 

Chat results

After the Slackbot posted the results, it posted metrics and debugging information to the chat channel.

All of the metadata is posted to another Slack channel for administrator monitoring.

==== NiFi to IBM WatsonX.AI LLM Answers
On Date: Wed, 15 Nov 2023 15:43:29 GMT Created: 2023-11-15T15:43:29.248Z
  Prompt: Q:   What is a good way to integrate Generative AI and Apache NiFi?
Response: )
You can use Apache NiFi to integrate Generative AI models in several ways:
1. Data Preprocessing: Use NiFi to preprocess data before feeding it into your Generative AI model. This can include data cleaning, transformation, and feature engineering.
2. Model Training: Use NiFi to automate the training process of your Generative AI model. You can use NiFi's PutFile and PutFile_SFTP processors to write the training data to a file, and then use a processor like ExecuteScript to run the training script.
3. Model Deployment: Once your Generative AI model is trained, you can use NiFi to deploy it. You can create a NiFi flow that takes in input data, runs it through the Generative AI model, and then outputs the generated data.
4. Real-time Inference: You can use NiFi's StreamingJobs
Token: 200
Req Duration: 8153
HTTP TX ID: 89d71099-da23-4e7e-89f9-4e8f5620c0fb
IBM Msg: This model is a Non-IBM Product governed by a third-party license that may impose use restrictions and other obligations. By using this model you agree to its terms as identified in the following URL. URL: https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/fm-models.html?context=wx
IBM Msg ID: disclaimer_warning
Model ID: meta-llama/llama-2-70b-chat
Stop Reason: max_tokens
Token Count: 38
TX ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
UUID: da0806cb-6133-4bf4-808e-1fbf419c09e3
Corr ID: NGp0djg-c05f740f84f84b7c80f93f9da05aa756
Global TX ID: 20c3a9cf276c38bcdaf26e3c27d0479b
Service Time: 478
Request ID: 03c2726a-dcb6-407f-96f1-f83f20fe9c9c
File Name: 1a3c4386-86d2-4969-805b-37649c16addb
Request Duration: 8153
Request URL: https://us-south.ml.cloud.ibm.com/ml/v1-beta/generation/text?version=2023-05-29
cf-ray: 82689bfd28e48ce2-EWR
=====

 

Make Your Own Slackbot

Make your own Slackbot - 1

Make your own Slackbot - 2

Make your own Slackbot - 3


Slack Output

Slack Output

Kafka Distribute

Kafka distribute

Apache Flink SQL Table Creation DDL

CREATE TABLE `ssb`.`Meetups`.`watsonairesults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `advisoryId` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillmanswers',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'watsonxaillmconsumer'
)

CREATE TABLE `ssb`.`Meetups`.`watsonxresults` (
  `date` VARCHAR(2147483647),
  `x_global_transaction_id` VARCHAR(2147483647),
  `x_request_id` VARCHAR(2147483647),
  `cf_ray` VARCHAR(2147483647),
  `inputs` VARCHAR(2147483647),
  `created_at` VARCHAR(2147483647),
  `stop_reason` VARCHAR(2147483647),
  `x_correlation_id` VARCHAR(2147483647),
  `x_proxy_upstream_service_time` VARCHAR(2147483647),
  `message_id` VARCHAR(2147483647),
  `model_id` VARCHAR(2147483647),
  `invokehttp_request_duration` VARCHAR(2147483647),
  `message` VARCHAR(2147483647),
  `uuid` VARCHAR(2147483647),
  `generated_text` VARCHAR(2147483647),
  `transaction_id` VARCHAR(2147483647),
  `tokencount` VARCHAR(2147483647),
  `generated_token` VARCHAR(2147483647),
  `ts` VARCHAR(2147483647),
  `eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
  WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
  'deserialization.failure.policy' = 'ignore_and_log',
  'properties.request.timeout.ms' = '120000',
  'format' = 'json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'connector' = 'kafka',
  'properties.transaction.timeout.ms' = '900000',
  'topic' = 'watsonxaillm',
  'scan.startup.mode' = 'group-offsets',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'allwatsonx1'
)


Example Prompt

{"inputs":"Please answer to the following question. What is the capital of the United States?"}


IBM DB2 SQL

alter table  "DB2INST1"."TRAVELADVISORY"
add column "summary" VARCHAR(2048);

-- DB2INST1.TRAVELADVISORY definition

CREATE TABLE "DB2INST1"."TRAVELADVISORY"  (
    "TITLE" VARCHAR(250 OCTETS) , 
    "PUBDATE" VARCHAR(250 OCTETS) , 
    "LINK" VARCHAR(250 OCTETS) , 
    "GUID" VARCHAR(250 OCTETS) , 
    "ADVISORYID" VARCHAR(250 OCTETS) , 
    "DOMAIN" VARCHAR(250 OCTETS) , 
    "CATEGORY" VARCHAR(4096 OCTETS) , 
    "DESCRIPTION" VARCHAR(4096 OCTETS) , 
    "UUID" VARCHAR(250 OCTETS) NOT NULL , 
    "TS" BIGINT NOT NULL , 
    "summary" VARCHAR(2048 OCTETS) )   
   IN "IBMDB2SAMPLEREL"  
   ORGANIZE BY ROW;

ALTER TABLE "DB2INST1"."TRAVELADVISORY" 
 ADD PRIMARY KEY
  ("UUID")
 ENFORCED;

GRANT CONTROL ON TABLE "DB2INST1"."TRAVELADVISORY" TO USER "DB2INST1";

GRANT CONTROL ON INDEX "SYSIBM  "."SQL230620142604860" TO USER "DB2INST1";


SELECT "summary", TITLE , ADVISORYID , TS, PUBDATE  FROM DB2INST1.TRAVELADVISORY t 
WHERE "summary" IS NOT NULL 
ORDER BY ts DESC 


Example Output Email

  • GitHub README
  • GitHub repo

Video


Source Code

  • Source Code
AI Apache NiFi JSON NLP Slack (software)

Published at DZone with permission of Tim Spann, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.

Related

  • A Complete Guide to Modern AI Developer Tools
  • Getting Started With GenAI on BigQuery: A Step-by-Step Guide
  • Create Your Own AI-Powered Virtual Tutor: An Easy Tutorial
  • Create Your Own AI-Powered Virtual Tutor: An Easy Tutorial

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!