
CompletableFuture


Asynchronous programming is an essential part of modern software development. It lets an application start a long-running task and keep working while that task runs, which improves the throughput, responsiveness, and scalability of backend services and makes better use of their resources. Typical candidates are:

  • I/O-bound operations;

  • Connections to third-party network services;

  • File operations, and more.

Asynchronous execution proves invaluable whenever the system must keep processing other tasks while waiting for a time-consuming operation to finish.

In this topic, we will delve deeper into asynchronous programming and explore the CompletableFuture class, which offers a contemporary API for crafting asynchronous code in Java.

Challenges in asynchronous programming

With the vast potential of asynchronous programming comes the complexity of its implementation. Here are some of the challenges encountered:

  • Concurrency control: Managing multiple execution threads can be intricate, especially when sharing resources or data. Proper synchronization is paramount to prevent issues like race conditions, deadlocks, or data inconsistencies.

  • Error handling: Asynchronous code errors can be trickier to manage than those in synchronous code. Exceptions thrown asynchronously may not propagate to the main execution thread, making them elusive and harder to address.

  • Debugging: Debugging asynchronous operations can pose challenges due to the nonlinear execution flow. Conventional debugging tools, often designed for synchronous, sequential code, might not offer a comprehensive view of the asynchronous application's activities.

  • Code complexity: Asynchronous code can rapidly become intricate and challenging to decipher, especially when dealing with nested callbacks, colloquially termed "callback hell." Such complexity can make the code harder to maintain and understand.
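
The error-handling challenge can be seen even with plain threads. The following is a minimal sketch (the class and method names are hypothetical, chosen only for this illustration): an exception thrown inside a worker thread never reaches a try-catch block in the thread that started it, so it has to be captured in some other way, here with an uncaught exception handler.

```java
import java.util.concurrent.atomic.AtomicReference;

class LostExceptionDemo {

    // Starts a failing worker and reports what the starting thread was able to observe.
    static String runWorker() {
        AtomicReference<Throwable> seenByHandler = new AtomicReference<>();
        boolean caughtByCaller = false;

        Thread worker = new Thread(() -> {
            throw new IllegalStateException("failure inside worker");
        });
        // The handler runs in the worker thread; it is the only place that sees the exception
        worker.setUncaughtExceptionHandler((thread, error) -> seenByHandler.set(error));

        try {
            worker.start();
            worker.join(); // waits for the worker, but does not rethrow its exception
        } catch (IllegalStateException e) {
            caughtByCaller = true; // never reached
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "caught by caller: " + caughtByCaller
                + ", seen by handler: " + seenByHandler.get().getMessage();
    }

    public static void main(String[] args) {
        // prints: caught by caller: false, seen by handler: failure inside worker
        System.out.println(runWorker());
    }
}
```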

Limitations of the "Future" interface

The initial approach to asynchronous programming in Java was the Future interface, introduced in Java 5. While it provides a mechanism for managing asynchronous operations, it has several limitations that make it inconvenient to use. Here's a closer look:

  1. Inability to manually complete: The Future interface provides no way to complete a Future manually. Once you submit a task, you can only wait for it or cancel it; you cannot supply its result yourself.

    ExecutorService executorService = Executors.newSingleThreadExecutor(); // creates an executor backed by a single thread
    Callable<String> asyncTask = () -> { // the task returns the string "Hello from future!" after 1 minute
        int ONE_MINUTE = 60000;
        Thread.sleep(ONE_MINUTE);
        return "Hello from future!";
    };
    Future<String> future = executorService.submit(asyncTask); // all we can do now is wait for the task to finish

    For instance, there's no mechanism to give a Future a default value or to complete it based on specific conditions. You won't find a method akin to future.complete("default string value").

  2. No notification upon completion: The Future interface doesn't offer notifications upon task completion. You can call Future.get() to retrieve the result, but this method blocks the calling thread until the task has finished.

    ExecutorService executorService = Executors.newSingleThreadExecutor(); // creates an executor backed by a single thread
    Callable<String> asyncTask = () -> { // the task returns the string "Hello from future!" after 10 minutes
        int TEN_MINUTES = 600000;
        Thread.sleep(TEN_MINUTES);
        return "Hello from future!";
    };
    Future<String> future = executorService.submit(asyncTask);
    
    try {
        String result = future.get(); // this will block until the result is available
    } catch (InterruptedException | ExecutionException e) {
        // Handle exception
    }

    Additionally, you cannot attach a callback function to a Future that would be triggered once the asynchronous task completes.

  3. Limited exception handling: When a task behind a Future throws an exception, the exception is wrapped in an ExecutionException and surfaces only when get() is called. The only way to handle the error is to enclose get() in a try-catch block and inspect the ExecutionException (its getCause() method returns the original exception).

    // FutureTaskExample.java
    Future<String> futureWithException = executorService.submit(() -> {
        throw new RuntimeException("Exception while some operations!");
    });
    
    try {
        String result = futureWithException.get(); // Throws ExecutionException
    } catch (InterruptedException | ExecutionException e) {
        // Handle exception
    }

  4. Lack of support for merging futures: The Future interface provides no utilities for combining multiple Futures into one. If you have several Futures and want a new Future that completes once all the original tasks have finished, you have to orchestrate this manually.
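
To make the last limitation more concrete, here is a rough sketch of what such manual orchestration might look like with plain Future objects. The class name, the method name, and the word-length task are assumptions made only for this illustration. Note that the caller has to block on every Future in turn, and the combined result is an ordinary list rather than a new Future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ManualFutureMerge {

    // Submits one task per word, then blocks on each Future in turn to collect the results.
    static List<Integer> lengthsOf(List<String> words) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String word : words) {
                Callable<Integer> task = word::length;
                futures.add(executor.submit(task));
            }
            List<Integer> lengths = new ArrayList<>();
            for (Future<Integer> future : futures) {
                lengths.add(future.get()); // blocks until this particular task is done
            }
            return lengths;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("One of the tasks did not finish", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(lengthsOf(List.of("Pizza", "Burger", "Pasta"))); // prints [5, 6, 5]
    }
}
```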

CompletableFuture API

CompletableFuture is a Java class that simplifies the process of writing asynchronous, non-blocking, and multi-threaded code.

Introduced in Java 8 in the java.util.concurrent package, CompletableFuture implements both the Future and CompletionStage interfaces, so it can be used wherever a Future is expected while addressing many of the limitations discussed earlier.

Let's explore some key methods offered by the CompletableFuture API.

  • get(), getNow(), complete()

    Here's a basic example of retrieving results from a CompletableFuture object.

    CompletableFuture<String> completableFuture = new CompletableFuture<>(); // create CompletableFuture
    
    try {
        // calling get() here would block the main thread forever, because nothing has completed the future yet
        // completableFuture.get();
    
        // getNow() returns the given default value while the future is not completed
        String defaultValue = completableFuture.getNow("value before complete future");
    
        completableFuture.complete("value from future"); // complete the future manually
        String valueAfterComplete = completableFuture.get(); // returns immediately, the future is already completed
    
        System.out.println(defaultValue);
        System.out.println(valueAfterComplete);
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }

    The output will be:

    value before complete future
    value from future

  • supplyAsync(), runAsync()

    A major feature of CompletableFuture is its ability to run asynchronous operations. The runAsync() method runs a Runnable that returns nothing, while supplyAsync() runs a Supplier and completes the future with its result. Both execute the task in a separate thread (by default, one from ForkJoinPool.commonPool()) without blocking the main one. To illustrate this, we'll also display the thread executing the code:

    System.out.println("Main thread: " + Thread.currentThread().getName());
    CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> {
        try {
            int ONE_SECOND = 1000;
            Thread.sleep(ONE_SECOND);
            System.out.println("Void future thread: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    
    CompletableFuture<String> futureWithValue = CompletableFuture.supplyAsync(() -> {
        try {
            int ONE_SECOND = 1000;
            Thread.sleep(ONE_SECOND);
            System.out.println("Future with value thread: " + Thread.currentThread().getName());
            return "Value from future after 1 second";
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    
    System.out.println("Some info from main thread");
    voidFuture.get();
    String valueFromFuture = futureWithValue.get();
    System.out.println(valueFromFuture);

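    The output will be similar to the following (the two worker lines may appear in either order, and the worker numbers can differ):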

    Main thread: main
    Some info from main thread
    Void future thread: ForkJoinPool.commonPool-worker-1
    Future with value thread: ForkJoinPool.commonPool-worker-2
    Value from future after 1 second

  • join(), isCancelled(), isDone()

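    Next, we'll examine methods that wait for a CompletableFuture or check its status. The join() method works like get(): it blocks until the future completes and returns the result. The difference is that join() declares no checked exceptions; if the task fails, it throws an unchecked CompletionException, whereas get() throws a checked ExecutionException. The isDone() and isCancelled() methods report whether the future has completed (normally, exceptionally, or by cancellation) and whether it was canceled. Here's an example that simulates a delayed operation and uses these methods: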

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        try {
            int ONE_SECOND = 1000;
            Thread.sleep(ONE_SECOND); // simulate some delay
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello from future after 1 second";
    });
    
    // check if task is done or canceled
    System.out.println("Future done      -> " + future.isDone());
    System.out.println("Future canceled -> " + future.isCancelled());
    
    // use join() to wait until the CompletableFuture has finished and get the result
    String result = future.join();
    System.out.println("Join result      -> " + result);
    
    // check again if task is done or canceled
    System.out.println("Future done      -> " + future.isDone());
    System.out.println("Future canceled -> " + future.isCancelled());

    The output will be:

    Future done      -> false
    Future canceled -> false
    Join result      -> Hello from future after 1 second
    Future done      -> true
    Future canceled -> false

  • thenApply(), thenAccept(), thenRun()

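    Methods prefixed with "then" facilitate chaining operations by attaching callback functions. These functions are automatically invoked when the CompletableFuture completes. The thenApply() method transforms the previous result into a new value, thenAccept() consumes the previous result without returning anything, and thenRun() runs an action that doesn't use the previous result at all.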

    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> "thenApply(): First supply, ") // creates first supply
            .thenApply(supplyResult -> supplyResult + "then first callback") // attach as many callbacks as you need
            .thenApply(thenApplyResult -> thenApplyResult + " and second callback");
    
    System.out.println(completableFuture.get());
    
    CompletableFuture
            .supplyAsync(() -> "thenAccept(): First supply, ")
            .thenApply(supplyResult -> supplyResult + "then first callback")
            .thenAccept(thenAcceptResult -> System.out.println(thenAcceptResult + " and accept the result"))
            .get(); // thenAccept() consumes the previous result and returns CompletableFuture<Void>
    
    CompletableFuture
            .supplyAsync(() -> "First supply, ")
            .thenApply(supplyResult -> supplyResult + "then first callback")
            .thenRun(() -> System.out.println("thenRun(): Just run separate function"))
            .get(); // thenRun() ignores the previous result and returns CompletableFuture<Void>

    The output will be:

    thenApply(): First supply, then first callback and second callback
    thenAccept(): First supply, then first callback and accept the result
    thenRun(): Just run separate function

  • thenApplyAsync(), thenAcceptAsync()

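    With the methods discussed so far, a callback usually runs in the thread that completed the previous stage (or in the calling thread if that stage had already completed when the callback was attached). Sometimes a callback should instead be submitted as a separate task so that it can run in a different thread. For such scenarios, use the methods with the "Async" suffix.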

    Here's a comparison of threads used with thenApply() versus thenApplyAsync():

    System.out.println("Main thread: " + Thread.currentThread().getName());
    CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName() + " | ")
            .thenApply(supplyResult -> supplyResult + Thread.currentThread().getName() + " | ")
            .thenAccept(thenApplyResult -> System.out.println(thenApplyResult + Thread.currentThread().getName()))
            .get();

    In this run of the first example, all callbacks were executed by the same worker thread (the exact thread can vary between runs):

    Main thread: main
    ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1

    In the second example, we employ thenApplyAsync() and thenAcceptAsync() methods:

    System.out.println("Main thread: " + Thread.currentThread().getName());
    CompletableFuture
            .supplyAsync(() -> Thread.currentThread().getName() + " | ")
            .thenApplyAsync(supplyResult -> supplyResult + Thread.currentThread().getName() + " | ")
            .thenAcceptAsync(thenApplyResult -> System.out.println(thenApplyResult + Thread.currentThread().getName()))
            .get();

    In the output of the second example, we notice that a callback can be executed by a different thread (again, the exact threads can vary between runs):

    Main thread: main
    ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-2
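
The methods supplyAsync(), runAsync(), and the "Async" callback methods also have overloads that accept an Executor as the last argument, which lets you choose the thread pool instead of relying on the common pool. Below is a minimal sketch of that idea; the class name, the pool size, and the thread name "order-pool-thread" are assumptions made for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {

    // Runs the supplier and the callback on a dedicated pool and returns the names of the threads used.
    static String threadsUsed() {
        // Every thread of this pool gets the same name, so the result does not depend on scheduling
        ExecutorService pool = Executors.newFixedThreadPool(2, task -> {
            Thread thread = new Thread(task);
            thread.setName("order-pool-thread");
            return thread;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .thenApplyAsync(first -> first + " | " + Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown(); // a custom pool must be shut down explicitly
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed()); // prints order-pool-thread | order-pool-thread
    }
}
```

A dedicated pool is often preferred for blocking work such as network calls, so that those tasks don't occupy the threads of the shared common pool.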

Error handling

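Two commonly used methods for handling exceptions in an asynchronous chain are exceptionally() and handle(). The exceptionally() callback runs only if an earlier stage failed and supplies a fallback value, while handle() always runs and receives both the result and the exception (one of which is null). The examples below show their usage.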

int negativeNumber = -1;

// example with exceptionally()
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    if (negativeNumber < 0) {
        throw new IllegalArgumentException("Number in future1 is negative!");
    } else {
        return "Integer is positive!";
    }
}).exceptionally(e -> { // catch only exception
    System.out.println("Exception message: " + e.getMessage());
    return "Exceptionally: Exception is caught";
});
System.out.println("Future1 response: " + future1.get());

// example with handle()
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    if (negativeNumber < 0) {
        throw new IllegalArgumentException("Number in future2 is negative!");
    } else {
        return "Integer is positive!";
    }
}).handle((response, e) -> { // catch the response and the exception
    if(e == null) return response; // if there is no exception, then just return response
    System.out.println("Exception message: " + e.getMessage());
    return "Handle: Exception is caught";
});
System.out.println("Future2 response: " + future2.get()); 

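In the output, both exceptions are caught. The message starts with the exception class name because the callback receives the original exception wrapped in a CompletionException: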

Exception message: java.lang.IllegalArgumentException: Number in future1 is negative!
Future1 response: Exceptionally: Exception is caught
Exception message: java.lang.IllegalArgumentException: Number in future2 is negative!
Future2 response: Handle: Exception is caught
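
Because a callback may receive the failure wrapped in a CompletionException, it is often useful to unwrap the cause before inspecting it. The sketch below shows one possible way to do this; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class UnwrapCauseDemo {

    // Fails on purpose and turns the original exception into a fallback string.
    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    throw new IllegalArgumentException("Number is negative!");
                })
                .exceptionally(e -> {
                    // Unwrap the CompletionException, if present, to reach the original exception
                    Throwable cause = (e instanceof CompletionException && e.getCause() != null)
                            ? e.getCause()
                            : e;
                    return cause.getClass().getSimpleName() + ": " + cause.getMessage();
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(recover()); // prints IllegalArgumentException: Number is negative!
    }
}
```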

Practical examples with multiple CompletableFuture

Let's delve into practical examples using CompletableFuture. Imagine we're developing a food delivery service. We'll address two scenarios: sequential asynchronous invocation and joining multiple instances.

In the first example, the key method is createOrder(). It builds a CompletableFuture chain from the other methods, each of which simulates an individual step of the process.

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public static void main(String[] args) throws InterruptedException, ExecutionException {
    var futureOrder = createOrder("Pizza");

    System.out.println("Waiting for Pizza...");
    futureOrder.get(); // block until the whole chain has finished
    System.out.println("Thread Main: " + Thread.currentThread().getName());
    System.out.println("Order is done: " + futureOrder.isDone());
}

public static CompletableFuture<Void> createOrder(String product) {
    return CompletableFuture
            .supplyAsync(() -> placeOrder(product))
            .thenApply(order -> sendConfirmation(order))
            .thenAccept(confirmation -> setForDelivery(confirmation));
}

public static String placeOrder(String product) {
    System.out.println("Thread place order: " + Thread.currentThread().getName());
    pause(500);
    System.out.println("Order placed for " + product);
    return product;
}

public static boolean sendConfirmation(String orderConfirmation) {
    System.out.println("Thread Confirmation: " + Thread.currentThread().getName());
    pause(500);
    System.out.println("Confirmation was sent for order " + orderConfirmation);
    return Objects.equals(orderConfirmation, "Pizza");
}

public static void setForDelivery(boolean isConfirmed) {
    System.out.println("Thread Delivery: " + Thread.currentThread().getName());
    pause(500);
    if (isConfirmed) {
        System.out.println("Order is ready for delivery!");
    }
}

// Helper: Thread.sleep() throws a checked InterruptedException,
// which the lambdas passed to supplyAsync() and thenApply() cannot propagate
private static void pause(long millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
    }
}

The output presents messages from methods, concluding with a final notification when the order is fulfilled:

Waiting for Pizza...

Thread place order: ForkJoinPool.commonPool-worker-1
Order placed for Pizza

Thread Confirmation: ForkJoinPool.commonPool-worker-1
Confirmation was sent for order Pizza

Thread Delivery: ForkJoinPool.commonPool-worker-1
Order is ready for delivery!

Thread Main: main
Order is done: true
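
In this pipeline every step is an ordinary synchronous method, so thenApply() and thenAccept() are enough. If a step itself returned a CompletableFuture, thenApply() would produce a nested CompletableFuture<CompletableFuture<...>>; the thenCompose() method flattens such a chain. The sketch below illustrates this with two hypothetical asynchronous steps, placeOrderAsync() and assignCourierAsync(), which are not part of the example above.

```java
import java.util.concurrent.CompletableFuture;

class ComposeOrderDemo {

    // Hypothetical asynchronous step: places the order and yields an order description
    static CompletableFuture<String> placeOrderAsync(String product) {
        return CompletableFuture.supplyAsync(() -> "order for " + product);
    }

    // Hypothetical asynchronous step: assigns a courier to an already placed order
    static CompletableFuture<String> assignCourierAsync(String order) {
        return CompletableFuture.supplyAsync(() -> order + " -> courier assigned");
    }

    static CompletableFuture<String> createOrder(String product) {
        // thenCompose() starts the second step with the first result
        // and returns a single, non-nested CompletableFuture<String>
        return placeOrderAsync(product).thenCompose(ComposeOrderDemo::assignCourierAsync);
    }

    public static void main(String[] args) {
        System.out.println(createOrder("Pizza").join()); // prints order for Pizza -> courier assigned
    }
}
```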

In the second example, assume several orders have been placed. You need to display a message when all of them are ready. We will leverage the placeOrder() method from the previous example:

public static void placeAllOrders() {
    var pizza = CompletableFuture.supplyAsync(() -> placeOrder("Pizza"));
    var burger = CompletableFuture.supplyAsync(() -> placeOrder("Burger"));
    var pasta = CompletableFuture.supplyAsync(() -> placeOrder("Pasta"));

    var allOrders = CompletableFuture.allOf(pizza, burger, pasta);

    // run the callback once all three CompletableFuture objects have completed,
    // and wait for the callback itself so the message is printed before the method returns
    allOrders.thenRun(() -> System.out.println("All orders placed!")).join();
}

As a result, we observe that the three tasks run on different threads of the common pool and may finish in any order. Nevertheless, the final message is displayed only after all of them have completed.

Thread place order: ForkJoinPool.commonPool-worker-2
Order placed for Burger

Thread place order: ForkJoinPool.commonPool-worker-3
Order placed for Pasta

Thread place order: ForkJoinPool.commonPool-worker-1
Order placed for Pizza

All orders placed!
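
Note that allOf() returns a CompletableFuture<Void>, so it signals completion but does not carry the individual results. One common way to gather them is to keep references to the original futures and call join() on each one after allOf() has completed. The following sketch assumes a simplified, hypothetical order task that just builds a string.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class CollectOrdersDemo {

    // Starts one asynchronous task per product and returns all results in the original order.
    static List<String> placeAll(List<String> products) {
        List<CompletableFuture<String>> orders = products.stream()
                .map(product -> CompletableFuture.supplyAsync(() -> "Order placed for " + product))
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(orders.toArray(new CompletableFuture[0]))
                // every future is already complete at this point, so join() returns immediately
                .thenApply(ignored -> orders.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .join();
    }

    public static void main(String[] args) {
        // prints [Order placed for Pizza, Order placed for Burger, Order placed for Pasta]
        System.out.println(placeAll(List.of("Pizza", "Burger", "Pasta")));
    }
}
```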

Conclusion

The CompletableFuture class offers a powerful tool for managing asynchronous programming in Java. It overcomes the limitations of the Future interface and presents a versatile API for various operations, such as:

  • Creating a pipeline to chain operations using methods like thenApply, thenAccept, thenCompose, and others.

  • Combining results from different futures with allOf.

  • Handling errors using exceptionally and handle.

By mastering CompletableFuture, you can make better use of your application's resources and, in many cases, noticeably improve its performance.
