Introduction
Many Java applications require concurrent operations. When using concurrency, there are common risks that have to be avoided, such as memory leaks, race conditions, callback hell, and disjointed error handling.
In version 8, Java introduced a new class, CompletableFuture.
Since version 5, Java has provided the Future and ExecutorService interfaces to handle asynchronous operations.
The Future interface is a placeholder for the result of an asynchronous operation, which can be a Runnable or a Callable instance. The ExecutorService interface submits an asynchronous operation and returns a Future object. The caller can use its get method to wait for the operation, or its cancel method to try to cancel the operation.
Future has the following methods:
1) isDone() : checks whether the computation has completed.
2) isCancelled() : checks whether the task was cancelled before it completed normally.
3) get() : blocks until the computation completes and then returns its result.
4) get(long timeout, TimeUnit unit) : blocks if necessary for at most the given time for this future to complete, and then returns its result, if available.
5) cancel(boolean mayInterruptIfRunning) : attempts to cancel execution of this task, if possible.
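The Future and ExecutorService workflow described above can be sketched roughly as follows. This is only an illustration: the class name FutureBasicsSketch, the helper sumOnWorker and the two-second timeout are assumptions made for the example, not part of the original article.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureBasicsSketch {

    // Hypothetical task: sums the integers 1..n on a worker thread.
    static int sumOnWorker(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            };
            // submit() hands the task to the executor and returns a Future
            Future<Integer> future = executor.submit(task);
            // get(timeout, unit) blocks the caller until the result is ready
            // or the (illustrative) two-second timeout expires
            return future.get(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum = " + sumOnWorker(10));
    }
}
```

Note that the caller thread is blocked inside get until the task finishes; this is the limitation that CompletableFuture callbacks are meant to avoid.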
Java 8 introduced a new class for dealing with the concurrency problems caused by Future, such as blocking calls. It implements the Future and CompletionStage interfaces.
Java 8 Concurrency: the new CompletionStage and CompletableFuture
Java 8 provided a new interface: java.util.concurrent.CompletionStage. This interface defines a "stage" of a possibly asynchronous computation. All its methods return an instance of CompletionStage itself. This way, multiple CompletionStage instances can be chained together to complete a group of tasks.
Java 8 also introduced a new class: java.util.concurrent.CompletableFuture
This class implements both: 1) the Future and 2) the CompletionStage interfaces. It provides static methods as starting points for concurrent operations. Here are some of the most commonly used methods:
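1) supplyAsync(Supplier supplier) : creates a CompletableFuture instance from a Supplier functional type; the future completes with the value the supplier returns.
2) runAsync(Runnable action) : creates a CompletableFuture instance from a Runnable functional type; the future completes with no value once the action has run.
3) completeExceptionally(Throwable ex) : if not already completed, makes this future fail with the given exception. Returns true if this invocation caused this CompletableFuture to transition to a completed state, else false.
4) thenRun(Runnable action) : executes the action when this stage completes normally.
5) thenApply(Function f) : returns a new CompletionStage that, when this stage completes normally, applies the function to its result.
6) thenAccept(Consumer action) : returns a new CompletionStage that, when this stage completes normally, passes its result to the consumer.
7) thenCombine(CompletionStage other, BiFunction fn) : combines this stage with another one and applies a function of two arguments to both results.
8) thenCompose(Function f) : chains two futures sequentially; the function receives the first result and returns the next CompletionStage.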
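A minimal sketch of how some of the methods listed above fit together is shown here. The class ChainingSketch and the helpers priceOf and discountFor are hypothetical stand-ins for slow lookups; only the CompletableFuture calls themselves come from the JDK.

```java
import java.util.concurrent.CompletableFuture;

final class ChainingSketch {

    // Hypothetical lookup standing in for a slow remote call.
    static CompletableFuture<Integer> priceOf(String item) {
        return CompletableFuture.supplyAsync(() -> item.length() * 10);
    }

    // Hypothetical second lookup that depends on the first result.
    static CompletableFuture<Integer> discountFor(int price) {
        return CompletableFuture.supplyAsync(() -> price >= 50 ? 5 : 0);
    }

    static String describe(String item) {
        CompletableFuture<Integer> price = priceOf(item);
        // thenCompose: the second asynchronous call needs the first result
        CompletableFuture<Integer> discount = price.thenCompose(ChainingSketch::discountFor);
        // thenCombine: merge the results of two stages with a two-argument function
        CompletableFuture<Integer> total = price.thenCombine(discount, (p, d) -> p - d);
        // thenApply: transform the final value; join() waits for the whole chain
        return total.thenApply(t -> item + " costs " + t).join();
    }

    // completeExceptionally: fail a future by hand and report whether this call
    // was the one that moved it to a completed state.
    static boolean failManually() {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean transitioned = future.completeExceptionally(new IllegalStateException("boom"));
        return transitioned && future.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        System.out.println(describe("laptop"));
        System.out.println(failManually());
    }
}
```

The only blocking call in this sketch is the final join; every intermediate step is registered as a callback and runs once the previous stage is ready.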
It's important to understand how the new threads are obtained. By default those threads are obtained from "ForkJoinPool.commonPool()", but we can also create our own Executor (see the "Executors" API) and pass it to the CompletableFuture callbacks.
    // Create a new executor
    Executor executor = Executors.newFixedThreadPool(5);
    // Then pass it to CompletableFuture
    CompletableFuture.supplyAsync(supplier, executor);
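To make the effect of a custom executor visible, the following sketch names the pool threads and returns the name of the thread that actually ran the supplier. The class name CustomExecutorSketch and the thread name "my-pool-worker" are assumptions chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CustomExecutorSketch {

    static String workerThreadName() {
        // Fixed pool of 5 threads; the ThreadFactory gives them a recognisable name
        ExecutorService executor =
                Executors.newFixedThreadPool(5, r -> new Thread(r, "my-pool-worker"));
        try {
            // The supplier runs on a thread of our executor, not on the common pool
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> Thread.currentThread().getName(), executor);
            return future.join();
        } finally {
            // Executors we create ourselves must be shut down explicitly
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Supplier ran on: " + workerThreadName());
    }
}
```

Unlike the common pool, a pool created this way uses non-daemon threads by default, so forgetting shutdown can keep the JVM alive.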
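We can also attach a synchronous or asynchronous callback with the "thenApply" operation. This callback comes in two variants: sync (it runs on the thread that completes the previous stage, or on the calling thread if that stage is already complete) or async (it runs as a new task on the default pool or on the given Executor):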
    CompletableFuture thenApply(Function func)
    CompletableFuture thenApplyAsync(Function func)
    CompletableFuture thenApplyAsync(Function fn, Executor executor)
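The difference between the variants can be observed with a small sketch like the one below. It starts from an already completed future so that the behaviour is deterministic: in that case thenApply runs immediately on the calling thread, while thenApplyAsync with an executor runs on that executor. The class name ThenApplyVariantsSketch and the thread name "callback-worker" are made up for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThenApplyVariantsSketch {

    // Returns { thread name seen by thenApply, thread name seen by thenApplyAsync }
    static String[] threadNames() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<Integer> done = CompletableFuture.completedFuture(21);
            // Stage already complete: the callback runs right away on the calling thread
            CompletableFuture<String> sync =
                    done.thenApply(n -> Thread.currentThread().getName());
            // Async variant with an executor: the callback is submitted to that executor
            CompletableFuture<String> async =
                    done.thenApplyAsync(n -> Thread.currentThread().getName(), executor);
            return new String[] { sync.join(), async.join() };
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("thenApply ran on:      " + names[0]);
        System.out.println("thenApplyAsync ran on: " + names[1]);
    }
}
```

When the previous stage is still running, the thread used by the non-async variant depends on timing, so code should not rely on it.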
Code Example:
ExampleConcurrency.java
package pub.Java.src;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ExampleConcurrency {

    public static void main(String[] args) {
        /* Example of CompletableFuture */

        /* First CompletableFuture, completed manually */
        CompletableFuture<String> firstCompletableFuture = new CompletableFuture<>();
        String s = "Is there someone? ";
        String r = null;
        try {
            firstCompletableFuture.complete(": Yes! Hello Concurrent");
            r = firstCompletableFuture.get();
            s = s + r;
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("Result: " + s);

        /* "runAsync": accepts a Runnable, this time implemented with an anonymous class */
        CompletableFuture<Void> cfuture = CompletableFuture.runAsync(new Runnable() {
            @Override
            public void run() {
                String[] mesg = {"We", "wait", "in a", "different", "Thread"};
                int i;
                try {
                    /* Print each word, pausing briefly (the 500 ms pause is illustrative) */
                    for (i = 0; i < mesg.length; i++) {
                        System.out.println(mesg[i]);
                        Thread.sleep(500);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        /* "runAsync": the same Runnable, this time implemented with a lambda */
        CompletableFuture<Void> cfuture1 = CompletableFuture.runAsync(() -> {
            String[] mesg = {"We", "wait", "in a", "different", "Thread"};
            int i;
            try {
                for (i = 0; i < mesg.length; i++) {
                    System.out.println(mesg[i]);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        /* "supplyAsync": accepts a Supplier that produces the result */
        CompletableFuture<Integer> myLuckyNumberFuture = CompletableFuture.supplyAsync(() -> {
            /* NOTE: STREAMS HAVE TO BE USED CAREFULLY IN CONCURRENT OPERATIONS
             * In this case we use a Stream Supplier to get a new open Stream for each new operation.
             * ONCE we have invoked a terminal operation (in this case "collect"), the Stream is closed.
             * That is the way to AVOID the "java.lang.IllegalStateException: stream has already been
             * operated upon or closed".
             */
            Supplier<Stream<Integer>> supplierOfStreamNums =
                    () -> Stream.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9).parallel();
            List<Integer> luckList = supplierOfStreamNums.get().collect(Collectors.toList());
            int ridx = (int) (Math.random() * 10) % 10;
            return luckList.get(ridx);
        });

        /* We add the callback "thenApply" Function to modify the result obtained before.
         * thenApply returns a new future, so we keep a reference to it. */
        CompletableFuture<Integer> adjustedNumberFuture = myLuckyNumberFuture.thenApply(l -> {
            Integer n = l;
            if (n % 2 == 0) n = n + 1;
            else n = n - 1;
            return n;
        });

        /* Add the callback "thenAccept" Consumer to consume the Integer generated, in this case by printing it */
        adjustedNumberFuture.thenAccept(
                n -> System.out.println("My lucky number is (from the thenAccept callback) : " + n));

        /* Invoking the blocking CompletableFuture operation "get" */
        Integer myLuckyNumber = -1;
        try {
            myLuckyNumber = adjustedNumberFuture.get();
            System.out.println("My lucky number is : " + myLuckyNumber);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        /* Wait for the runAsync tasks: common-pool threads are daemon threads
         * and would not keep the JVM alive on their own */
        cfuture.join();
        cfuture1.join();
    }
}