
Composing Futures with Akka

by Nishant Chandra · Mar. 17, 13 · Interview

Composing futures lets you do two (or more) things at the same time and then wait until they are all done. In Java this would typically be done with an ExecutorService.
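For comparison, here is a minimal sketch of the plain ExecutorService approach, using only the JDK. The class name ExecutorServiceExample and the method runBoth are made up for illustration, and the lambdas assume Java 8 or later.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example class, not part of the original article.
class ExecutorServiceExample {

    // Runs both tasks in parallel and returns the sum of their results.
    static long runBoth(Callable<Long> first, Callable<Long> second) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // invokeAll submits every task and returns once all of them have finished.
            List<Future<Long>> results = executor.invokeAll(Arrays.asList(first, second));
            long sum = 0;
            for (Future<Long> result : results) {
                sum += result.get(); // already complete, so get() does not wait here
            }
            return sum;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("a task did not complete", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Total: " + runBoth(() -> 20L, () -> 22L));
    }
}
```

The waiting and the aggregation are both done by hand here, on the calling thread. That is the part that composing futures moves into the future itself.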

It is often desirable to combine different Futures with each other. Akka provides additional constructs that greatly simplify some common use cases, such as making parallel remote service calls and then collecting and mapping the results.

In this article, I will create a simple Java program that explores composing futures with Akka. The sample program works with Akka 2.1.1.

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.10</artifactId>
        <version>2.1.1</version>
    </dependency>

In the Scala Standard Library, a Future is a data structure used to retrieve the result of some concurrent operation. This result can be accessed synchronously (blocking) or asynchronously (non-blocking). To make this usable from Java, Akka provides a Java-friendly interface in akka.dispatch.Futures.
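The difference between the two access styles can be sketched with the JDK's own CompletableFuture, which is swapped in here for Akka's Future only so that the example needs no extra dependency. The class AccessStyles and its methods are hypothetical names, and Java 8 or later is assumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical example class; CompletableFuture stands in for Akka's Future.
class AccessStyles {

    // Blocking access: the calling thread waits until the value is available.
    static long blockingRead(CompletableFuture<Long> future) {
        return future.join();
    }

    // Non-blocking access: register a callback and return immediately.
    // The returned future completes after the callback has run.
    static CompletableFuture<Void> nonBlockingRead(CompletableFuture<Long> future, AtomicLong sink) {
        return future.thenAccept(sink::set);
    }

    public static void main(String[] args) {
        AtomicLong sink = new AtomicLong();
        CompletableFuture<Long> pause = CompletableFuture.supplyAsync(() -> 1500L);

        CompletableFuture<Void> callbackDone = nonBlockingRead(pause, sink);
        System.out.println("Blocking read: " + blockingRead(pause));

        callbackDone.join(); // wait for the callback only so the demo can print its effect
        System.out.println("Callback stored: " + sink.get());
    }
}
```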

Let's set up a Callable class that does some work and then returns a result. For this example, the work is just to pause for a random amount of time, and the result is the amount of time it paused for.

import java.util.concurrent.Callable;

public class RandomPause implements Callable<Long> {

    private Long millisPause;

    public RandomPause() {
        millisPause = Math.round(Math.random() * 3000) + 1000; // 1,000 to 4,000
        System.out.println(this.toString() + " will pause for " + millisPause
                + " milliseconds");
    }

    public Long call() throws Exception {
        Thread.sleep(millisPause);
        System.out.println(this.toString() + " was paused for " + millisPause
                + " milliseconds");
        return millisPause;
    }
}

Akka's Future has several monadic methods that are very similar to the ones used by Scala's collections. These allow you to create 'pipelines' or 'streams' that the result will travel through.
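To illustrate the pipeline idea without pulling in Akka, the sketch below uses the JDK's CompletableFuture, whose thenApply plays roughly the role that map plays on an Akka Future. This is a stand-in, not Akka's API. PipelineSketch and describePause are hypothetical names, and Java 8 or later is assumed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical example class; CompletableFuture stands in for Akka's Future.
class PipelineSketch {

    // Builds a three-stage pipeline: produce a value, convert it, then format it.
    static CompletableFuture<String> describePause(long millis) {
        return CompletableFuture
                .supplyAsync(() -> millis)                          // stage 1: asynchronous computation
                .thenApply(ms -> ms / 1000.0)                       // stage 2: milliseconds to seconds
                .thenApply(seconds -> "paused " + seconds + " s");  // stage 3: format the result
    }

    public static void main(String[] args) {
        // join() blocks until every stage of the pipeline has completed.
        System.out.println(describePause(2500).join());
    }
}
```

Each stage runs only once the previous one has produced its value, so the result travels through the stages in order without the caller having to wait in between.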

Here is a Java app that composes the RandomPause futures.

import static akka.dispatch.Futures.future;
import static akka.dispatch.Futures.sequence;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.dispatch.ExecutionContexts;
import akka.dispatch.Mapper;

public class SimpleFutures {
    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(4);
        ExecutionContext ec = ExecutionContexts.fromExecutorService(executor);

        List<Future<Long>> futures = new ArrayList<Future<Long>>();

        System.out.println("Akka Futures says: Adding futures for two random length pauses");

        futures.add(future(new RandomPause(), ec));
        futures.add(future(new RandomPause(), ec));

        System.out.println("Akka Futures says: There are " + futures.size()
                + " RandomPause's currently running");

        // compose a sequence of the futures
        Future<Iterable<Long>> futuresSequence = sequence(futures, ec);

        // Sum all of the pause durations
        Future<Long> futureSum = futuresSequence.map(
                new Mapper<Iterable<Long>, Long>() {
                    public Long apply(Iterable<Long> ints) {
                        long sum = 0;
                        for (Long i : ints)
                            sum += i;
                        return sum;
                    }
                }, ec);

        // register a non-blocking callback that prints the sum once it is available
        futureSum.onSuccess(new PrintResult<Long>(), ec);

        try {
            // block for up to 5 seconds until the sum is available
            System.out.println("Result :" + Await.result(futureSum, Duration.apply(5, TimeUnit.SECONDS)));
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }

}

Explanation:
In order to execute callbacks and operations, Futures need something called an ExecutionContext, which is very similar to a java.util.concurrent.Executor. In the above program, I have created my own ExecutorService and passed it to a factory method provided by ExecutionContexts.

Take note of 'sequence', which combines several Futures into a single one.

To better explain what happened in the example, Futures.sequence takes the Iterable<Future<Long>> and turns it into a Future<Iterable<Long>>. We can then use map to work with the Iterable<Long> directly, and we compute the sum of its elements.
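The same inside-out transformation can be sketched with the JDK alone. The code below is an analogue built on CompletableFuture.allOf, not Akka's implementation. SequenceSketch, sequence, sumOfPauses and demoTotal are hypothetical names, and Java 8 or later is assumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical example class; CompletableFuture stands in for Akka's Future.
class SequenceSketch {

    // Turns a list of futures into one future that holds the list of results.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> allDone =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        return allDone.thenApply(ignored -> {
            List<T> results = new ArrayList<T>();
            for (CompletableFuture<T> future : futures) {
                results.add(future.join()); // every future is complete here, so join() returns at once
            }
            return results;
        });
    }

    // Maps over the sequenced results to add them up, then blocks for the answer.
    static long sumOfPauses(List<CompletableFuture<Long>> pauses) {
        return sequence(pauses)
                .thenApply(values -> {
                    long sum = 0;
                    for (Long value : values) {
                        sum += value;
                    }
                    return sum;
                })
                .join();
    }

    // Uses the two pause lengths from the article's sample output.
    static long demoTotal() {
        List<CompletableFuture<Long>> pauses = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 2306L),
                CompletableFuture.supplyAsync(() -> 3892L));
        return sumOfPauses(pauses);
    }

    public static void main(String[] args) {
        System.out.println("Total: " + demoTotal());
    }
}
```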

Finally, PrintResult simply prints the output of futureSum.

import akka.dispatch.OnSuccess;

public final class PrintResult<T> extends OnSuccess<T> {
    
    @Override
    public final void onSuccess(T t) {

        System.out.println("PrintResults says: Total pause was for " + ((Long) t)
                + " milliseconds");
    }
}

Output:
Akka Futures says: Adding futures for two random length pauses
RandomPause@55e859c0 will pause for 3892 milliseconds
RandomPause@5430d082 will pause for 2306 milliseconds

Akka Futures says: There are 2 RandomPause's currently running
RandomPause@5430d082 was paused for 2306 milliseconds
RandomPause@55e859c0 was paused for 3892 milliseconds

PrintResults says: Total pause was for 6198 milliseconds

Note: Akka Actors are not used in this example.

Published at DZone with permission of Nishant Chandra, DZone MVB. See the original article here.
