Streaming Java CompletableFutures in Completion Order

Want to see what happens when you combine Stream and CompletableFutures in Java?

By Grzegorz Piwowarek · Updated Jun. 03, 19 · Tutorial

Java 8 brought us tools like CompletableFuture and the Stream API. Let’s try to combine them and create a Stream that returns values from a collection of CompletableFutures as they arrive.

This approach was also employed when developing version 1.0.0 of parallel-collectors.

Streaming CompletableFutures

Essentially, what we are trying to do is implement a solution that would allow us to convert a collection of futures into a stream of the values returned by those futures:

Collection<CompletableFuture<T>> -> Stream<T>


In the world of Java, that could be achieved by using, for example, a static method:

public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
    // ...
}


One way to create a custom Stream is to implement a custom java.util.Spliterator:

final class CompletionOrderSpliterator<T> 
  implements Spliterator<T> { ... }
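If the Spliterator contract is new to you, the following sketch may help before we get to futures. It’s a hypothetical CountdownSpliterator, not part of the final solution, that emits n, n-1, ..., 1: tryAdvance() passes exactly one element to the consumer and returns true, or returns false once nothing is left, and StreamSupport.stream(spliterator, false) turns it into a sequential Stream.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical example: emits start, start-1, ..., 1 and then reports exhaustion.
final class CountdownSpliterator implements Spliterator<Integer> {

    private int remaining;

    CountdownSpliterator(int start) {
        this.remaining = start;
    }

    @Override
    public boolean tryAdvance(Consumer<? super Integer> action) {
        if (remaining <= 0) {
            return false;            // nothing left to emit
        }
        action.accept(remaining--);  // exactly one element per call
        return true;
    }

    @Override
    public Spliterator<Integer> trySplit() {
        return null;                 // this spliterator cannot be split
    }

    @Override
    public long estimateSize() {
        return remaining;            // exact number of elements left
    }

    @Override
    public int characteristics() {
        return ORDERED | SIZED | NONNULL;
    }

    // wraps the spliterator into a sequential Stream and collects it
    static List<Integer> countdown(int start) {
        return StreamSupport.stream(new CountdownSpliterator(start), false)
          .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(countdown(3)); // [3, 2, 1]
    }
}
```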


And now, we can finish up with the implementation of our static method:

public static <T> Stream<T> completionOrder(Collection<CompletableFuture<T>> futures) {
    return StreamSupport.stream(
      new CompletionOrderSpliterator<>(futures), false);
}


That’s the easy part. Now, let’s implement the CompletionOrderSpliterator.

Implementing CompletionOrderSpliterator

To implement our Spliterator, we’ll need to fill in the blanks and provide custom implementations of the following methods:

final class CompletionOrderSpliterator<T> implements Spliterator<T> {

    CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
        // TODO
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        // TODO
    }

    @Override
    public Spliterator<T> trySplit() {
        // TODO
    }

    @Override
    public long estimateSize() {
        // TODO
    }

    @Override
    public int characteristics() {
        // TODO
    }
}


Naturally, we need a proper constructor as well.

The most natural way of approaching the problem would involve making a working copy of the source collection, waiting for any future to complete, removing it from the copy, and emitting its result through the Spliterator.

Waiting for any future to complete can easily be done using CompletableFuture#anyOf, which also handles exception propagation correctly out of the box.
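To see the exception propagation in action, here’s a small sketch (the class and method names are made up for illustration). When the first future to complete has failed, join() on the future returned by anyOf() throws a CompletionException whose cause is the original exception:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo: anyOf() relays the failure of the first future that completes.
final class AnyOfFailureDemo {

    static Throwable causeOfFirstFailure() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        try {
            CompletableFuture.anyOf(pending, failed).join();
            return null; // not reached: 'failed' is the only future that completes
        } catch (CompletionException e) {
            return e.getCause(); // the original IllegalStateException
        }
    }

    public static void main(String[] args) {
        System.out.println(causeOfFirstFailure()); // java.lang.IllegalStateException: boom
    }
}
```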

However, there’s a slight complication.

If you look at the signature of CompletableFuture#anyOf, you will see that it’s not very practical: it accepts any number of CompletableFuture<?> instances and returns a single CompletableFuture<Object>. This is not the main issue here, though (just a slight inconvenience).

The real problem is that the CompletableFuture<Object> returned by the method is not the future that completed first, but a new CompletableFuture instance that completes when any future completes.

This makes the whole idea of waiting for a future and then removing it from a list of remaining futures a bit complicated. We can’t rely on reference equality, so we can either do a linear scan after each signal from CompletableFuture#anyOf, or try to come up with something better.
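A quick way to convince yourself of both points is a sketch like the one below (hypothetical names again). The value is relayed, but the returned future is neither of the inputs, and its static type has lost the Integer type argument:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo: anyOf() returns a brand-new CompletableFuture<Object>,
// so its reference can't be used to find the input future that completed.
final class AnyOfIdentityDemo {

    static boolean relaysValueButNotIdentity() {
        CompletableFuture<Integer> pending = new CompletableFuture<>();
        CompletableFuture<Integer> done = CompletableFuture.completedFuture(42);

        // the static type is CompletableFuture<Object>, not CompletableFuture<Integer>
        CompletableFuture<Object> any = CompletableFuture.anyOf(pending, done);

        Object value = any.join(); // 42, relayed from 'done'
        Object ref = any;          // compare references via Object to avoid mismatched generic types
        return value.equals(42) && ref != pending && ref != done;
    }

    public static void main(String[] args) {
        System.out.println(relaysValueButNotIdentity()); // true
    }
}
```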

The naive solution could look like:

private T takeNextCompleted() {
    anyOf(futureQueue.toArray(new CompletableFuture[0])).join();

    CompletableFuture<T> next = null;
    for (CompletableFuture<T> future : futureQueue) {
        if (future.isDone()) {
            next = future;
            break;
        }
    }
    futureQueue.remove(next);

    return next.join();
}


I’m doing a linear scan to find the first completed future and then removing it from the queue. If you want to know why I’m passing an empty CompletableFuture[0] array to toArray(), although I know what the size is, check this article.
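For experimenting, the naive approach can be wrapped into a self-contained sketch. The class and method names are hypothetical, it drains the results into a List instead of exposing a Stream to keep things short, and it keeps the working copy in a LinkedList so that Iterator.remove() can unlink the completed future without a second search:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch of the naive approach (hypothetical names): wait with anyOf(),
// then linearly scan the remaining futures for one that is done.
final class NaiveCompletionOrder {

    static <T> List<T> drainInCompletionOrder(Collection<CompletableFuture<T>> futures) {
        LinkedList<CompletableFuture<T>> remaining = new LinkedList<>(futures); // working copy
        List<T> results = new ArrayList<>(futures.size());

        while (!remaining.isEmpty()) {
            // block until at least one remaining future is done
            CompletableFuture.anyOf(remaining.toArray(new CompletableFuture[0])).join();

            // linear scan; Iterator.remove() unlinks the node without searching again
            Iterator<CompletableFuture<T>> it = remaining.iterator();
            while (it.hasNext()) {
                CompletableFuture<T> future = it.next();
                if (future.isDone()) {
                    it.remove();
                    results.add(future.join());
                    break;
                }
            }
        }
        return results;
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow";
        });
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        System.out.println(drainInCompletionOrder(Arrays.asList(slow, fast)));
    }
}
```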

From a pragmatic point of view, this should be good enough, since in practice no one would expect to iterate over a collection of more than 10-20 thousand futures because of hardware thread-count limitations (the actual number can vary a lot depending on multiple factors, for example, stack size). That said, larger collections are absolutely possible, since CompletableFutures are not bound to threads and millions of them can be completed by a single thread.

However, that might change once Project Loom goes live.

Still, 20,000 futures would result in visiting anywhere from 20,000 nodes optimistically (when the first future in the scan is always the one that has completed) to roughly 200,000,000 nodes pessimistically (when it’s always the last one).
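The pessimistic figure is simply the total length of all scans when the completed future is always the last one visited: the first scan walks n futures, the next one n - 1, and so on down to 1:

```latex
\sum_{k=1}^{n} k = \frac{n(n+1)}{2},
\qquad
n = 20\,000 \;\Rightarrow\; \frac{20\,000 \cdot 20\,001}{2} = 200\,010\,000 \approx 2 \cdot 10^{8}
```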

What could we do about it if we can’t rely on referential equality or hashcodes of CompletableFutures?

We could assign our own ids to them, store them in a map along with the matching futures, and then make each future identify itself by returning a pair: its index alongside the actual value.

So, let’s store our futures in a map:

private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;


Now, we could manually assign ids from a monotonically increasing sequence, and make futures return them as well:

private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {

    Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map
      = new HashMap<>(futures.size(), 1); // presizing the HashMap since we know the capacity and expected collisions count (0)

    int seq = 0;
    for (CompletableFuture<T> future : futures) {
        int index = seq++;
        map.put(
          index, 
          future.thenApply(
            value -> new AbstractMap.SimpleEntry<>(index, value)));
    }
    return map;
}


And now, we can efficiently find and process the next completed future by waiting for it, reading its sequence number, and then using that number to remove the future from the map of remaining ones:

private T nextCompleted() {
    return anyOf(indexedFutures.values()
      .toArray(new CompletableFuture[0]))
        .thenApply(result -> ((Map.Entry<Integer, T>) result))
        .thenApply(result -> {
            indexedFutures.remove(result.getKey());
            return result.getValue();
        }).join();
}
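To see the tagging trick in isolation, here’s a self-contained sketch with hypothetical helper names (tag() and removeFirstCompleted()) that mirrors toIndexedFutures() and nextCompleted(). The winner of anyOf() carries its own key, so it can be removed from the map without any scanning:

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo of the index-tagging trick: each wrapped future reports its own key.
final class IndexTaggingDemo {

    // wraps a future so that it completes with (index, value)
    static <T> CompletableFuture<Map.Entry<Integer, T>> tag(int index, CompletableFuture<T> future) {
        return future.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value));
    }

    @SuppressWarnings("unchecked")
    static <T> T removeFirstCompleted(Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexed) {
        Map.Entry<Integer, T> winner = (Map.Entry<Integer, T>) CompletableFuture
          .anyOf(indexed.values().toArray(new CompletableFuture[0]))
          .join();
        indexed.remove(winner.getKey()); // no scan needed: the result carries its own key
        return winner.getValue();
    }

    static String demo() {
        Map<Integer, CompletableFuture<Map.Entry<Integer, String>>> indexed = new HashMap<>();
        indexed.put(0, tag(0, new CompletableFuture<String>()));           // still pending
        indexed.put(1, tag(1, CompletableFuture.completedFuture("fast"))); // already done

        String first = removeFirstCompleted(indexed);
        return first + " " + indexed.keySet();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // fast [0]
    }
}
```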


The implementation of tryAdvance() becomes trivial:

@Override
public boolean tryAdvance(Consumer<? super T> action) {
    if (!indexedFutures.isEmpty()) {
        action.accept(nextCompleted());
        return true;
    } else {
        return false;
    }
}


The hardest part is behind us. Now we need to implement the three remaining methods:

@Override
public Spliterator<T> trySplit() {
    return null; // because splitting is not allowed
}

@Override
public long estimateSize() {
    return indexedFutures.size(); // because we know the size
}

@Override
public int characteristics() {
    return 
      SIZED       // because we know the size upfront
      | IMMUTABLE // because we work on a private copy, so the source collection can be safely modified
      | NONNULL;  // assumes that none of the futures completes with a null value
}


And here we are.

Working Example

We can quickly validate that it works appropriately by introducing a random processing lag when going through a sorted sequence:

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    List<CompletableFuture<Integer>> futures = Stream
      .iterate(0, i -> i + 1)
      .limit(100)
      .map(i -> CompletableFuture.supplyAsync(
        withRandomDelay(i), executorService))
      .collect(Collectors.toList());

    completionOrder(futures)
      .forEach(System.out::println);

    executorService.shutdown(); // non-daemon pool threads would otherwise keep the JVM alive
}

private static Supplier<Integer> withRandomDelay(Integer i) {
    return () -> {
        try {
            Thread.sleep(ThreadLocalRandom.current()
              .nextInt(10000));
        } catch (InterruptedException e) {
            // ignore shamelessly, don't do this in production
        }
        return i;
    };
}


And you can see that the values are no longer returned in their original order (the exact output varies from run to run):

6
5
2
4
1
11
8
12
3

Streaming Futures in Original Order

What if we want different semantics and simply maintain the original order?

Luckily, that can be achieved quickly without any particular infrastructure:

public static <T> Stream<T> originalOrder(
  Collection<CompletableFuture<T>> futures) {
    return futures.stream().map(CompletableFuture::join);
}
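Keep in mind that this variant suffers from head-of-line blocking: a future that’s already done can’t be emitted until every future before it has completed. A small sketch (hypothetical names) that demonstrates it:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo: originalOrder() emits values in source order, so an
// already-completed future still waits behind a slower one placed before it.
final class OriginalOrderDemo {

    // same approach as above: join() each future in the order of the source collection
    static <T> Stream<T> originalOrder(Collection<CompletableFuture<T>> futures) {
        return futures.stream().map(CompletableFuture::join);
    }

    static List<String> demo() {
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");

        // complete 'slow' a bit later from another thread
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            slow.complete("slow");
        });

        // blocks on 'slow' first, even though 'fast' is already done
        return originalOrder(Arrays.asList(slow, fast)).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [slow, fast]
    }
}
```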


A Complete Example

package com.pivovarit.collectors;

import java.util.AbstractMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Spliterator;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import static java.util.concurrent.CompletableFuture.anyOf;

/**
 * @author Grzegorz Piwowarek
 */
final class CompletionOrderSpliterator<T> implements Spliterator<T> {

    private final Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> indexedFutures;

    CompletionOrderSpliterator(Collection<CompletableFuture<T>> futures) {
        indexedFutures = toIndexedFutures(futures);
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (!indexedFutures.isEmpty()) {
            action.accept(nextCompleted());
            return true;
        } else {
            return false;
        }
    }

    private T nextCompleted() {
        return anyOf(indexedFutures.values().toArray(new CompletableFuture[0]))
          .thenApply(result -> ((Map.Entry<Integer, T>) result))
          .thenApply(result -> {
              indexedFutures.remove(result.getKey());
              return result.getValue();
          }).join();
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return indexedFutures.size();
    }

    @Override
    public int characteristics() {
        return SIZED | IMMUTABLE | NONNULL;
    }

    private static <T> Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> toIndexedFutures(Collection<CompletableFuture<T>> futures) {
        Map<Integer, CompletableFuture<Map.Entry<Integer, T>>> map = new HashMap<>(futures.size(), 1);

        int counter = 0;
        for (CompletableFuture<T> f : futures) {
            int index = counter++;
            map.put(index, f.thenApply(value -> new AbstractMap.SimpleEntry<>(index, value)));
        }
        return map;
    }
}


The complete working example can be found on GitHub as well.

Do you have an idea of how to make it better? Don’t hesitate, and let me know!


Published at DZone with permission of Grzegorz Piwowarek, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
