Stream Deduplication With Hazelcast Jet

Learn why deduplication is important and how to implement it with Hazelcast Jet, along with ideas for making the implementation reusable.

by Jarett Lear · Dec. 02, 19 · Tutorial


Learn how to implement stream deduplication with Hazelcast Jet.


Hazelcast Jet 3.2 introduces stateful map, filter, and flatMap operations, which are very powerful primitives. In this blog post, I am going to show you how to use a stateful filter to detect and remove duplicate elements in a stream.

You may also like: Java: How to Become More Productive With Hazelcast in Less Than 5 Minutes

Why Deduplication?

Deduplication is often used to achieve idempotency or effectively-once delivery semantics in messaging systems. Imagine you have a microservices architecture where individual microservices use a message broker to communicate with each other. Achieving exactly-once semantics is a hard problem.

If you cannot have exactly-once, you are typically left with at-most-once or at-least-once semantics. At-most-once means messages can get lost. This is often unacceptable. At-least-once means messages cannot get lost, but some can be delivered more than once.

This is usually better than losing messages, yet for some use cases it's still not good enough. The common solution to this problem is effectively-once: essentially at-least-once delivery combined with duplicate detection and removal.

Implementation Idea

The deduplication process is usually straightforward. Producers attach a unique ID to each message. Consumers track all processed message IDs and discard messages with already observed IDs. This is easy for batch processing: each batch has a finite size, so it's feasible to store all IDs observed in a given batch.
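
For a batch, the bookkeeping can be a plain in-memory set of seen IDs. Here is a minimal sketch of that idea; the BatchDeduplicator name and the functional parameters are illustrative, not part of any library:

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative batch deduplication: storing every observed ID is feasible
// only because a batch is finite.
public final class BatchDeduplicator {

    public static <T> void processBatch(List<T> batch,
                                        Function<T, UUID> idExtractor,
                                        Consumer<T> processor) {
        Set<UUID> seenIds = new HashSet<>();
        for (T message : batch) {
            // Set.add() returns false when the ID was already present,
            // i.e. the message is a duplicate and is skipped.
            if (seenIds.add(idExtractor.apply(message))) {
                processor.accept(message);
            }
        }
    }
}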

Streaming systems are different beasts. Streams are conceptually infinite, so it's not feasible to hold all observed IDs, let alone in memory. On the other hand, it's often sensible to assume duplicated messages will be close to each other in time. Hence we can introduce a time-to-live for each ID and remove it from memory when that time-to-live expires.
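
In plain Java, the time-to-live idea could look something like the sketch below. This is only an illustration of the concept, not how Hazelcast Jet implements it internally; the class name and the eviction strategy are made up for the example.

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Conceptual sketch: remember each ID together with an expiry timestamp and
// evict expired entries, so memory stays bounded by the TTL window.
public final class TtlSeenSet {
    private final long ttlMillis;
    private final Map<UUID, Long> expiryById = new HashMap<>();

    public TtlSeenSet(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if this is the first (non-expired) sighting of the ID. */
    public boolean firstSighting(UUID id, long nowMillis) {
        // Naive O(n) sweep; a real implementation would evict incrementally.
        expiryById.values().removeIf(expiry -> expiry <= nowMillis);
        // put() returns null only if the ID was absent (or already evicted).
        return expiryById.put(id, nowMillis + ttlMillis) == null;
    }
}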

Implementation With Hazelcast Jet

Let's say I am running a discussion forum and I have a microservice that sends a new message whenever a user posts a new comment. The message looks like this:

import java.io.Serializable;
import java.util.UUID;

public final class AddNewComment implements Serializable {
  private UUID uuid;       // unique per message; used for deduplication
  private String comment;
  private long authorId;

  public UUID getUuid() { return uuid; }
}


The UUID field is unique for each message posted. My consumer is a Hazelcast Jet application, and I want a processing pipeline to discard all messages with a UUID already processed in the past. It turns out to be really trivial:

stage.groupingKey(AddNewComment::getUuid)
     .filterStateful(10_000, () -> new boolean[1],
         (s, comment) -> {
             boolean alreadySeen = s[0]; // false the first time this UUID is seen
             s[0] = true;                // mark the UUID as seen
             return !alreadySeen;        // keep only the first occurrence
         });


How does it work? In the first step, we group the stream of incoming comments by UUID. In the next step, we apply a stateful filter that uses a single-element boolean array as its state object. A separate state object is created for each distinct UUID.

When a UUID is observed for the first time, the element inside the array is false, so the code flips it to true and the filtering function returns true. This means the element is not discarded.

If at some point the stream receives another comment with the same UUID, then the filtering function receives the state object where the Boolean inside the array is already set to true. This means the filtering function will return false, and the duplicated object will be discarded.
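
To see those state transitions in isolation, you can exercise the same logic outside of a pipeline. The stand-alone demo below uses java.util.function.BiPredicate for brevity; in the pipeline, the lambda is a Jet BiPredicateEx, but the behavior is identical.

import java.util.function.BiPredicate;

public class FilterSemanticsDemo {
    public static void main(String[] args) {
        BiPredicate<boolean[], String> keepFirst = (s, item) -> {
            boolean alreadySeen = s[0];
            s[0] = true;
            return !alreadySeen;
        };

        boolean[] state = new boolean[1]; // what () -> new boolean[1] supplies per UUID
        System.out.println(keepFirst.test(state, "comment")); // true: first occurrence kept
        System.out.println(keepFirst.test(state, "comment")); // false: duplicate dropped
    }
}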

The first parameter of the filterStateful() method is the time-to-live, expressed in event-time units, typically milliseconds. In this example, each state object is retained for at least 10 seconds of event time after the last event with its key. Choose this parameter to cover the longest possible time window between two duplicate elements.
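
For context, here is a rough sketch of how the whole pipeline might be wired up with the Jet 3.2 APIs. The "comments" map name is an assumption, and the source presumes a Hazelcast IMap with the event journal enabled; treat this as an outline rather than a drop-in program.

import com.hazelcast.jet.Jet;
import com.hazelcast.jet.JetInstance;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;

import java.util.Map;
import java.util.UUID;

public class DeduplicationJob {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.drawFrom(Sources.<UUID, AddNewComment>mapJournal(
                "comments", JournalInitialPosition.START_FROM_OLDEST)) // assumed map name
         .withIngestionTimestamps()
         .map(Map.Entry::getValue)
         .groupingKey(AddNewComment::getUuid)
         .filterStateful(10_000, () -> new boolean[1], (s, comment) -> {
             boolean alreadySeen = s[0];
             s[0] = true;
             return !alreadySeen;
         })
         .drainTo(Sinks.logger());

        JetInstance jet = Jet.newJetInstance();
        try {
            jet.newJob(p).join(); // a streaming job runs until cancelled
        } finally {
            jet.shutdown();
        }
    }
}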

Further Improvements

Let's encapsulate the filtering logic in a reusable unit that can be applied to an arbitrary pipeline. We are going to use the apply() method to transform a pipeline stage. A utility class with the following method is all that's needed:

public static <T> FunctionEx<StreamStage<T>, StreamStage<T>> deduplicationWindow(
        long window, FunctionEx<T, ?> extractor) {
    return stage -> stage.groupingKey(extractor)
        .filterStateful(window, () -> new boolean[1],
            (s, item) -> {
                boolean alreadySeen = s[0];
                s[0] = true;
                return !alreadySeen;
            });
}


Whenever you need to add deduplication to a pipeline, you can simply call:

pipelineStage.apply(StreamUtils
    .deduplicationWindow(WINDOW_LENGTH, ID_EXTRACTOR_FUNCTION))
[...]


This makes the deduplication logic independent of your business logic, and you can reuse the same deduplication utility across all your pipelines.

Summary

I have demonstrated the power of stateful stream processing and the simplicity of the Jet API. It only takes a few lines of code to implement custom stream deduplication. Visit the Hazelcast Jet page for more info, or stop by our Gitter chat and let us know what you think!


Further Reading

  • How to Use Hazelcast Auto-Discovery With Eureka
  • How to Use Embedded Hazelcast on Kubernetes
  • Hazelcast for Go Getters, Part 1


Published at DZone with permission of Jarett Lear, DZone MVB. See the original article here.

