Golang and Event-Driven Architecture

A high-level overview of what event-driven architecture (EDA) is, its relevance to real-world use cases, with hands-on code samples using Paho MQTT and Solace PubSub+ Go API.

By Tamimi Ahmad · Updated Apr. 22, 22 · Tutorial

On March 31, 2022, I gave a talk at Conf42: Golang about using Go in an event-driven architecture, entitled "Gopher in an Event-Driven Playground." You can check out the talk here or read on to learn more about event-driven systems and how different messaging protocols are used in Go!

What's All the Hype About?

For all of us Go enthusiasts out there, we truly understand the beauty of using Go in applications and microservices because of its lightweight footprint, high performance, and elegant syntax, to name just a few (let the debate start!).

So imagine using your favorite programming language (yes, that is Go) with your favorite architecture (yes, that is event-driven architecture). Where do you start? Read on to find out.

Hold On, What Is an Event-Driven Architecture (EDA)?

Glad you asked! There are loads of resources online that talk about EDA. At its core, an event-driven architecture involves asynchronous communication between applications via publishing and subscribing to events over an event broker using a messaging protocol.

Examples include open standard messaging protocols such as MQTT and AMQP, along with standard messaging APIs such as JMS.

To delve more into event-driven architecture and explore different use-cases, check out this page on What is Event-Driven Architecture.
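
To make the publish/subscribe idea concrete before bringing in a real broker, here is a minimal in-memory sketch that uses only the Go standard library. The Event and Broker types and their methods are hypothetical names invented for this illustration; they are not part of Paho, Solace, or any other library, and a real event broker does far more (wildcards, persistence, network transport).

```go
package main

import (
	"fmt"
	"sync"
)

// Event is a hypothetical message type: a topic plus a payload.
type Event struct {
	Topic   string
	Payload string
}

// Broker is a toy in-memory event broker, for illustration only.
type Broker struct {
	mu   sync.RWMutex
	subs map[string][]chan Event
}

// NewBroker returns a broker with no subscriptions.
func NewBroker() *Broker {
	return &Broker{subs: make(map[string][]chan Event)}
}

// Subscribe registers interest in an exact topic and returns a buffered
// channel on which matching events arrive.
func (b *Broker) Subscribe(topic string, buffer int) <-chan Event {
	ch := make(chan Event, buffer)
	b.mu.Lock()
	b.subs[topic] = append(b.subs[topic], ch)
	b.mu.Unlock()
	return ch
}

// Publish hands the event to every subscriber of its topic and returns the
// number of deliveries. If a subscriber's buffer is full the event is dropped
// for that subscriber, so the publisher never blocks.
func (b *Broker) Publish(e Event) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.subs[e.Topic] {
		select {
		case ch <- e:
			delivered++
		default: // slow subscriber: drop instead of blocking
		}
	}
	return delivered
}

func main() {
	broker := NewBroker()
	orders := broker.Subscribe("orders/created", 8)

	// The consumer runs independently of the publisher.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		e := <-orders
		fmt.Printf("consumer got %q on %s\n", e.Payload, e.Topic)
	}()

	broker.Publish(Event{Topic: "orders/created", Payload: "order 42"})
	wg.Wait()
}
```

The publisher only knows the topic name, never the consumer, which is the decoupling that an event broker provides at a much larger scale.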

fmt.Println("Show me the CODE!")

Alright, we did a high-level overview of what EDA is, but let's get to business with some hands-on code. Let me show you how to use:

  1. The Paho MQTT library for Golang.
  2. The PubSub+ Messaging API for Go.

Note that all the source code can be found in this open-source GitHub repo.

Hands-on

Let's start by creating a new directory and initializing a Go module. In a new terminal window, execute the following commands:

mkdir funWithEDA && cd "$_"
go mod init GoEDA


MQTT

As per its definition: "MQTT is an OASIS standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth."

The Eclipse Paho project provides open-source, mainly client-side, implementations of MQTT in a variety of programming languages. For today's fun coding session, we will be using the Eclipse Paho MQTT Go client.

Install the Paho MQTT Go library:

go get github.com/eclipse/paho.mqtt.golang


Create a new file and open it with your favorite IDE. I named my file go_mqtt.go.

Initialize the file and import the necessary libraries:

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)


In your main() function:

  1. Create a new MQTT client and configure it.
  2. Add callback functions for:
    1. Received messages.
    2. Successful broker connection.
    3. Lost broker connection.
  3. Subscribe to a topic.
  4. Publish on a topic.

func main() {
    var broker = "public.messaging.solace.cloud"
    var port = 1883
    opts := mqtt.NewClientOptions()
    opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
    opts.SetUsername("conf42")
    opts.SetPassword("public")
    opts.SetDefaultPublishHandler(messageHandler)
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    sub(client)
    publish(client)

    client.Disconnect(250)
}


We will need to define the callback functions as follows:

var messageHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    options := client.OptionsReader()
    fmt.Println("Connected to: ", options.Servers())
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    fmt.Printf("Connection lost: %v\n", err)
}


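It is important to note that these functions are triggered "on" particular events. For example, messageHandler, which has the type mqtt.MessageHandler, is called whenever the MQTT client receives a message that no subscription-specific callback handles. That is the case here because it is registered with SetDefaultPublishHandler and the sub function defined below passes nil as its callback.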

And finally, define your publish and subscribe functions as follows:

func publish(client mqtt.Client) {
    num := 10
    for i := 0; i < num; i++ {
        text := fmt.Sprintf("Message %d", i)
        token := client.Publish("conf42/go", 0, false, text)
        token.Wait()
        time.Sleep(time.Second)
    }
}

func sub(client mqtt.Client) {
    topic := "conf42/#"
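    token := client.Subscribe(topic, 1, nil)
    // Wait for the broker's acknowledgment and report a failed subscription
    if token.Wait() && token.Error() != nil {
        fmt.Printf("Failed to subscribe to topic %s: %v\n", topic, token.Error())
        return
    }
    fmt.Printf("Subscribed to topic: %s\n", topic)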
}


And that's it! Run the application and observe the results:

go run go_mqtt.go
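
The subscriber above uses the topic filter conf42/#, while the publisher sends to conf42/go. As a rough illustration of why the subscriber receives those messages, here is a simplified sketch of MQTT wildcard matching written with only the standard library. The matchTopic function is a hypothetical helper for this article, not part of the Paho API; in practice the broker performs this matching, and this version skips input validation and the special handling of topics that start with "$".

```go
package main

import (
	"fmt"
	"strings"
)

// matchTopic reports whether an MQTT topic name matches a topic filter.
// "+" matches exactly one level; "#" matches all remaining levels
// (including none) and is only honored as the last level of the filter.
func matchTopic(filter, topic string) bool {
	f := strings.Split(filter, "/")
	t := strings.Split(topic, "/")
	for i, level := range f {
		if level == "#" {
			// Multi-level wildcard: valid only at the end of the filter.
			return i == len(f)-1
		}
		if i >= len(t) {
			// The filter has more levels than the topic.
			return false
		}
		if level != "+" && level != t[i] {
			return false
		}
	}
	// Without a trailing "#", both must have the same number of levels.
	return len(f) == len(t)
}

func main() {
	for _, topic := range []string{"conf42/go", "conf42/solace/go/1", "other/go"} {
		fmt.Printf("%-20s matches conf42/#: %v\n", topic, matchTopic("conf42/#", topic))
	}
}
```

A filter such as conf42/+ would, under the same rules, match conf42/go but not conf42/solace/go/1, because "+" covers a single level only.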


Solace PubSub+ Messaging API for Go

Now that you are an expert on messaging concepts with Go, let's take it up a notch and delve into a more advanced messaging API! We'll be using the Solace PubSub+ Messaging API for Go.

Install the Solace Native Go API:

go get solace.dev/go/messaging


Create a new file and open it with your favorite IDE. I named my file solace_publisher.go.

Import the necessary packages:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "solace.dev/go/messaging"
    "solace.dev/go/messaging/pkg/solace/config"
    "solace.dev/go/messaging/pkg/solace/resource"
)

Define the configuration parameters to connect to the Solace PubSub+ Broker

// Configuration parameters
    brokerConfig := config.ServicePropertyMap{
        config.TransportLayerPropertyHost:                "tcp://public.messaging.solace.cloud",
        config.ServicePropertyVPNName:                    "public",
        config.AuthenticationPropertySchemeBasicUserName: "conf42",
        config.AuthenticationPropertySchemeBasicPassword: "public",
    }


Initialize a messaging service and connect to it:

messagingService, err := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(brokerConfig).Build()

if err != nil {
    panic(err)
}

// Connect to the messaging service
if err := messagingService.Connect(); err != nil {
    panic(err)
}


Build a Direct Message Publisher and start it:

//  Build a Direct Message Publisher
directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
if builderErr != nil {
    panic(builderErr)
}

// Start the publisher
startErr := directPublisher.Start()
if startErr != nil {
    panic(startErr)
}


Publish messages in a loop:

msgSeqNum := 0

//  Prepare outbound message payload and body
messageBody := "Hello from Conf42"
messageBuilder := messagingService.MessageBuilder().
    WithProperty("application", "samples").
    WithProperty("language", "go")

// Run forever until an interrupt signal is received
go func() {
    for directPublisher.IsReady() {
        msgSeqNum++
        message, err := messageBuilder.BuildWithStringPayload(messageBody + " --> " + strconv.Itoa(msgSeqNum))
        if err != nil {
            panic(err)
        }
        topic := resource.TopicOf("conf42/solace/go/" + strconv.Itoa(msgSeqNum))

        // Publish on dynamic topic with dynamic body
        publishErr := directPublisher.Publish(message, topic)
        if publishErr != nil {
            panic(publishErr)
        }

        fmt.Println("Published message on topic: ", topic.GetName())
        time.Sleep(1 * time.Second)
    }
}()


This is the final application:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "strconv"
    "time"

    "solace.dev/go/messaging"
    "solace.dev/go/messaging/pkg/solace/config"
    "solace.dev/go/messaging/pkg/solace/resource"
)

func main() {
    // Configuration parameters
    brokerConfig := config.ServicePropertyMap{
        config.TransportLayerPropertyHost:                "tcp://public.messaging.solace.cloud",
        config.ServicePropertyVPNName:                    "public",
        config.AuthenticationPropertySchemeBasicUserName: "conf42",
        config.AuthenticationPropertySchemeBasicPassword: "public",
    }

    messagingService, err := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(brokerConfig).Build()

    if err != nil {
        panic(err)
    }

    // Connect to the messaging service
    if err := messagingService.Connect(); err != nil {
        panic(err)
    }

    //  Build a Direct Message Publisher
    directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
    if builderErr != nil {
        panic(builderErr)
    }

    // Start the publisher
    startErr := directPublisher.Start()
    if startErr != nil {
        panic(startErr)
    }

    msgSeqNum := 0

    //  Prepare outbound message payload and body
    messageBody := "Hello from Conf42"
    messageBuilder := messagingService.MessageBuilder().
        WithProperty("application", "samples").
        WithProperty("language", "go")

    // Run forever until an interrupt signal is received
    go func() {
        for directPublisher.IsReady() {
            msgSeqNum++
            message, err := messageBuilder.BuildWithStringPayload(messageBody + " --> " + strconv.Itoa(msgSeqNum))
            if err != nil {
                panic(err)
            }
            topic := resource.TopicOf("conf42/solace/go/" + strconv.Itoa(msgSeqNum))

            // Publish on dynamic topic with dynamic body
            publishErr := directPublisher.Publish(message, topic)
            if publishErr != nil {
                panic(publishErr)
            }

            fmt.Println("Published message on topic: ", topic.GetName())
            time.Sleep(1 * time.Second)
        }
    }()

    // Handle OS interrupts
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt)

    // Block until an OS interrupt signal is received.
    <-c

    // Terminate the Direct Publisher
    directPublisher.Terminate(1 * time.Second)
    fmt.Println("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())
    // Disconnect the Message Service
    messagingService.Disconnect()
    fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected())

}


And that's it! Run the publisher as follows:

go run solace_publisher.go


Note: You can use the Solace PubSub+ TryMe tab to connect to the broker and subscribe to any topic you want. Subscribe to the topic conf42/solace/> to see the messages from this publisher.

Bonus! You can run a subscriber application in another terminal, subscribe to conf42/solace/>, and observe the results. You can find more about this on the SolaceSamples GitHub org.

Challenge! Run a publisher and a subscriber in the same application!

Note on Protocol Interoperability

Using the Solace PubSub+ Event Broker, you can leverage the protocol translation and interoperability features. Even though the Go API was used to connect to the broker, you can use other Solace APIs and/or open standard protocols to connect to the broker and still have the microservices send and receive messages to each other.

[Figure: Solace Protocol Interoperability]

This Is All Cool. How Can I Know More?

In summary, here are some quick links:

  • A Solace Community post listing a bunch of resources
  • Solace PubSub+ Messaging API for Go Samples
  • Blog post on the motivation behind a native Solace Go API

If you want to see my colleagues and me during a live streaming session talking about EDA, the Solace PubSub+ Messaging API for Go, and coding (on LIVE television!), check out this event:

What About You?

I am curious to hear from the Go community!

  • What is it about Golang that you enjoy?
  • Have you ever used Golang in an event-driven application?
  • What messaging protocols and/or open standards have you used?
  • What message brokers do you use?

Open to all sorts of discussions, comments, and questions!

P.S. You can check out the other talks from Conf42 at conf42.com/golang2022.

Published at DZone with permission of Tamimi Ahmad. See the original article here.
