DZone
Thanks for visiting DZone today,
Edit Profile
  • Manage Email Subscriptions
  • How to Post to DZone
  • Article Submission Guidelines
Sign Out View Profile
  • Post an Article
  • Manage My Drafts
Over 2 million developers have joined DZone.
Log In / Join
Please enter at least three characters to search
Refcards Trend Reports
Events Video Library
Refcards
Trend Reports

Events

View Events Video Library

Zones

Culture and Methodologies Agile Career Development Methodologies Team Management
Data Engineering AI/ML Big Data Data Databases IoT
Software Design and Architecture Cloud Architecture Containers Integration Microservices Performance Security
Coding Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks
Culture and Methodologies
Agile Career Development Methodologies Team Management
Data Engineering
AI/ML Big Data Data Databases IoT
Software Design and Architecture
Cloud Architecture Containers Integration Microservices Performance Security
Coding
Frameworks Java JavaScript Languages Tools
Testing, Deployment, and Maintenance
Deployment DevOps and CI/CD Maintenance Monitoring and Observability Testing, Tools, and Frameworks

The software you build is only as secure as the code that powers it. Learn how malicious code creeps into your software supply chain.

Apache Cassandra combines the benefits of major NoSQL databases to support data management needs not covered by traditional RDBMS vendors.

Generative AI has transformed nearly every industry. How can you leverage GenAI to improve your productivity and efficiency?

Modernize your data layer. Learn how to design cloud-native database architectures to meet the evolving demands of AI and GenAI workloads.

Related

  • Building Microservice in Golang
  • DevOps Fast Forward with Go
  • Tired of Spring Overhead? Try Dropwizard for Your Next Java Microservice
  • Graceful Shutdown: Spring Framework vs Golang Web Services

Trending

  • Designing a Java Connector for Software Integrations
  • Customer 360: Fraud Detection in Fintech With PySpark and ML
  • Mastering Advanced Aggregations in Spark SQL
  • Infrastructure as Code (IaC) Beyond the Basics
  1. DZone
  2. Data Engineering
  3. Data
  4. Implementing NATS JetStream as Queues in Golang

Implementing NATS JetStream as Queues in Golang

Dive into implementing NATS JetStream as queues in Golang with this guide. Learn essential concepts and techniques for building scalable distributed systems.

By 
Vlad Gukasov user avatar
Vlad Gukasov
·
May. 02, 23 · Tutorial
Likes (2)
Comment
Save
Tweet
Share
10.3K Views

Join the DZone community and get the full member experience.

Join For Free

In recent years, the increasing demand for efficient and scalable distributed systems has driven the development and adoption of various message queuing solutions. These solutions enable the decoupling of components within distributed architectures, ensuring fault tolerance and load balancing.

Recently, we faced the challenge of selecting a message queue system for a new project in our microservice architecture. After conducting extensive research and evaluation, we chose NATS JetStream. This decision led us to explore the integration of NATS JetStream with Golang, which ultimately served as the basis for this article.

This article aims to comprehensively understand implementing NATS JetStream as a queue in Golang, covering fundamental concepts, methodologies, and advanced techniques.

Fundamentals of Message Queue Systems

First, we need to understand message queue systems. At their core, these systems are designed to facilitate communication between distributed components within a software architecture. They enable the exchange of messages containing information or commands between different components in a decoupled manner. This decoupling allows for enhanced fault tolerance, scalability, and maintainability, as components can function independently without direct knowledge of one another.

Message queue systems typically implement the publish-subscribe pattern or point-to-point communication, ensuring that messages are delivered to the appropriate recipients. Key aspects of any message queue system include message persistence, guaranteed delivery, and the ability to handle varying workloads.

Message queue system architecture









History of NATS

Today, NATS JetStream is an excellent implementation of a message queue system, but this was not the case several years ago. It has evolved significantly since its inception, transitioning from a simple publish-subscribe system to a more feature-rich, resilient, and scalable messaging solution. This journey consists of three main stages: NATS as a Pub/Sub system, NATS Streaming, and NATS JetStream.

