
Notes on CompletableFuture

Raypuff edited this page Nov 29, 2020 · 4 revisions

Hello! I was a bit confused when I first learned about CompletableFuture, so I made some notes on CompletableFuture and its methods. Hope it helps!

What is a completable future?

CompletableFuture is a class that implements Future and CompletionStage. Let's take a look at the CompletableFuture example to see how it works.

Before that, let's define a log method so that we can see what is happening on which thread.

public static void log(String msg) {
	System.out.println(LocalTime.now() + " ("
	+ Thread.currentThread().getName() + ") " + msg);
}

How to use it as Future

A CompletableFuture can be created with new CompletableFuture<Type>(), where Type is the type of the result the job will produce. Created this way, it can be completed manually with complete() and used like a regular Future.

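ExecutorService executor = Executors.newCachedThreadPool();
CompletableFuture<String> future
	= new CompletableFuture<>();
executor.submit(() -> {
	Thread.sleep(2000);
	future.complete("Finished");
	return null;
});

log(future.get());
executor.shutdown(); // otherwise the idle pool thread delays JVM exit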

Output:
22:58:40.478 (main) Finished

If you already know the result value, you can create an already-completed future without starting a thread by using CompletableFuture.completedFuture(value).

Future<String> completableFuture =
	CompletableFuture.completedFuture("Skip!");
String result = completableFuture.get();
log(result);

Output:
22:59:42.553 (main) Skip!

supplyAsync()

supplyAsync() lets you run work asynchronously without creating a thread yourself. You pass a Supplier to supplyAsync() and it is executed on another thread (by default one from ForkJoinPool.commonPool()).

In the example below, the supplier sleeps for 2 seconds, so future.get() (called from the main thread) blocks for 2 seconds before it receives the return value from supplyAsync().

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            log("Starting....");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Finished works";
        });

log("get(): " + future.get());

Output :
15:46:09.715 (ForkJoinPool.commonPool-worker-1) Starting....
15:46:11.716 (main) get(): Finished works

As you can see from the output, the supplier runs on a thread other than main, and main receives the result through get().
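
supplyAsync() also has an overload that takes an Executor as a second argument, which is handy when the work should not run on the common pool. The following is only a minimal sketch: the class name, the method name and the thread name "my-worker" are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncExecutorDemo {

    // Runs the supplier on a dedicated single-thread executor instead of
    // ForkJoinPool.commonPool() and returns the name of the thread that ran it.
    static String runOnCustomExecutor() {
        // "my-worker" is a hypothetical thread name chosen for this sketch.
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "my-worker"));
        try {
            CompletableFuture<String> future = CompletableFuture.supplyAsync(
                    () -> Thread.currentThread().getName(), executor);
            // join() waits like get() but throws only unchecked exceptions.
            return future.join();
        } finally {
            executor.shutdown(); // let the JVM exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomExecutor()); // prints "my-worker"
    }
}
```

Passing your own executor keeps long or blocking tasks from occupying the shared common pool.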

Exception handling

An exception may be thrown while your work is processed in a CompletableFuture. In such cases, you can use handle() to deal with it. handle() receives both the result and the exception; whichever does not apply is null.

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    String name = null;
    if (name == null) {
        throw new RuntimeException("Computation error!");
    }
    return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

log(future.get());

Output :
15:51:35.385 (main) Hello, Stranger!
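
The example above only looks at the first argument of handle(). As a rough sketch of how the second argument (the exception) could be used, the code below builds the fallback message from the cause. The class and method names are hypothetical. An exception thrown inside supplyAsync() typically reaches handle() wrapped in a CompletionException, so the sketch unwraps it when it can.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class HandleDemo {

    // Greets the given name, or falls back to a message built from the exception.
    static String greet(String name) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (name == null) {
                        throw new IllegalArgumentException("name is missing");
                    }
                    return "Hello, " + name;
                })
                .handle((value, error) -> {
                    if (error == null) {
                        return value; // normal completion: error is null
                    }
                    // Unwrap the CompletionException to reach the original cause.
                    Throwable cause = (error instanceof CompletionException
                            && error.getCause() != null) ? error.getCause() : error;
                    return "Fallback (" + cause.getMessage() + ")";
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(greet("World")); // prints "Hello, World"
        System.out.println(greet(null));    // prints "Fallback (name is missing)"
    }
}
```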

thenApply() : execute a task with a return value

Once something has been processed through supplyAsync(), you can perform another task on its result using thenApply().
thenApply() runs the function passed as a parameter, using the return value from supplyAsync() as its argument. *This method is equivalent to map.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply(s -> s + " + Future2");

log("future.get(): " + future.get());

Output :
15:57:49.343 (main) future.get(): Future1 + Future2

thenApply() itself returns a CompletableFuture, so calls can be chained one after another.

E.g)

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenApply(s -> s + " World")
        .thenApply(s -> s + " Future");

log("future.get(): " + future.get());

Output:
16:00:00.513 (main) future.get(): Hello World Future
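
Because thenApply() behaves like map, the result type can change from one stage to the next. Here is a small sketch of that idea, with class and method names made up for illustration:

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyTypeDemo {

    // Each thenApply() maps the previous result, so the type can change per stage.
    static int lengthOfGreeting() {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> "Hello")     // CompletableFuture<String>
                .thenApply(s -> s + " World")   // still CompletableFuture<String>
                .thenApply(String::length);     // now CompletableFuture<Integer>
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(lengthOfGreeting()); // "Hello World" has 11 characters
    }
}
```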

thenAccept() : execute a task with no return value

thenAccept() is similar to thenApply(), but the function it takes returns no value (i.e. it is a Consumer).
Thus, the resulting type is CompletableFuture<Void>.

E.g)

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Hello");
CompletableFuture<Void> future2 = future1.thenAccept(
        s -> log(s + " World"));

log("future1.get(): " + future1.get());
log("future2.get(): " + future2.get());

Output:
16:02:05.452 (main) Hello World
16:02:05.453 (main) future1.get(): Hello
16:02:05.453 (main) future2.get(): null

As you can see from the output above, future2, the one created by thenAccept(), completes with null.

thenCompose() : Perform multiple tasks in sequence

thenCompose() chains two CompletableFutures into one.
When the first CompletableFuture completes, its result is passed to the function that creates the second one, so the two jobs run sequentially.
It takes as its argument a function that returns a CompletableFuture.
*It is equivalent to flatMap.

CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "Hello")
        .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
log(future.get());

Output:
16:07:33.196 (main) Hello World
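
To see why thenCompose() is compared to flatMap, it may help to put it next to thenApply() with the same function. In the sketch below, appendWorld() is a hypothetical async step invented for this comparison. thenApply() leaves a nested future, while thenCompose() flattens it.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeDemo {

    // Hypothetical async step used only for this sketch.
    static CompletableFuture<String> appendWorld(String s) {
        return CompletableFuture.supplyAsync(() -> s + " World");
    }

    // thenApply() wraps the returned future, giving a nested type.
    static CompletableFuture<CompletableFuture<String>> nested() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenApply(ThenComposeDemo::appendWorld);
    }

    // thenCompose() flattens it into a single future.
    static CompletableFuture<String> flat() {
        return CompletableFuture.supplyAsync(() -> "Hello")
                .thenCompose(ThenComposeDemo::appendWorld);
    }

    public static void main(String[] args) {
        System.out.println(nested().join().join()); // two joins are needed
        System.out.println(flat().join());          // one join is enough
    }
}
```

Both lines print "Hello World"; only the shape of the returned type differs.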

thenCombine() : execute multiple tasks at the same time

thenCombine() joins two independent CompletableFutures: it is called on one and takes the other, plus a combining function, as arguments.
Once both CompletableFutures are done, the function combines the two results and the outcome is returned as one new CompletableFuture.

For example, let's have two CompletableFutures and use thenCombine() to process them so that these two futures are done in parallel and the results are combined into one.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApply((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApply((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s));

Output:
16:12:03.569 (main) Starting future1
16:12:05.571 (main) Starting future2
16:12:07.573 (main) Future1! + Future2!

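Looking at the results, the two tasks ran one after the other, both on main. supplyAsync(() -> "Future1") finishes almost immediately, so by the time thenApply() is registered the future is usually already complete, and thenApply() then runs its function right away on the calling thread (main). The 2-second sleep in future1 therefore blocks main before future2 is even created.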

thenApply() vs thenApplyAsync() : thenCombine()

Using thenApplyAsync() instead of thenApply() makes the two functions run on different threads.

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1")
        .thenApplyAsync((s) -> {
            log("Starting future1");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2")
        .thenApplyAsync((s) -> {
            log("Starting future2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "!";
        });

future1.thenCombine(future2, (s1, s2) -> s1 + " + " + s2)
        .thenAccept((s) -> log(s))
        .join(); // wait here: common-pool threads are daemons and do not keep the JVM alive

Output:
16:15:39.532 (ForkJoinPool.commonPool-worker-2) Starting future2
16:15:39.537 (ForkJoinPool.commonPool-worker-1) Starting future1
16:15:41.537 (ForkJoinPool.commonPool-worker-1) Future1! + Future2!

As you can see from the output, the two tasks now run on different threads and start at about the same time.
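
The thread difference between thenApply() and thenApplyAsync() can be checked in isolation. The sketch below uses hypothetical class and method names and starts from an already-completed future. In that case thenApply() runs its function on the calling thread, while thenApplyAsync() hands it to an executor (the common pool by default). It assumes the caller is an ordinary thread such as main, not a pool worker.

```java
import java.util.concurrent.CompletableFuture;

class CallbackThreadDemo {

    // On an already-completed future, thenApply() runs the function
    // immediately on the thread that registers it.
    static String threadOfThenApply() {
        return CompletableFuture.completedFuture("x")
                .thenApply(s -> Thread.currentThread().getName())
                .join();
    }

    // thenApplyAsync() submits the function to an executor, so it
    // normally runs on a different thread than the caller.
    static String threadOfThenApplyAsync() {
        return CompletableFuture.completedFuture("x")
                .thenApplyAsync(s -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("caller:         " + Thread.currentThread().getName());
        System.out.println("thenApply:      " + threadOfThenApply());
        System.out.println("thenApplyAsync: " + threadOfThenApplyAsync());
    }
}
```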

anyOf()

anyOf() returns the result of whichever of several CompletableFutures completes first.

For example, let's have three CompletableFutures with different completion times.

        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> "Future1")
            .thenApplyAsync((s) -> {
                log("Starting future1");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s + "!";
            });

        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> "Future2")
            .thenApplyAsync((s) -> {
                log("Starting future2");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s + "!";
            });

        CompletableFuture<String> future3 = CompletableFuture
            .supplyAsync(() -> "Future3")
            .thenApplyAsync((s) -> {
                log("Starting future3");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return s + "!";
            });

        CompletableFuture.anyOf(future1, future2, future3)
                .thenAccept(s -> log("Result: " + s))
                .join(); // wait for the first result before main exits

Output:
03:49:58.100954 (ForkJoinPool.commonPool-worker-1) Starting future1
03:49:58.101765 (ForkJoinPool.commonPool-worker-2) Starting future2
03:49:58.102536 (ForkJoinPool.commonPool-worker-3) Starting future3
03:49:59.102378 (ForkJoinPool.commonPool-worker-1) Result: Future1!

As you can see from the output, all the tasks are started, but only the result of the one that finishes first is passed to thenAccept().
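
One detail worth noting is that anyOf() returns a CompletableFuture<Object>, so the result has to be cast if you need the original type. Here is a minimal sketch with made-up names, using a future that never completes so that the outcome is predictable:

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {

    // anyOf() returns CompletableFuture<Object>, so the result must be
    // cast back to the expected type.
    static String firstResult() {
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed

        CompletableFuture<Object> any = CompletableFuture.anyOf(never, fast);
        return (String) any.join();
    }

    public static void main(String[] args) {
        System.out.println(firstResult()); // prints "fast"
    }
}
```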

allOf()

Unlike anyOf(), allOf() waits for all of the tasks to complete before performing another task.
allOf() itself returns CompletableFuture<Void>, so the individual results are collected from the original futures, for example with the Stream API.

e.g.)

CompletableFuture<String> future1 = CompletableFuture
        .supplyAsync(() -> "Future1");

CompletableFuture<String> future2 = CompletableFuture
        .supplyAsync(() -> "Future2");

CompletableFuture<String> future3 = CompletableFuture
        .supplyAsync(() -> "Future3");

CompletableFuture<Void> combinedFuture
        = CompletableFuture.allOf(future1, future2, future3);

log("get() : " + combinedFuture.get());
log("future1.isDone() : " + future1.isDone());
log("future2.isDone() : " + future2.isDone());
log("future3.isDone() : " + future3.isDone());

String combined = Stream.of(future1, future2, future3)
        .map(CompletableFuture::join)
        .collect(Collectors.joining(" + "));
log("Combined: " + combined);

Output:
16:22:26.615 (main) get() : null
16:22:26.615 (main) future1.isDone() : true
16:22:26.615 (main) future2.isDone() : true
16:22:26.616 (main) future3.isDone() : true
16:22:26.620 (main) Combined: Future1 + Future2 + Future3
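
The allOf() plus stream pattern above can be wrapped in a small helper that turns a list of futures into a single future holding a list of results. This is only a sketch of a common idiom: sequence() is a hypothetical helper name, not part of the CompletableFuture API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {

    // Hypothetical helper: turns a list of futures into one future of a list.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // Runs only after every future is done, so join() will not block.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    static List<String> demo() {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "Future1"),
                CompletableFuture.supplyAsync(() -> "Future2"),
                CompletableFuture.supplyAsync(() -> "Future3"));
        return sequence(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Future1, Future2, Future3]
    }
}
```

The results keep the order of the input list, regardless of which future finished first.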
