Asynchronous programming is an essential part of modern software development. It allows applications to execute tasks concurrently, enhancing the throughput, speed, and scalability of backend services and their resource consumption. Consider the following examples:
I/O-bound operations;
Connections to third-party network services;
File operations, and more.
It proves invaluable in situations where the system must continue processing other tasks while waiting for time-consuming operations to conclude.
In this topic, we will delve deeper into asynchronous programming and explore the CompletableFuture class, which offers a contemporary API for crafting asynchronous code in Java.
Challenges in asynchronous programming
With the vast potential of asynchronous programming comes the complexity of its implementation. Here are some of the challenges encountered:
Concurrency control: Managing multiple execution threads can be intricate, especially when sharing resources or data. Proper synchronization is paramount to prevent issues like race conditions, deadlocks, or data inconsistencies.
Error handling: Asynchronous code errors can be trickier to manage than those in synchronous code. Exceptions thrown asynchronously may not propagate to the main execution thread, making them elusive and harder to address.
Debugging: Debugging asynchronous operations can pose challenges due to the nonlinear execution flow. Conventional debugging tools, often designed for synchronous, sequential code, might not offer a comprehensive view of the asynchronous application's activities.
Code complexity: Asynchronous code can rapidly become intricate and challenging to decipher, especially when dealing with nested callbacks, colloquially termed "callback hell." Such complexity can make the code harder to maintain and understand.
Limitations of the "Future" class
The initial approach to asynchronous programming in Java was introduced through the Future class. While it serves as a mechanism for managing asynchronous operations, it possesses several limitations that hinder its convenience. Here's a closer look:
Inability to manually complete: The
Futureclass doesn't allow manual completion of aFuture. Once you instantiate aFuturetask, its execution remains beyond your control.ExecutorService executorService = Executors.newSingleThreadExecutor(); // creates a single thread Callable<String> asyncTask = () -> { // task returns the string "Hello world" after 1 minute int ONE_MINUTE = 60000; Thread.sleep(ONE_MINUTE); return "Hello from future!"; }; Future<String> future = executorService.submit(asyncTask); // hope that future will be completeFor instance, there's no mechanism to set a default value to a
Futureor complete it based on specific conditions. You won't find a method akin tofuture.complete("default string value").
No notification upon completion: The
Futureclass doesn't offer notifications upon task completion. While you can employFuture.get()to retrieve the result, this method halts the execution of your program.ExecutorService executorService = Executors.newSingleThreadExecutor(); // creates a single thread Callable<String> asyncTask = () -> { // task returns the string "Hello from future" after 10 minutes int TEN_MINUTES = 600000; Thread.sleep(TEN_MINUTES); return "Hello from future!"; }; Future<String> future = executorService.submit(asyncTask); try { String result = future.get(); // this will block until the result is available } catch (InterruptedException | ExecutionException e) { // Handle exception }Additionally, you cannot attach a callback function to the
Futureto trigger post-completion of the asynchronous task.
Absence of exception handling: When a
Futuretask encounters an exception, it's encapsulated within anExecutionExceptionand manifested through theget()method. The sole method for error mitigation entails enclosing theget()within atry-catchblock and managing the prevalentExecutionException.// FutureTaskExample.java Future<String> futureWithException = executorService.submit(() -> { throw new RuntimeException("Exception while some operations!"); }); try { String result = futureWithException.get(); // Throws ExecutionException } catch (InterruptedException | ExecutionException e) { // Handle exception }Lack of support for merging futures: The
Futureclass doesn't supply utilities to amalgamate multipleFuturesinto a unified entity. If you're dealing with an assortment ofFuturesand aim to generate a newFuturecompleted upon the culmination of all original tasks, manual orchestration is imperative.
CompletableFuture API
CompletableFuture is a Java class that simplifies the process of writing asynchronous, non-blocking, and multi-threaded code.
Introduced in Java 8 within the java.util.concurrent package, CompletableFuture enhances the original Future class, addressing many of the limitations we discussed earlier.
Let's explore some key methods offered by the CompletableFuture API.
get(),getNow(),complete()Here's a basic example of retrieving results from a
CompletableFutureobject.CompletableFuture<String> completableFuture = new CompletableFuture<>(); // create CompletableFuture try { // block the main thread, due to no value in completableFuture // completableFuture.get(); // default value, when completableFuture is not completed String defaultValue = completableFuture.getNow("value before complete future"); completableFuture.complete("value from future"); String valueAfterComplete = completableFuture.get(); // get value future was completed System.out.println(defaultValue); System.out.println(valueAfterComplete); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }The output will be:
value before complete future value from futuresupplyAsync(),runAsync()A major feature of
CompletableFutureis its ability to run asynchronous operations. The methodssupplyAsync()andrunAsync()allow for operations to execute in a separate thread without blocking the main one. To illustrate this, we'll also display the thread executing the code:System.out.println("Main thread: " + Thread.currentThread().getName()); CompletableFuture<Void> voidFuture = CompletableFuture.runAsync(() -> { try { int ONE_SECOND = 1000; Thread.sleep(ONE_SECOND); System.out.println("Void future thread: " + Thread.currentThread().getName()); } catch (InterruptedException e) { throw new RuntimeException(e); } }); CompletableFuture<String> futureWithValue = CompletableFuture.supplyAsync(() -> { try { int ONE_SECOND = 1000; Thread.sleep(ONE_SECOND); System.out.println("Future with value thread: " + Thread.currentThread().getName()); return "Value from future after 1 second"; } catch (InterruptedException e) { throw new RuntimeException(e); } }); System.out.println("Some info from main thread"); voidFuture.get(); String valueFromFuture = futureWithValue.get(); System.out.println(valueFromFuture);The output will be:
Main thread: main Some info from main thread Void future thread: ForkJoinPool.commonPool-worker-1 Future with value thread: ForkJoinPool.commonPool-worker-2 Value from future after 1 secondjoin(),isCancelled(),isDone()Next, we'll examine methods that check the status of a
CompletableFutureobject. The methodjoin()functions similarly toget(), but it throws aCompletionExceptionif there's an underlying exception. The methodsisCancelled()andisDone()indicate whether the future has been completed or canceled. Here's an example that simulates a delayed operation and uses these methods:CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { int ONE_SECOND = 1000; Thread.sleep(ONE_SECOND); // simulate some delay } catch (InterruptedException e) { e.printStackTrace(); } return "Hello from future after 1 second"; }); // check if task is done or canceled System.out.println("Future done -> " + future.isDone()); System.out.println("Future canceled -> " + future.isCancelled()); // use join() to wait when the CompletableFuture will be finished and get the result String result = future.join(); System.out.println("Join result -> " + result); // check again if task is done or canceled System.out.println("Future done -> " + future.isDone()); System.out.println("Future canceled -> " + future.isCancelled());The output will be:
Future done -> false Future canceled -> false Join result -> Hello from future after 1 second Future done -> true Future canceled -> falsethenSupply(),thenApply(),thenRun(),thenAccept()Methods prefixed with "then" facilitate chaining operations by attaching callback functions. These functions are automatically invoked when the
CompletableFuturecompletes.CompletableFuture<String> completableFuture = CompletableFuture .supplyAsync(() -> "thenApply(): First supply, ") // creates first supply .thenApply(supplyResult -> supplyResult + "then first callback") //apply as many callback as you need .thenApply(thenApplyResult -> thenApplyResult + "and second callback"); System.out.println(completableFuture.get()); CompletableFuture .supplyAsync(() -> "thenAccept(): First supply, ") .thenApply(supplyResult -> supplyResult + "then first callback") .thenAccept(thenAcceptResult -> System.out.println(thenAcceptResult + " and accept the result")) .get(); // return void CompletableFuture and apply the function with value from previous chain CompletableFuture .supplyAsync(() -> "First supply, ") .thenApply(supplyResult -> supplyResult + "then first callback") .thenRun(() -> System.out.println("thenRun(): Just run separate function")) .get(); // return void CompletableFuture and apply the functionThe output will be:
thenApply(): First supply, then first callbackand second callback thenAccept(): First supply, then first callback and accept the result thenRun(): Just run separate functionthenApplyAsync(),thenAcceptAsync()While the previously discussed methods enable the attachment of numerous callbacks that all execute in one thread, there are times when callbacks need to run in different threads. For such scenarios, methods suffixed with "async" come into play.
Here's a comparison of threads used with
thenApply()versusthenApplyAsync():System.out.println("Main thread: " + Thread.currentThread().getName()); CompletableFuture .supplyAsync(() -> Thread.currentThread().getName() + " | ") .thenApply(supplyResult -> supplyResult + Thread.currentThread().getName() + " | ") .thenAccept(thenApplyResult -> System.out.println(thenApplyResult + Thread.currentThread().getName())) .get();We observe that in the first example, all callbacks are executed using the same separate thread:
Main thread: main ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1In the second example, we employ
thenApplyAsync()andthenAcceptAsync()methods:System.out.println("Main thread: " + Thread.currentThread().getName()); CompletableFuture .supplyAsync(() -> Thread.currentThread().getName() + " | ") .thenApplyAsync(supplyResult -> supplyResult + Thread.currentThread().getName() + " | ") .thenAcceptAsync(thenApplyResult -> System.out.println(thenApplyResult + Thread.currentThread().getName())) .get();In the output of the second example, we notice that different threads are used for the callback functions:
Main thread: main ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-1 | ForkJoinPool.commonPool-worker-2
Error handling
There are two methods available for handling exceptions during asynchronous chain operations. The examples provided below detail their usage.
int negativeNumber = -1;
// example with exceptionally()
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
if (negativeNumber < 0) {
throw new IllegalArgumentException("Number in future1 is negative!");
} else {
return "Integer is positive!";
}
}).exceptionally(e -> { // catch only exception
System.out.println("Exception message: " + e.getMessage());
return "Exceptionally: Exception is caught";
});
System.out.println("Future1 response: " + future1.get());
// example with handle()
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
if (negativeNumber < 0) {
throw new IllegalArgumentException("Number in future2 is negative!");
} else {
return "Integer is positive!";
}
}).handle((response, e) -> { // catch the response and the exception
if(e == null) return response; // if there is no exception, then just return response
System.out.println("Exception message: " + e.getMessage());
return "Handle: Exception is caught";
});
System.out.println("Future2 response: " + future2.get());
In the output, all exceptions are caught:
Exception message: java.lang.IllegalArgumentException: Number in future1 is negative!
Future1 response: Exceptionally: Exception is caught
Exception message: java.lang.IllegalArgumentException: Number in future2 is negative!
Future2 response: Handle: Exception is caughtPractical examples with multiple CompletableFuture
Let's delve into practical examples using CompletableFuture. Imagine we're developing a food delivery service. We'll address two scenarios: sequential asynchronous invocation and joining multiple instances.
In the first example, we have the main method createOrder(). Within it, we create a CompletableFuture object, along with other methods that simulate individual processes.
public static void main(String[] args) {
var futureOrder = createOrder("Pizza");
System.out.println("Waiting for Pizza...");
futureOrder.get();
System.out.println("Thread Main: " + Thread.currentThread().getName());
System.out.println("Order is done: " + futureOrder.isDone());
}
public static CompletableFuture createOrder(String product) {
return CompletableFuture
.supplyAsync(() -> placeOrder(product))
.thenApply(order -> sendConfirmation(order))
.thenAccept(confirmation -> setForDelivery(confirmation));
}
public static String placeOrder(String product) {
System.out.println("Thread place order: " + Thread.currentThread().getName());
Thread.sleep(500);
System.out.println("Order placed for " + product);
return product;
}
public static boolean sendConfirmation(String orderConfirmation) {
System.out.println("Thread Confirmation: " + Thread.currentThread().getName());
Thread.sleep(500);
System.out.println("Confirmation was sent for order " + orderConfirmation);
return Objects.equals(orderConfirmation, "Pizza");
}
public static void setForDelivery(boolean isConfirmed) {
System.out.println("Thread Delivery: " + Thread.currentThread().getName());
Thread.sleep(500);
if (isConfirmed){
System.out.println("Order is ready for delivery!");
}
}
The output presents messages from methods, concluding with a final notification when the order is fulfilled:
Waiting for Pizza...
Thread place order: ForkJoinPool.commonPool-worker-1
Order placed for Pizza
Thread Confirmation: ForkJoinPool.commonPool-worker-1
Confirmation was sent for order Pizza
Thread Delivery: ForkJoinPool.commonPool-worker-1
Order is ready for delivery!
Thread Main: main
Order is done: true
In the second example, assume several orders have been placed. You need to display a message when all of them are ready. We will leverage the placeOrder() method from the previous example:
public static void placeAllOrders() {
var pizza = CompletableFuture.supplyAsync(() -> placeOrder("Pizza"));
var burger = CompletableFuture.supplyAsync(() -> placeOrder("Burger"));
var pasta = CompletableFuture.supplyAsync(() -> placeOrder("Pasta"));
var allOrders = CompletableFuture.allOf(pizza, burger, pasta);
// execute something when all "CompletableFuture" object will be completed
allOrders.thenRun(() -> System.out.println("All orders placed!"));
allOrders.join(); // get result of combined CompletableFuture
}
As a result, we observe that each CompletableFuture has its own dedicated thread, and they might conclude in a varied sequence. Nevertheless, the final message is only displayed once all CompletableFuture objects have concluded.
Thread place order: ForkJoinPool.commonPool-worker-2
Order placed for Burger
Thread place order: ForkJoinPool.commonPool-worker-3
Order placed for Pasta
Thread place order: ForkJoinPool.commonPool-worker-1
Order placed for Pizza
All orders placed!Conclusion
The CompletableFuture class offers a powerful tool for managing asynchronous programming in Java.
It transcends the limitations of the Future class and presents a versatile API for various operations, such as:
Creating a pipeline to chain operations using methods like
thenApply,thenAccept,thenCompose, and others.Combining results from different futures with
allOf.Handling errors using
exceptionallyandhandle.
By mastering CompletableFuture, you can significantly enhance performance by optimizing the utilization of your application's resources.