NATS as a Pub/Sub System

NATS was created in 2010. It was designed as a lightweight, high-performance publish-subscribe system in its early days. As a result, it was well-suited for real-time message passing, offering low-latency communication and ease of use. However, it lacked essential features like message persistence, durability, and guaranteed delivery. These limitations made NATS less suitable for scenarios that required reliable message delivery and storage.

NATS Streaming

NATS Streaming was introduced to address the original NATS system's limitations. This new iteration brought features such as message persistence, at-least-once delivery guarantees, and support for message replay. While NATS Streaming significantly improved the reliability and capability of NATS, it had some drawbacks: 

  • Inability to remove acknowledged messages from the system 
  • The lack of "Nak" (not acknowledge) support for messages
  • Clients being restricted to only receiving pushed messages without the option to pull messages
  • A lack of horizontal scalability.

The drawbacks led the community to create a new implementation of NATS that has a built-in mechanism called "JetStream."

NATS JetStream

NATS JetStream combines the strengths of both NATS and NATS Streaming while addressing their weaknesses. The main features of NATS JetStream include:

  • Streams, consumers, and messages: Enhances the messaging solution for better organization and management of message flows
  • Push and pull subscriptions: Allows clients to consume messages at their own pace
  • NAck (not acknowledge) with optional delay: Provides better message handling control
  • Event streaming capabilities: Supports event-driven architectures, similar to Apache Kafka
  • Durability, replication, and clustering: Ensures message persistence, high availability, and load balancing
  • Full integration with the NATS ecosystem: Simplifies deployment and management

Let us consider NATS JetStream basics concepts before we use it in a Golang app.

NATS JetStream: Theoretical Background

As previously mentioned, NATS JetStream is built upon the foundation of three primary concepts: streams, consumers, and messages. In essence, streams serve as channels that store a sequence of messages, facilitating the organization and management of message flows. Furthermore, each stream can have multiple subjects, allowing for the categorization of messages based on their content or purpose. Therefore, streams can be considered queues when viewing NATS JetStream as a message queue system.

Each stream (or queue) is composed of messages. These messages contain the information or commands to be exchanged between distributed components. Publishers send messages to streams, and recipients subsequently consume them.

These recipients are known as "consumers." They are responsible for reading and processing messages from subjects. Consumers can be considered subscribers in the publish-subscribe pattern. They receive and act upon messages that match their configured subjects. Consumers can also acknowledge the receipt and processing of messages, which helps manage message delivery and ensures that messages are successfully consumed.

NATS Jetstream Architecture

We are now well-equipped to delve into the integration with NATS JetStream. The following sections will explore the practical aspects of implementing NATS JetStream publishers and consumers using the Golang programming language.

Sending Messages With NATS JetStream in Golang

In this section, we will first explore the implementation of NATS JetStream publishers in Golang before moving on to consumers.

First, connect to the NATS JetStream server using the "nats.go" library. To do that, import the NATS package in a Golang file. Then, use the nats.Connect() function to establish a connection, specifying the appropriate server address and options.

Go
 
import (
	"github.com/nats-io/nats.go"
)

func connectToNATS() (*nats.Conn, error) {
	nc, err := nats.Connect("nats://nats:4222")
	if err != nil {
		return nil, fmt.Errorf("nats connect: %w", err)
	}

	return nc, nil
}


With a connection in place, the next step is to create a JetStream context, which can be done using the nats.JetStream() function. This context will interact with the JetStream system, allowing for the creation of streams and the publishing of messages.

Go
 
func natsJetStream(nc *nats.Conn) (nats.JetStreamContext, error) {
	jsCtx, err := nc.JetStream()
	if err != nil {
		return nil, fmt.Errorf("jetstream: %w", err)
	}
	
	return jsCtx, nil
}


Once the JetStream context is acquired, it is essential to ensure that the desired stream exists or create one if necessary. The jsCtx.AddStream() method can specify the stream's configuration, including its name, subjects, storage type, and retention policy. It is important to note that stream creation is typically an administrative task and may only be required in some publisher implementations.

Go
 
