Stream Deduplication With Hazelcast Jet
Learn why deduplication is important and how to implement it with Hazelcast Jet. Find out more about implementation ideas.
Hazelcast Jet 3.2 introduces stateful map, filter, and flat map operations, which are very powerful primitives. In this blog, I am going to show you how to use a stateful filter to detect and remove duplicate elements in a stream.
Why Deduplication?
Deduplication is often used to achieve idempotency or effectively-once delivery semantics in messaging systems. Imagine a microservices architecture where individual microservices communicate with each other through a message broker. In such a system, achieving exactly-once delivery is a hard problem.
If you cannot have exactly-once, then you are typically left with at-most-once or at-least-once semantics. At-most-once means messages can get lost, which is often unacceptable. At-least-once means no messages get lost, but some can be delivered more than once.
This is often better than losing messages, yet for some use cases it is still not good enough. The common solution to this problem is effectively-once: at-least-once delivery combined with duplicate detection and removal.
Implementation Idea
The deduplication process is usually straightforward. Producers attach a unique ID to each message. Consumers track all processed message IDs and discard messages whose IDs they have already observed. This is usually easy in batch processing: each batch has a finite size, so it is often feasible to store all IDs observed in a given batch.
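For a finite batch, the idea can be sketched in a few lines of plain Java. This is a minimal illustration, not Jet code: the class BatchDedup, its Message type, and the dedupBatch() method are hypothetical names. The producer attaches a random UUID to each message, a redelivered message carries the same UUID, and the consumer keeps every ID of the batch in a HashSet.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;

public class BatchDedup {
    // Hypothetical message type: the producer attaches a unique ID to every message.
    static final class Message {
        final UUID id;
        final String payload;

        Message(UUID id, String payload) {
            this.id = id;
            this.payload = payload;
        }
    }

    // Consumer side: keep only the first message for each ID. Every ID of the
    // batch is held in memory, which is acceptable because the batch is finite.
    static List<Message> dedupBatch(List<Message> batch) {
        Set<UUID> seen = new HashSet<>();
        List<Message> out = new ArrayList<>();
        for (Message m : batch) {
            if (seen.add(m.id)) { // add() returns false if the ID was already present
                out.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Message first = new Message(UUID.randomUUID(), "hello");
        Message second = new Message(UUID.randomUUID(), "world");
        // At-least-once delivery: "first" is redelivered as a new object with the same ID.
        Message redelivered = new Message(first.id, first.payload);
        List<Message> delivered = List.of(first, second, redelivered);
        System.out.println(dedupBatch(delivered).size()); // 2
    }
}
```

Deduplicating by ID rather than by object identity matters here: a redelivered message is typically a fresh object that only shares the producer-assigned ID.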
Streaming systems are different beasts. Streams are conceptually infinite, so holding every observed ID forever is not feasible, least of all in memory. On the other hand, it is often sensible to assume that duplicate messages arrive close to each other in time. Hence we can attach a time-to-live to each ID and remove the ID from memory once its time-to-live expires.
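The time-to-live idea can be sketched as a small, hypothetical helper in plain Java (TtlDeduplicator is not a Jet or Hazelcast API). It assumes event time never goes backwards, forgets an ID once ttl or more time units have passed since it was last seen, and keeps entries in a LinkedHashMap ordered by last sighting so expired IDs can be evicted from the head.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not a Jet API): remembers each ID for `ttl` units of event time
// after it was last seen, then forgets it so memory stays bounded.
public class TtlDeduplicator<K> {
    private final long ttl;
    // Ordered by last sighting, oldest first, so expired IDs sit at the head.
    private final LinkedHashMap<K, Long> lastSeen = new LinkedHashMap<>();

    public TtlDeduplicator(long ttl) {
        this.ttl = ttl;
    }

    /** Returns true if the item is new and should be kept, false if it is a duplicate. */
    public boolean accept(K id, long eventTime) {
        // Evict IDs last seen `ttl` or more time units ago (assumes non-decreasing event time).
        Iterator<Map.Entry<K, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() > eventTime - ttl) {
                break; // everything after this entry is newer, so nothing else has expired
            }
            it.remove();
        }
        boolean isNew = lastSeen.remove(id) == null;
        lastSeen.put(id, eventTime); // re-insert at the tail to keep the ordering
        return isNew;
    }

    public int size() {
        return lastSeen.size();
    }

    public static void main(String[] args) {
        TtlDeduplicator<String> dedup = new TtlDeduplicator<>(10_000);
        System.out.println(dedup.accept("a", 0));      // true: first sighting
        System.out.println(dedup.accept("a", 5_000));  // false: duplicate within the TTL
        System.out.println(dedup.accept("b", 6_000));  // true
        System.out.println(dedup.accept("a", 16_000)); // true: "a" expired at 15_000
        System.out.println(dedup.size());              // 1: "b" was evicted too
    }
}
```

Because an ID is refreshed every time it is seen, a steady trickle of duplicates keeps it in memory; an alternative design would expire IDs a fixed time after their first sighting.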
Implementation With Hazelcast Jet
Let's say I am running a discussion forum and I have a microservice that sends a new message whenever a user posts a new comment. The message looks like this:
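import java.io.Serializable;
import java.util.UUID;

public final class AddNewComment implements Serializable {
    private UUID uuid;        // unique per message; used as the deduplication key
    private String comment;
    private long authorId;

    public UUID getUuid() { return uuid; } // referenced as AddNewComment::getUuid below
}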
The UUID field is unique for each message posted. My consumer is a Hazelcast Jet application, and I want a processing pipeline to discard every message whose UUID has already been processed. It turns out to be trivial:
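stage.groupingKey(AddNewComment::getUuid)            // partition the stream by message UUID
     .filterStateful(10_000, () -> new boolean[1],   // time-to-live, then one state object per UUID
          (s, i) -> {
              boolean res = s[0];  // has this UUID been seen before?
              s[0] = true;         // remember it from now on
              return !res;         // keep the item only on its first occurrence
          }
     );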
How does it work? First, we group the stream of incoming comments by UUID. Then we apply a stateful filter whose state object is a one-element boolean array. Jet creates a separate state object for each UUID, using the supplier passed as the second argument.
When a UUID is observed for the first time, the element in its array is false, so the filtering function flips it to true and returns true. This means the comment is kept.
If the stream later receives another comment with the same UUID, the filtering function gets the same state object, whose element is already true. The function therefore returns false, and the duplicate is discarded.
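To make the mechanics concrete without a running Jet cluster, here is a hypothetical, single-threaded imitation of a keyed stateful filter in plain Java. The method name filterStateful mirrors Jet's, but the class KeyedStatefulFilter and its signature are assumptions for illustration; it keeps state forever (no time-to-live) and has none of Jet's distribution or fault tolerance. It runs the same state logic as the pipeline above over a list of UUID strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;

// A single-threaded, in-memory imitation of a keyed stateful filter (no TTL, no Jet).
public class KeyedStatefulFilter {
    static <T, K, S> List<T> filterStateful(List<T> items,
                                            Function<T, K> keyFn,
                                            Supplier<S> createFn,
                                            BiPredicate<S, T> filterFn) {
        Map<K, S> states = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T item : items) {
            // One state object per key, created on the first sighting of that key.
            S state = states.computeIfAbsent(keyFn.apply(item), k -> createFn.get());
            if (filterFn.test(state, item)) {
                out.add(item);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> uuids = List.of("u1", "u2", "u1", "u3", "u2");
        List<String> kept = filterStateful(uuids, Function.identity(), () -> new boolean[1],
                (s, i) -> {
                    boolean res = s[0]; // seen before?
                    s[0] = true;        // mark as seen
                    return !res;        // keep the first occurrence only
                });
        System.out.println(kept); // [u1, u2, u3]
    }
}
```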
The first parameter of filterStateful() is the time-to-live, measured in event-time units, i.e., the same units as the stage's timestamps. Event timestamps are typically in milliseconds, so each state object is retained for at least 10 seconds of event time. We have to choose this parameter to match the longest possible time window between two duplicate elements.
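Choosing the time-to-live can be informed by data. The following hypothetical helper (TtlEstimator and longestDuplicateGap() are illustrative names, not part of Jet) scans a sample of (ID, timestamp) events, given as two parallel lists in event-time order, and reports the longest gap between consecutive sightings of the same ID. It assumes the retained state is refreshed each time an ID is seen, so a time-to-live shorter than this gap could let at least one duplicate in the sample through. A sample only gives a lower bound; the time-to-live still has to cover the worst case your producers and broker can cause.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: longest gap between consecutive sightings of the same ID.
public class TtlEstimator {
    static long longestDuplicateGap(List<String> ids, List<Long> timestamps) {
        Map<String, Long> lastSeen = new HashMap<>();
        long maxGap = 0;
        for (int i = 0; i < ids.size(); i++) {
            // put() returns the previous timestamp for this ID, or null on first sighting.
            Long prev = lastSeen.put(ids.get(i), timestamps.get(i));
            if (prev != null) {
                maxGap = Math.max(maxGap, timestamps.get(i) - prev);
            }
        }
        return maxGap;
    }

    public static void main(String[] args) {
        long gap = longestDuplicateGap(
                List.of("a", "b", "a", "c", "b"),
                List.of(0L, 1_000L, 4_000L, 5_000L, 9_500L));
        long ttl = Duration.ofSeconds(10).toMillis(); // 10_000 ms, as in the pipeline above
        System.out.println("longest gap = " + gap + " ms, TTL exceeds it: " + (ttl > gap));
        // prints "longest gap = 8500 ms, TTL exceeds it: true"
    }
}
```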
Further Improvements
Let's encapsulate the filtering logic into a reusable unit that can be applied to any pipeline. We are going to use the apply() method, which transforms a pipeline stage with a function we provide. A utility class with the following method is all that's needed:
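import com.hazelcast.jet.function.FunctionEx; // Jet 3.x package (com.hazelcast.function in Jet 4.0+)
import com.hazelcast.jet.pipeline.StreamStage;

public final class StreamUtils {
    // Returns a transform for StreamStage.apply() that drops every item whose key
    // was already seen within the time-to-live given by `window`.
    public static <T> FunctionEx<StreamStage<T>, StreamStage<T>> deduplicationWindow(long window,
                                                                                    FunctionEx<T, ?> extractor) {
        return stage -> stage.groupingKey(extractor)
                .filterStateful(window, () -> new boolean[1],
                        (s, i) -> {
                            boolean res = s[0]; // seen before?
                            s[0] = true;        // mark as seen
                            return !res;        // keep the first occurrence only
                        }
                );
    }
}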
Whenever you need to add deduplication to a pipeline, you can simply call:
pipelineStage.apply(StreamUtils.deduplicationWindow(WINDOW_LENGTH, ID_EXTRACTOR_FUNCTION))
    [...]
This makes the deduplication logic independent of your business logic, and you can reuse the same deduplication utility across all your pipelines.
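The same encapsulation pattern can be sketched without Jet using java.util.stream. In this hypothetical analogue (DedupTransforms and deduplicate() are illustrative names), the utility returns a UnaryOperator<Stream<T>> built from an ID extractor, much like deduplicationWindow() returns a function for apply(). There is no time-to-live here, so every key is remembered for the life of the stream, and the stateful predicate is only safe on sequential streams.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DedupTransforms {
    // Hypothetical plain-Java analogue of deduplicationWindow(): no TTL, so every key is
    // remembered for the life of the stream. Intended for sequential streams only.
    static <T> UnaryOperator<Stream<T>> deduplicate(Function<? super T, ?> extractor) {
        return stream -> {
            Set<Object> seen = new HashSet<>(); // fresh state for each stream it is applied to
            return stream.filter(item -> seen.add(extractor.apply(item)));
        };
    }

    public static void main(String[] args) {
        // Reuse the same utility with two different ID extractors.
        List<String> words = DedupTransforms.<String>deduplicate(String::toLowerCase)
                .apply(Stream.of("Jet", "jet", "Hazelcast", "JET"))
                .collect(Collectors.toList());
        List<Integer> numbers = DedupTransforms.<Integer>deduplicate(n -> n % 3)
                .apply(Stream.of(1, 2, 3, 4, 5, 6))
                .collect(Collectors.toList());
        System.out.println(words);   // [Jet, Hazelcast]
        System.out.println(numbers); // [1, 2, 3]
    }
}
```

Creating the HashSet inside the returned function, rather than once in deduplicate(), gives each stream its own state, so one operator can be reused across several pipelines.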
Summary
I have demonstrated the power of stateful stream processing and the simplicity of the Jet API. It only takes a few lines of code to implement custom stream deduplication. Visit the Hazelcast Jet page for more info, or stop by our Gitter chat and let us know what you think!
Published at DZone with permission of Jarett Lear, DZone MVB. See the original article here.