Redis Streams in Action (Part 4)

In the final part, we walk through how to build a serverless Go app to monitor tweets that have been abandoned due to processing failure or consumer instance failure.

By Abhishek Gupta · Jul. 29, 21 · Tutorial

Welcome to this series of blog posts that covers Redis Streams with the help of a practical example. We will use a sample application to make Twitter data available for search and query in real time. RediSearch and Redis Streams serve as the backbone of this solution, which consists of several cooperating components, each covered in a dedicated blog post.

The code is available in this GitHub repo: https://github.com/abhirockzz/redis-streams-in-action

We will continue from where we left off in the previous blog post and see how to build a monitoring app to make the overall system more robust in the face of high load or failure scenarios. Data processing applications often slow down (due to high data volumes) or may even crash or stop due to circumstances beyond our control. If this happens to our tweets processing application, the messages that were assigned to a specific instance are left unprocessed. The monitoring component covered in this blog post checks for pending tweets (using XPENDING), claims them (XCLAIM), processes them (storing each as a HASH using HSET), and finally acknowledges them (XACK).
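Here is that workflow condensed into a Go sketch (the monitorPass function and the "tweet:" key prefix are illustrative, not the actual code, which is walked through later in this post):

Go
 
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/go-redis/redis/v8"
)

// monitorPass is a condensed sketch of one monitoring run: list pending
// entries, claim the stale ones, store each tweet as a HASH, acknowledge it.
func monitorPass(ctx context.Context, client *redis.Client, stream, group, consumer string, minIdle time.Duration) error {
	// summary of pending entries for the consumer group (XPENDING)
	summary, err := client.XPending(ctx, stream, group).Result()
	if err != nil || summary.Count == 0 {
		return err
	}

	// fetch the IDs of those pending entries
	pending, err := client.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: stream, Group: group, Start: "-", End: "+", Count: summary.Count,
	}).Result()
	if err != nil {
		return err
	}
	ids := make([]string, 0, len(pending))
	for _, p := range pending {
		ids = append(ids, p.ID)
	}

	// claim entries idle longer than minIdle (XCLAIM); ownership moves to us
	claimed, err := client.XClaim(ctx, &redis.XClaimArgs{
		Stream: stream, Group: group, Consumer: consumer,
		MinIdle: minIdle, Messages: ids,
	}).Result()
	if err != nil {
		return err
	}

	for _, msg := range claimed {
		key := fmt.Sprintf("tweet:%v", msg.Values["id"]) // illustrative key prefix
		if err := client.HSet(ctx, key, msg.Values).Err(); err != nil {
			continue // don't ACK if the store failed
		}
		client.XAck(ctx, stream, group, msg.ID)
	}
	return nil
}

func main() {
	client := redis.NewClient(&redis.Options{Addr: os.Getenv("REDIS_HOST"), Password: os.Getenv("REDIS_PASSWORD")})
	if err := monitorPass(context.Background(), client, "tweets_stream", "redisearch_app_group", "monitoring_app", 10*time.Second); err != nil {
		log.Fatal(err)
	}
}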

This is a Go application that will be deployed to Azure Functions — yes, we will be using a serverless model, wherein the monitoring system will be executed based on a pre-defined timer trigger. As always, we will first configure and deploy it to Azure, see it working, and finally, walk through the code.

Before we move on, here is some background about Go support in Azure Functions.

Serverless Go Apps on Azure, Thanks to Custom Handlers

Those who have worked with Azure Functions might recall that Go is not one of the languages supported by default. That's where custom handlers come into the picture.

In a nutshell, a custom handler is a lightweight web server that receives events from the Functions host. The only thing you need to implement a custom handler in your favorite runtime/language is HTTP support!

An event trigger (via HTTP, storage, event hubs, etc.) invokes the Functions host. Custom handlers differ from traditional functions in that the Functions host acts as a middleman: it issues a request to the custom handler's web server (the function) with a payload that contains trigger and input binding data, along with other metadata for the function. The function returns a response to the Functions host, which passes data from the response to the function's output bindings for processing.

That is how custom handlers work at a high level; the Azure Functions documentation includes a diagram summarizing the flow.
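To make this concrete, here is a minimal Go custom handler: a plain HTTP server listening on the port that the Functions host passes in via the FUNCTIONS_CUSTOMHANDLER_PORT environment variable (the /monitor route matches the function folder in this project; the handler body is a placeholder):

Go
 
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
)

func main() {
	// the Functions host tells the custom handler which port to listen on
	port, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT")
	if !ok {
		port = "8080" // fallback for running outside the Functions host
	}

	// the host invokes a route named after the function ("monitor" in our case)
	http.HandleFunc("/monitor", func(w http.ResponseWriter, r *http.Request) {
		// ... run one monitoring pass here ...
		fmt.Fprint(w, "monitor run complete")
	})

	log.Fatal(http.ListenAndServe(":"+port, nil))
}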

Alright, let's move on to the practical bits now.

Pre-Requisites

Please make sure that you read parts two and three of this series and have the respective applications up and running. Our monitoring application builds on top of the tweets producer and processor services that you deployed.

You will need an Azure account (which you can get for free) and the Azure CLI. Make sure to download and install Go if you don't have it already, and install the Azure Functions Core Tools, which will let you deploy the function using a CLI and also run, test, and debug it locally.

The upcoming sections will guide you on how to deploy and configure the Azure Function.

Deploy the Monitoring Service to Azure Functions

To do this, you will:

  • Create an Azure Functions app
  • Configure it
  • Deploy the Function to the app that you created

Start by creating a resource group to host all the components of the solution.

Search for Function App in the Azure portal and click add.

Enter the required details: you should select custom handler as the runtime stack. 

In the Hosting section, choose Linux and Consumption (Serverless) for the operating system and plan type respectively.

Enable application insights (if you need to).

Review the final settings and click create to proceed. 

Once the process is complete, the following resources will also be created along with the Function App:

  • App Service plan (a Consumption/Serverless plan in this case)
  • An Azure Storage account
  • An Application Insights instance (if you enabled it)

Update the Function App Configuration

Our function needs a few environment variables to work properly; these can be added to the Function App configuration using the Azure portal. Here is the list (a sketch showing how the function might read them follows below):

  • Redis connectivity details:
    • REDIS_HOST — host and port for Redis instance, e.g. myredis:10000
    • REDIS_PASSWORD — access key (password) for Redis instance
  • Redis Stream info:
    • STREAM_NAME — the name of the Redis Stream (use tweets_stream as the value)
    • STREAM_CONSUMER_GROUP_NAME — name of the Redis Streams consumer group (use redisearch_app_group as the value)
  • Monitoring app metadata:
    • MONITORING_CONSUMER_NAME — name of the consumer instance represented by the monitoring app (it is part of the aforementioned consumer group)
    • MIN_IDLE_TIME_SEC — only pending messages that are older than the specified time interval will be claimed
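
For reference, here is a sketch of how the function might read these settings at startup (the loadConfig helper is illustrative, not part of the actual code):

Go
 
package main

import (
	"log"
	"os"
	"strconv"
)

// loadConfig reads the Function App settings listed above.
func loadConfig() (host, password, stream, group, consumer string, minIdleSec int) {
	host = os.Getenv("REDIS_HOST")
	password = os.Getenv("REDIS_PASSWORD")
	stream = os.Getenv("STREAM_NAME")
	group = os.Getenv("STREAM_CONSUMER_GROUP_NAME")
	consumer = os.Getenv("MONITORING_CONSUMER_NAME")

	var err error
	minIdleSec, err = strconv.Atoi(os.Getenv("MIN_IDLE_TIME_SEC"))
	if err != nil {
		log.Fatalf("invalid MIN_IDLE_TIME_SEC: %v", err)
	}
	return
}

func main() {
	host, _, stream, group, consumer, minIdle := loadConfig()
	log.Printf("redis=%s stream=%s group=%s consumer=%s minIdle=%ds", host, stream, group, consumer, minIdle)
}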

We're Now Ready to Deploy the Function

First, clone the GitHub repo and build the function:

Shell
 
git clone https://github.com/abhirockzz/redis-streams-in-action
cd redis-streams-in-action/monitoring-app

GOOS=linux go build -o processor_monitor cmd/main.go


GOOS=linux is used to build a Linux executable, since we chose Linux as the OS for our Function App.

To deploy, use the Azure Functions core tools CLI:

Shell
 
func azure functionapp publish <enter name of the Azure Function app>


Once completed, you should see the following logs:

Plain Text
 
Getting site publishing info...
Uploading package...
Uploading 3.71 MB [###############################################################################]
Upload completed successfully.
Deployment completed successfully.
Syncing triggers...
Functions in streams-monitor:
    monitor - [timerTrigger]


You should see the function in the Azure portal as well:

Azure portal showing function

The function is configured to trigger every 20 seconds, as per function.json (the "schedule" value "*/20 * * * * *" is a six-field NCRONTAB expression that fires on every 20th second):

JSON
 
{
    "bindings": [
        {
            "type": "timerTrigger",
            "direction": "in",
            "name": "req",
            "schedule": "*/20 * * * * *"
        }
    ]
}


Monitoring the Monitoring App

As before, we can inspect the state of our system using redis-cli. Execute the XPENDING command:

Plain Text
 
XPENDING tweets_stream redisearch_app_group


You will get an output similar to this (the numbers will differ in your case depending on how many tweets processor instances you were running and for how long):

Plain Text
 
1) (integer) 209
2) "1620973121009-0"
3) "1621054539960-0"
4) 1) 1) "consumer-1f20d41d-e63e-40d2-bc0f-749f11f15026"
      2) "3"
   2) 1) "monitoring_app"
      2) "206"


As explained before, the monitoring app will claim pending messages which haven't been processed by the other consumers (active or inactive). In the output above, notice that the number of messages currently being processed by monitoring_app (the name of our consumer) is 206; it claimed these from one or more other consumer instances. Once messages have been claimed, their ownership moves from the original consumer to the monitoring_app consumer.

You can check the same using XPENDING tweets_stream redisearch_app_group again, but it might be hard to detect since the messages actually get processed pretty quickly.

Out of the 206 messages that were claimed, only the ones that have not been processed in the last 10 seconds (this is the MIN_IDLE_TIME_SEC we specified) will be processed; the others will be ignored and picked up by the XPENDING call in the next run (if they are still unprocessed). This is because we want to give our consumer applications some time to finish their work, and 10 seconds is a pretty generous time frame for processing that involves saving to a HASH using HSET followed by XACK.

Please note that the 10-second interval used above is just an example; you should determine such figures based on the end-to-end latencies required for your data pipelines/processing.

You have complete flexibility in terms of how you want to run/operate such a "monitoring" component. I chose a serverless function but you could run it as a standalone program, as a scheduled cron job, or even as a Kubernetes job!

Don't forget to execute RediSearch queries to validate that you can search for tweets based on multiple criteria (plain terms, field filters such as @location, prefix matches, and tag filters such as @hashtags:{...}):

Plain Text
 
FT.SEARCH tweets-index hello
FT.SEARCH tweets-index hello|world
FT.SEARCH tweets-index "@location:India"

FT.SEARCH tweets-index "@user:jo* @location:India"
FT.SEARCH tweets-index "@user:jo* | @location:India"

FT.SEARCH tweets-index "@hashtags:{cov*}"
FT.SEARCH tweets-index "@hashtags:{cov*|Med*}"


Now that we have seen things in action, let's explore the code.

Code Walkthrough

Please refer to the code on GitHub.

The app uses the excellent Go-Redis client library. As usual, it all starts with connecting to Redis (note the usage of TLS):

Go
 
client := redis.NewClient(&redis.Options{
	Addr:      host,
	Password:  password,
	TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12},
})

// fail fast if the connection is not usable
err = client.Ping(context.Background()).Err()
if err != nil {
	log.Fatal(err)
}


Then comes the part where the bulk of the processing happens — think of it as a workflow with sub-parts.

We call XPENDING to detect the number of pending messages, e.g. XPENDING tweets_stream group1

Go
 
numPendingMessages := client.XPending(context.Background(), streamName, consumerGroupName).Val().Count


To get the details of the pending messages, we invoke a different variant of XPENDING, passing the count we obtained in the previous call.

Go
 
xpendingResult := client.XPendingExt(context.Background(), &redis.XPendingExtArgs{
	Stream: streamName,
	Group:  consumerGroupName,
	Start:  "-",
	End:    "+",
	Count:  numPendingMessages,
})


We can now claim the pending messages; their ownership will change from the previous consumer to the new consumer (monitoringConsumerName) whose name we specified.

Go
 
xclaim := client.XClaim(context.Background(), &redis.XClaimArgs{
	Stream:   streamName,
	Group:    consumerGroupName,
	Consumer: monitoringConsumerName,
	MinIdle:  time.Duration(minIdleTimeSec) * time.Second,
	Messages: toBeClaimed,
})


Once the ownership is transferred, we can process the messages. This involves adding the tweet info to a HASH (using HSET) and acknowledging successful processing (XACK). Goroutines are used to keep things efficient: for example, if we get 100 claimed messages in a batch, a scatter-gather approach is followed, with a goroutine spawned to process each message. A WaitGroup is used to wait for the current batch to complete before looking for the next set of pending messages (if any).

Go
 
for _, claimed := range xclaim.Val() {
	if exitSignalled {
		return
	}
	waitGroup.Add(1)

	// each claimed message is processed in its own goroutine; the message is
	// passed as a parameter to avoid capturing the loop variable
	go func(tweetFromStream redis.XMessage) {
		defer waitGroup.Done()

		hashName := fmt.Sprintf("%s%s", indexDefinitionHashPrefix, tweetFromStream.Values["id"])

		err := client.HSet(context.Background(), hashName, tweetFromStream.Values).Err()
		if err != nil {
			return // don't proceed (ACK) if HSET fails
		}

		err = client.XAck(context.Background(), streamName, consumerGroupName, tweetFromStream.ID).Err()
		if err != nil {
			return
		}
	}(claimed)
}
waitGroup.Wait()


Before we move on, it might help to explore the full code on GitHub to understand the nitty-gritty (it is relatively simple, by the way).

A Quick Note on the Application Structure

Here is how the app is set up (folder structure):

Plain Text
 
.
├── cmd
│   └── main.go
├── monitor
│   └── function.json
├── go.mod
├── go.sum
├── host.json


host.json tells the Functions host where to send requests by pointing to a web server capable of processing HTTP events. Notice customHandler.description.defaultExecutablePath, which defines processor_monitor as the name of the executable used to run the web server.

JSON
 
{
    "version": "2.0",
    "extensionBundle": {
        "id": "Microsoft.Azure.Functions.ExtensionBundle",
        "version": "[1.*, 2.0.0)"
    },
    "customHandler": {
        "description": {
            "defaultExecutablePath": "processor_monitor"
        },
        "enableForwardingHttpRequest": true
    },
    "logging": {
        "logLevel": {
            "default": "Trace"
        }
    }
}


That's a Wrap!

This brings us to the end of this blog series. Let's recap what we learned:

  • In the first part, you got an overview of the use case, the architecture and its components, along with an introduction to Redis Streams and RediSearch. It set the scene for the rest of the series.
  • Part two dealt with the specifics of the Rust-based tweets consumer app that consumed from the Twitter Streaming API and queued up the tweets in Redis Streams for further processing.
  • The third part was all about the Java app that processed these tweets by leveraging the Redis Streams Consumer Group feature and scaling out processing across multiple instances.
  • And the final part (this one) was all about the Go app to monitor tweets that have been abandoned (in the pending entry list) either due to processing failure or consumer instance failure.

I hope you found this useful and apply it to building scalable solutions with Redis Streams. Happy coding!


Published at DZone with permission of Abhishek Gupta, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