func createStream(ctx context.Context, jsCtx nats.JetStreamContext) (*nats.StreamInfo, error) {
	stream, err := jsCtx.AddStream(&nats.StreamConfig{
		Name:              "test_stream",
		Subjects:          []string{"subject.1", "subject.2", "subject.N"},
		Retention:         nats.InterestPolicy, // remove acked messages
		Discard:           nats.DiscardOld,     // when the stream is full, discard old messages
		MaxAge:            7 * 24 * time.Hour,  // max age of stored messages is 7 days
		Storage:           nats.FileStorage,    // type of message storage
		MaxMsgsPerSubject: 100_000_000,         // max stored messages per subject
		MaxMsgSize:        4 << 20,             // max single message size is 4 MB
		NoAck:             false,               // we need the "ack" system for the message queue system
	}, nats.Context(ctx))
	if err != nil {
		return nil, fmt.Errorf("add stream: %w", err)
	}

	return stream, nil
}


With the stream in place, publishing messages to NATS JetStream is straightforward. The nats.Publish() method can be called on the JetStream context, providing the subject and payload for the message.

Go
 
func publishMsg(nc *nats.Conn, subject string, payload []byte) error {
	err := nc.Publish(subject, payload)
	if err != nil {
		return fmt.Errorf("publish: %w", err)
	}

	return nil
}


We have successfully published a message to NATS JetStream, where it is stored in the specified stream. It is time to implement a consumer to receive and process the message.

Consuming Messages With NATS JetStream in Golang Using Pull Subscriptions

This section will discuss the process of consuming messages from NATS JetStream using the Golang programming language and pulling subscriptions, providing a comprehensive understanding of the necessary steps.

First, ensure a connection to the NATS JetStream server is established using the "nats.go" library, as described in the previous chapter. Then, with a connection in place, create a JetStream context using the nats.JetStream() function, allowing you to interact with the JetStream system and consume messages from streams.

Next, it is crucial to configure and create a consumer for the desired stream. The jsCtx.AddConsumer() method can specify the consumer's configuration, such as its name, durable name, and acknowledgment policy. 

Go
 
func createConsumer(ctx context.Context, jsCtx nats.JetStreamContext, consumerGroupName, streamName string) (*nats.ConsumerInfo, error) {
	consumer, err := jsCtx.AddConsumer(streamName, &nats.ConsumerConfig{
		Durable:       consumerGroupName,      // durable name is the same as consumer group name
		DeliverPolicy: nats.DeliverAllPolicy,  // deliver all messages, even if they were sent before the consumer was created
		AckPolicy:     nats.AckExplicitPolicy, // ack messages manually
		AckWait:       5 * time.Second,        // wait for ack for 5 seconds
		MaxAckPending: -1,                     // unlimited number of pending acks
	}, nats.Context(ctx))
	if err != nil {
		return nil, fmt.Errorf("add consumer: %w", err)
	}

	return consumer, nil
}


Once the consumer is created and configured, the next step is subscribing to the relevant subject in the stream. For the scope of this article, we will focus on pull subscriptions, which allow for more direct control over message delivery.

Go
 
func subscribe(ctx context.Context, js nats.JetStreamContext, subject, consumerGroupName, streamName string) (*nats.Subscription, error) {
	pullSub, err := js.PullSubscribe(
		subject,
		consumerGroupName,
		nats.ManualAck(),                         // ack messages manually
		nats.Bind(streamName, consumerGroupName), // bind consumer to the stream
		nats.Context(ctx),                        // use context to cancel the subscription
	)
	if err != nil {
		return nil, fmt.Errorf("pull subscribe: %w", err)
	}

	return pullSub, nil
}


To consume messages, use the Fetch() method on the Subscription() object. This method will return the following available message from the stream, allowing an application to process it accordingly. It is crucial to acknowledge the message using the msg.Ack() method; otherwise, it will be redelivered continuously.

Go
 
func fetchOne(ctx context.Context, pullSub *nats.Subscription) (*nats.Msg, error) {
	msgs, err := pullSub.Fetch(1, nats.Context(ctx))
	if err != nil {
		return nil, fmt.Errorf("fetch: %w", err)
	}
	if len(msgs) == 0 {
		return nil, errors.New("no messages")
	}

	return msgs[0], nil
}


We considered the essential way of integration with NATS JetStream. In the following sections, we will explore some advanced features of NATS JetStream to enhance the system's capabilities further.

Delving Deeper: Advanced NATS JetStream Features in Golang

As we continue our exploration of NATS JetStream integration with Golang, we must understand some advanced concepts that can improve the reliability and durability of distributed systems.

Handling NATS Reconnections in Golang

In real-world scenarios, network disruptions and server failures can lead to connection losses between the client and the NATS server. Therefore, it is crucial to handle reconnections gracefully. The "nats.go" library provides built-in reconnection support, allowing you to specify a custom reconnection logic. Configuring these options allows a Golang application to recover from connection losses and ensure continued operation.

Go
 
func SetReconnectionHandler(nc *nats.Conn) {
	nc.SetReconnectHandler(func(nc *nats.Conn) {
		// Update the connection in the publisher and consumers
		...
	})
}


Stream and Consumer Durability

Durability is an essential aspect of ensuring message persistence and reliable delivery. In NATS JetStream, both streams and consumers can be made durable by specifying a durable name when creating them. This feature ensures that their state is preserved across restarts or failures, allowing messages to be stored reliably and consumers to resume processing messages from where they left off.

To create a durable stream, provide a name in the stream configuration. Also, for a durable consumer, specify a durable name in the consumer configuration. The steps configure NATS JetStream to store the state of these entities, making them resilient to failures and ensuring reliable message delivery.

Go
 
...
stream, err := jsCtx.AddStream(&nats.StreamConfig{
  ...
  Name:              "test_stream",
  Subjects:          []string{"subject.1", "subject.2", "subject.N"},
  MaxAge:            7 * 24 * time.Hour,  // max age of stored messages is 7 days
  Storage:           nats.FileStorage,    // type of message storage
  ...
}, nats.Context(ctx))
...
consumer, err := jsCtx.AddConsumer(streamName, &nats.ConsumerConfig{
  Durable:       consumerGroupName,      // durable name is the same as consumer group name
  ...
}, nats.Context(ctx))
...


Incorporating these advanced concepts into NATS JetStream implementation with Golang allows us to create a more resilient and reliable messaging system for our distributed architecture.

Conclusion

In conclusion, NATS JetStream offers a robust, scalable, and feature-rich message queue solution for distributed systems. By integrating it with Golang, developers can harness its capabilities to create resilient, high-performance applications. This article provides a comprehensive understanding of essential concepts, methodologies, and advanced techniques for implementing NATS JetStream as queues in Golang. By delving into the theoretical background, connecting to NATS JetStream, publishing and consuming messages, and exploring advanced features like reconnections and durability, developers can efficiently build and maintain reliable distributed systems that effectively address modern architectural challenges.

Golang Message queue Systems architecture microservice

Opinions expressed by DZone contributors are their own.

Related

  • Building Microservice in Golang
  • DevOps Fast Forward with Go
  • Tired of Spring Overhead? Try Dropwizard for Your Next Java Microservice
  • Graceful Shutdown: Spring Framework vs Golang Web Services

Partner Resources

×

Comments
Oops! Something Went Wrong

The likes didn't load as expected. Please refresh the page and try again.

ABOUT US

  • About DZone
  • Support and feedback
  • Community research
  • Sitemap

ADVERTISE

  • Advertise with DZone

CONTRIBUTE ON DZONE

  • Article Submission Guidelines
  • Become a Contributor
  • Core Program
  • Visit the Writers' Zone

LEGAL

  • Terms of Service
  • Privacy Policy

CONTACT US

  • 3343 Perimeter Hill Drive
  • Suite 100
  • Nashville, TN 37211
  • support@dzone.com

Let's be friends:

Likes
There are no likes...yet! 👀
Be the first to like this post!
It looks like you're not logged in.
Sign in to see who liked this post!