We all need a Holiday Destination Finder
Tiptoeing Through the Asynch Door with the CompletionStage Interface JSE17
When it comes to holidays, I’m as spontaneous as they come. I enjoy not planning and picking the best option available just before or even while traveling.
So I needed a holiday… and a reason to explore Java’s async magic.

We’ll get to this Holiday Finder but first let’s get the theory out of the way, shall we?
Diving Right In
Considering two API calls A & B, API A is called first, and then API B. This is what synchronous code does: it waits before executing the next task and things run in order. However, could the thread blocked on the API response be doing some other work instead?
For data pipelines or long-running tasks, blocking the flow of execution to wait for results can bring the application to a halt, which directly impacts its throughput and ability to scale.
By contrast, asynchronous code is non-blocking: if calling API B does not depend on the result of API A, both calls can be made asynchronously. This does not guarantee that they will run in parallel or simultaneously; it only indicates that an executing thread will pick up the asynchronous work at some point in the future, when it is not busy.
Asynch is most useful in I/O heavy applications.
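To make the difference concrete, here is a minimal sketch in which the two API calls are simulated with a sleep. The names slowCall, callSequentially and callAsynchronously, as well as the 200 ms delay, are made up for illustration; no real API is involved. It uses the CompletableFuture API that is covered later in this post.

```java
import java.util.concurrent.CompletableFuture;

class SyncVsAsyncSketch {

    // Hypothetical stand-in for a slow remote call: sleeps, then returns a label.
    static String slowCall(String name, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name + " done";
    }

    // Synchronous: B only starts once A has returned.
    static String callSequentially() {
        String a = slowCall("A", 200);
        String b = slowCall("B", 200);
        return a + ", " + b;
    }

    // Asynchronous: both calls are started up front and merged once both results are in.
    static String callAsynchronously() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> slowCall("A", 200));
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> slowCall("B", 200));
        return a.thenCombine(b, (resA, resB) -> resA + ", " + resB).join();
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(callSequentially());
        long middle = System.nanoTime();
        System.out.println(callAsynchronously());
        long end = System.nanoTime();
        System.out.printf("sequential: %d ms, asynchronous: %d ms%n",
                (middle - start) / 1_000_000, (end - middle) / 1_000_000);
    }
}
```

Both variants return the same string; only the elapsed time should differ, and by how much depends on the machine and on how the pool schedules the two calls.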
More Definitions, just so we’re clear
So Asynch also means Concurrency & Multi-threading?
Not exactly. Multi-core processors allow multiple threads to run in parallel.
Concurrency is the execution of multiple tasks that are conceptually in progress at the same time. They could actually be running in parallel, i.e. on multiple threads, or be sharing a single execution thread. If the same thread is shared, the tasks cannot be running at exactly the same instant but, conceptually, they are in progress at the same time. Asynchronous operations, however, do not require multi-threading: a non-blocking operation frees the current thread so that it can do other tasks. It is worth noting that asynch operations can also be executed on different threads; we will see examples in the next sections.
Key Concepts
- Callbacks: make it possible to chain task execution by passing functions as arguments to other functions, to be executed once the latter are done.
~~~~ Pseudo Code ~~~~
TaskA.thenExecute(TaskB).thenExecute(TaskC)
- Promises (or Futures): represent a value that may become available in the future. The value does not need to be present when the future is handed out; this construct allows the program execution to continue even if it is not populated yet.
- Synchronizers: make it possible to wait for the results of multiple asynch tasks and/or combine them.
~~~~ Pseudo Code ~~~~
AllOf(TaskA, TaskB, TaskC) // wait for all tasks
AnyOf(TaskD, TaskE) // wait for one task
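The pseudo code above could translate to Java roughly as follows. This is only a sketch using CompletableFuture (introduced later in this post); the class name, method names and the values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class KeyConceptsSketch {

    // Callbacks: each function runs once the previous stage has produced its value.
    static int chain() {
        return CompletableFuture.supplyAsync(() -> 20) // Task A
                .thenApply(x -> x + 1)                 // Task B
                .thenApply(x -> x * 2)                 // Task C
                .join();
    }

    // Synchronizer: allOf completes once every task has completed.
    static int sumOfAll() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c = CompletableFuture.supplyAsync(() -> 3);
        CompletableFuture.allOf(a, b, c).join(); // wait for all tasks
        return a.join() + b.join() + c.join();
    }

    // Synchronizer: anyOf completes as soon as one task has completed.
    static Object firstOf() {
        CompletableFuture<String> d = CompletableFuture.completedFuture("D");
        CompletableFuture<String> e = new CompletableFuture<>(); // never completed in this sketch
        return CompletableFuture.anyOf(d, e).join(); // wait for one task
    }

    public static void main(String[] args) {
        System.out.println(chain());
        System.out.println(sumOfAll());
        System.out.println(firstOf());
    }
}
```

The futures a, b, c, d and e play the role of the promises: they are handed out immediately, and the value is only read once join() is called.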
And the Event-loop in all that?
The event loop is most commonly associated with JavaScript, but let’s explore the broader concept (especially since my current JavaScript level is somewhere around -2 on a scale from 0 to 10).
An event loop is a mechanism that continuously monitors and executes queued asynchronous tasks or events, typically once the thread or system resources they depend on become available.
In this discussion, we’ll look at different patterns for performing asynchronous programming in Java — namely, using the ExecutorService and the CompletableFuture API. It might be tempting to compare task distribution and execution in an ExecutorService or a ForkJoinPool to something “event-loop-ish” — and in a very broad conceptual sense, that’s not entirely wrong. However, both solutions exhibit key behavioral differences from a true event loop, which we’ll explore in the following sections.
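For reference, a toy event loop can be sketched in a few lines of Java. This is a deliberately naive, single-threaded illustration (the class and method names are mine): a real event loop would typically wait for new events instead of stopping when the queue is empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class EventLoopSketch {

    private final Queue<Runnable> queue = new ArrayDeque<>();

    // Queue a task; it is not executed until the loop reaches it.
    void post(Runnable task) {
        queue.add(task);
    }

    // Run queued tasks one at a time, on the calling thread, until the queue is empty.
    void run() {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        EventLoopSketch loop = new EventLoopSketch();
        loop.post(() -> {
            log.add("first");
            // A task may schedule more work; it goes to the back of the queue.
            loop.post(() -> log.add("scheduled by first"));
        });
        loop.post(() -> log.add("second"));
        loop.run();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Everything here runs on one thread and in queue order, which is the main contrast with a thread pool where several workers pull tasks at the same time.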
Pattern 1: Executor Service
Executor Service Quick Notes
The ExecutorService interface (in java.util.concurrent) is typically backed by a pool of threads with a waiting queue common to all threads. It provides methods to help manage tasks that need to run concurrently.
When a task is submitted to the Executor Service via the submit() method, it is added to the waiting queue
and run by the first available thread.
The task can be a Runnable or a Callable, and submit() returns a Future object, which gives either the result or an exception if the task completed exceptionally. Note that getting the result from a Future object is still a blocking operation: the thread calling get() is halted until the value is present.
Executor Services come in various flavours such as fixed-size, cached, scheduled, etc. More to come on different types of scheduled executor services in a separate post.
Short Example
In the below example, an Executor Service of 4 threads is created while we submit 5 tasks. From the logs, thread 4 has handled 2 of those tasks.
public static void run() {
    ExecutorService executor = Executors.newFixedThreadPool(4);
    List<Future<Integer>> futures = new ArrayList<>();
    // The task: log the executing thread and return a random int in [39, 49)
    Callable<Integer> answerToTheUniverse = () -> {
        System.out.println("Currently on thread" + Thread.currentThread().getName());
        return new Random().nextInt(39, 49);
    };
    // 5 tasks for 4 threads: one thread will handle two of them
    for (int i = 0; i < 5; i++) {
        futures.add(executor.submit(answerToTheUniverse));
    }
    try {
        double avg = futures.stream().mapToInt(result -> {
                    try {
                        return result.get(); // blocks until this task's result is available
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                }).average()
                .orElse(0);
        System.out.printf("The answer to the universe is %s", avg);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executor.shutdown(); // the JVM won't exit without it
    }
}
Currently on threadpool-1-thread-4
Currently on threadpool-1-thread-1
Currently on threadpool-1-thread-2
Currently on threadpool-1-thread-3
Currently on threadpool-1-thread-4
The answer to the universe is 42.6
In the case where a following task depends on the result of a previous one, the task chaining
needs to be handled explicitly by performing a future.get(). This blocks the thread handling
the current task.
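A minimal sketch of that explicit chaining, with made-up values (the class and method names are mine):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorChainingSketch {

    static int chainWithGet() throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> first = executor.submit(() -> 21);
            // The second task needs the first result, so we block on get() before submitting it.
            int intermediate = first.get();
            Future<Integer> second = executor.submit(() -> intermediate * 2);
            return second.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(chainWithGet());
    }
}
```

The thread calling chainWithGet() does nothing useful while it sits on first.get(), which is exactly what the next pattern tries to avoid.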
Pattern 2: Completable Future
A Completable Future, similarly to a Future, is a container that holds the eventual result of an operation. With a plain Future, the only way to use the result is to block on get(). A Completable Future additionally lets you attach callbacks that run once the result is available, so no thread has to wait for it (get() and join() still exist, and still block).
Unless an executor is supplied, tasks started via the async methods of the Completable Future API are submitted to the common Fork Join Pool, whose default parallelism is the number of CPU cores minus one. Its threads have individual task queues and can steal work from other threads when they become idle, which tends to give better CPU utilisation for many small tasks than a thread pool with a single shared queue.
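By default, a dependent task (thenApply, thenAccept, etc.) is executed by the thread that completes the previous stage, or by the calling thread if that stage has already completed. The Completable Future API also provides a set of methods suffixed with Async, which indicate the task is handed over to an executor and may therefore be executed in a different thread.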
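The sketch below is one way to observe this. To keep the outcome predictable, the source future is already completed and the Async variant is given a single-thread executor whose thread is named "dedicated-thread"; both choices are assumptions made for the demo, not something the API requires.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsyncVariantsSketch {

    // Returns the name of the thread that ran the dependent stage.
    static String threadRunningStage(boolean useAsyncVariant) {
        ExecutorService dedicated =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "dedicated-thread"));
        try {
            CompletableFuture<String> source = CompletableFuture.completedFuture("value");
            CompletableFuture<String> stage = useAsyncVariant
                    // Async variant: the function is handed to the given executor.
                    ? source.thenApplyAsync(v -> Thread.currentThread().getName(), dedicated)
                    // Non-async variant on an already completed future: runs on the calling thread.
                    : source.thenApply(v -> Thread.currentThread().getName());
            return stage.join();
        } finally {
            dedicated.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenApply ran on:      " + threadRunningStage(false));
        System.out.println("thenApplyAsync ran on: " + threadRunningStage(true));
    }
}
```

When the source future is not yet completed, the non-async variant may instead run on whichever thread completes it, so the first line of output is specific to this setup.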
Chaining, Synchronizing & Composing Tasks
The Completable Future API provides out-of-the-box methods such as thenApply, thenCompose or thenCombine to chain dependent computations. These methods are non-blocking: they register callback code to run once results are available.
Note how the method names hint at the functional interface they take: thenApply takes a Function, thenAccept a Consumer and supplyAsync a Supplier.
The Async variants accept an optional Executor, which provides control over which thread pool executes which task; see the example below.
Task results can be composed or combined: thenCompose chains a second asynchronous call that depends on the first result, while thenCombine merges the results of two independent futures.
In the below example, we went for a thenCombine that takes in a CompletableFuture and a BiFunction.
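allOf() returns a Completable Future of Void that completes once all the given tasks have completed; anyOf() returns a Completable Future of Object that completes as soon as one of them has. Both are non-blocking, so a join() (or get()) is still required to actually wait.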
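As a small illustration of the difference between composing and combining, here is a sketch with two made-up asynchronous lookups. findAirportCode and countDestinations are hypothetical helpers that return canned values; they are unrelated to the clients used later in this post.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

class ComposeVsCombineSketch {

    // Hypothetical async lookup: pretends the airport code is the first three letters of the city.
    static CompletableFuture<String> findAirportCode(String city) {
        return CompletableFuture.supplyAsync(() -> city.substring(0, 3).toUpperCase(Locale.ROOT));
    }

    // Hypothetical async lookup: returns a made-up number of destinations for an airport code.
    static CompletableFuture<Integer> countDestinations(String airportCode) {
        return CompletableFuture.supplyAsync(() -> airportCode.length() * 10);
    }

    // thenCompose: the second async call needs the result of the first one.
    static int destinationsFrom(String city) {
        return findAirportCode(city)
                .thenCompose(ComposeVsCombineSketch::countDestinations)
                .join();
    }

    // thenCombine: two independent futures whose results are merged by a BiFunction.
    static String weatherSummary() {
        CompletableFuture<Integer> temperature = CompletableFuture.supplyAsync(() -> 24);
        CompletableFuture<Integer> humidity = CompletableFuture.supplyAsync(() -> 47);
        return temperature.thenCombine(humidity, (t, h) -> t + "C / " + h + "%").join();
    }

    public static void main(String[] args) {
        System.out.println(destinationsFrom("London"));
        System.out.println(weatherSummary());
    }
}
```

Using thenApply instead of thenCompose in destinationsFrom would give a future of a future, which is what thenCompose flattens.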
Handling Exceptions (or not)
In chained asynchronous tasks, an unhandled failure is propagated to the downstream operations, which fail too.
exceptionally() and handle() are used to recover from failures and provide fallback values, while whenComplete()
can observe failures/successes without the opportunity to recover.
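The three methods can be compared side by side on a task that always fails. This is a sketch: failingTask and the fallback values are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ErrorHandlingSketch {

    // A task that always fails, for demonstration purposes only.
    static CompletableFuture<Integer> failingTask() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // exceptionally: only invoked on failure, supplies a fallback value.
    static int recoverWithExceptionally() {
        return failingTask().exceptionally(error -> -1).join();
    }

    // handle: invoked on both success and failure, may map either outcome to a new value.
    static String recoverWithHandle() {
        return failingTask()
                .handle((value, error) -> error == null ? "ok: " + value : "recovered")
                .join();
    }

    // whenComplete: can observe the outcome, but the failure still reaches the caller.
    static boolean whenCompleteStillFails() {
        CompletableFuture<Integer> observed = failingTask()
                .whenComplete((value, error) -> System.err.println("observed: " + error));
        try {
            observed.join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof IllegalStateException;
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverWithExceptionally());
        System.out.println(recoverWithHandle());
        System.out.println(whenCompleteStillFails());
    }
}
```

Note that join() reports the failure wrapped in a CompletionException, with the original exception available through getCause().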
Holiday Destination Finder ~Pattern2Impl
Back to our actual goal: the Holiday Destination Finder. The basis of picking a good holiday destination is:
- It is reachable from one of the airports of the city you are currently in,
- Its current temperature is close to the temperature you are targeting for your holidays & the combo (temperature, humidity) is considered pleasant.
In order to proceed, we start by creating two clients: a weather client,
getting temperature and humidity details from https://wttr.in & an airline client.
To start simple, we’re only considering the Ryanair airline.
package holidaysfinder;

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Set;
import java.util.stream.Collectors;

public class AirlineClient {
    private static final String DESTINATIONS_URL = "https://www.ryanair.com/api/views/locate/searchWidget/routes/en/airport/";
    private static final String AIRPORTS_URL = "https://www.ryanair.com/api/views/locate/5/airports/en/active";
    private static final HttpClient client = HttpClient.newHttpClient();

    // Returns the codes of the airports located in the given city
    public static Set<String> getAirportsFrom(String city) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(AIRPORTS_URL))
                .setHeader("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Check the status before parsing: an error body may not be a JSON array
        if (response.statusCode() != 200) {
            throw new Exception("Bad response code");
        }
        // JsonParser.parseString is the static replacement for Gson's deprecated new JsonParser().parse(...)
        JsonArray json = JsonParser.parseString(response.body()).getAsJsonArray();
        return extractAirports(json, city);
    }

    // Returns the names of the cities reachable from the given airport
    public static Set<String> getDestinationsFrom(String airport) throws Exception {
        String url = DESTINATIONS_URL + airport;
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .setHeader("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // Check the status before parsing: an error body may not be a JSON array
        if (response.statusCode() != 200) {
            throw new Exception("Bad response code");
        }
        JsonArray json = JsonParser.parseString(response.body()).getAsJsonArray();
        return extractCities(json);
    }

    private static Set<String> extractCities(JsonArray jsonArray) {
        return jsonArray.asList().stream()
                .map(JsonElement::getAsJsonObject)
                .filter(w -> w.has("arrivalAirport")
                        && w.getAsJsonObject("arrivalAirport").has("city")
                        && w.getAsJsonObject("arrivalAirport").getAsJsonObject("city").has("name"))
                .map(x -> x.getAsJsonObject("arrivalAirport").getAsJsonObject("city").get("name").getAsString())
                .collect(Collectors.toSet());
    }

    private static Set<String> extractAirports(JsonArray json, String city) {
        return json.asList().stream().map(JsonElement::getAsJsonObject)
                .filter(w -> w.has("city") && w.getAsJsonObject("city").has("name"))
                .filter(w -> w.getAsJsonObject("city").get("name").getAsString().equals(city))
                .map(x -> x.get("code").getAsString())
                .collect(Collectors.toSet());
    }
}
package holidaysfinder;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WeatherClient {
    private static final String BASE_URL = "https://wttr.in/%s?";
    // %25 is the URL-encoded '%', so wttr.in receives format=%t and format=%h
    private static final String TEMPERATURE_FMT = "format=%25t";
    private static final String HUMIDITY_FMT = "format=%25h";
    private static final HttpClient client = HttpClient.newHttpClient();
    private static final Pattern temp_regex = Pattern.compile("([+-]?\\d+)");
    private static final Pattern humid_regex = Pattern.compile("(\\d+)");

    public int getTemperature(String city) throws Exception {
        return resToInt(fetchData(TEMPERATURE_FMT, city), temp_regex);
    }

    public int getHumidity(String city) throws Exception {
        return resToInt(fetchData(HUMIDITY_FMT, city), humid_regex);
    }

    private String fetchData(String format, String city) throws Exception {
        String url = BASE_URL.replace("%s", city) + format;
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(url))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new Exception("Bad response code");
        }
        return response.body();
    }

    // Extracts the first integer matched by the regex; falls back to 0 when nothing matches
    private int resToInt(String formattedRes, Pattern regex) {
        Matcher matcher = regex.matcher(formattedRes);
        if (matcher.find()) {
            return Integer.parseInt(matcher.group());
        }
        return 0;
    }
}
The fetched data is compiled into a DestinationCandidate object, whose simple
implementation is left out of this post.
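For readers who want to run the code, a plain holder along the following lines should be enough. This is my guess at a shape, not the actual class: the constructor and the temperature, humidity and score accessors are inferred from how the object is used further down, while getCity, getOriginAirport and the toString format are assumptions.

```java
class DestinationCandidate {

    private final String city;
    private final String originAirport;
    private int temperature;
    private int humidity;
    private int score;

    DestinationCandidate(String city, String originAirport) {
        this.city = city;
        this.originAirport = originAirport;
    }

    // Accessor names below are assumptions, not taken from the original class.
    String getCity() { return city; }
    String getOriginAirport() { return originAirport; }

    int getTemperature() { return temperature; }
    void setTemperature(int temperature) { this.temperature = temperature; }

    int getHumidity() { return humidity; }
    void setHumidity(int humidity) { this.humidity = humidity; }

    int getScore() { return score; }
    void setScore(int score) { this.score = score; }

    // The format is an assumption, chosen to make printed results readable.
    @Override
    public String toString() {
        return "DestinationCandidate{originAirport=" + originAirport
                + ", temperature=" + temperature
                + ", humidity=" + humidity
                + ", score=" + score
                + ", city='" + city + "'}";
    }
}
```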
Once all the data has been fetched, a score is computed
for each available destination. This function acts as a penalty, hence lower
scores are technically better destinations. See the ScoreHelper class below.
package holidaysfinder;

public class ScoreHelper {
    private static final int TEMP_COEFF = 2;
    private static final int HUMID_COEFF = 1;

    // Penalty score: weighted distance to the target temperature and to the ideal humidity
    public static int getScore(int targetTemperature, int temperature, int humidity) {
        return TEMP_COEFF * Math.abs(targetTemperature - temperature)
                + HUMID_COEFF * Math.abs(idealHumidity(temperature) - humidity);
    }

    // The warmer it gets, the lower the humidity considered pleasant
    private static int idealHumidity(int temperature) {
        if (temperature < 15) return 65;
        if (temperature < 22) return 55;
        if (temperature < 28) return 45;
        if (temperature < 33) return 35;
        if (temperature < 38) return 25;
        return 15;
    }
}
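A quick worked example of the penalty, using a compact copy of the formula above so that the snippet runs on its own. The class name ScoreWalkthrough and the input values are picked for illustration.

```java
class ScoreWalkthrough {

    // Same formula as ScoreHelper.getScore, copied here to keep the sketch self-contained.
    static int getScore(int targetTemperature, int temperature, int humidity) {
        return 2 * Math.abs(targetTemperature - temperature)
                + Math.abs(idealHumidity(temperature) - humidity);
    }

    static int idealHumidity(int temperature) {
        if (temperature < 15) return 65;
        if (temperature < 22) return 55;
        if (temperature < 28) return 45;
        if (temperature < 33) return 35;
        if (temperature < 38) return 25;
        return 15;
    }

    public static void main(String[] args) {
        // Target 24, actual 24 and 47% humidity: 2 * |24 - 24| + |45 - 47| = 2
        System.out.println(getScore(24, 24, 47));
        // Target 25, actual 18 and 70% humidity: 2 * |25 - 18| + |55 - 70| = 29
        System.out.println(getScore(25, 18, 70));
    }
}
```

A destination that hits the target temperature exactly can still pick up a small penalty from humidity, as in the first case.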
If I take the example of London: Ryanair operates from 3 of London’s airports, providing a little more than 200 different destinations.
Fetching those weather details one destination at a time would take a little too long: 200 ms * 200 destinations, i.e. around 40 seconds, give or take. That’s where asynch programming can be helpful.
The Idea
Asynchronous programming is leveraged to reduce the time taken to fetch temperature and humidity data for each destination available from airports available at the current location.
Main Thread
|
|-- getAirportsFrom(myLocation)
|
|-- for each airport:
| |
| |--/ supplyAsync(getDestinationsFrom) <----------------------\
| | |
| | |---- fetch destinations for airport |
| | \-- thenAcceptAsync(destinations ->) |
| | |
| | |-- for each city:
| | | |
| | | |--/ supplyAsync(getTemperature) ------\ |
| | | | | |
| | | |--/ supplyAsync(getHumidity) --------\| |
| | | | || |
| | | |---- thenCombineAsync (temp + hum) -->|| |
| | | | |
| | | | \-- thenApplyAsync(score) --------->|| |
| | | | || |
| | | \--------------------------------------/| |
| | | | |
| | \---------------------------------------------/ |
| \-------------------------------------------------------------/
|
|-- CompletableFuture.allOf(destinationFetchTasks).join()
| (waits for all cities to be discovered and tasks queued)
|
|-- CompletableFuture.allOf(futures).join()
| (waits for all weather/humidity+score tasks to finish)
|
|-- Find best destinations and print result
|
Weather Data & Destination Score tasks
The temperature and humidity fetches are defined as Suppliers, using the Weather Client defined earlier. The temperature task is then combined with the humidity task to create a new Destination Candidate object.
The computeScore task is chained to this result via a thenApplyAsync.
The error handling on the temp/humidity tasks is done using the exceptionally method, allowing us to recover and
fall back to a default value.
In this example it is set to 100 in order to prevent a Destination Candidate with incomplete data from being picked.
// Temperature task: Definition and Submission
Supplier<Integer> fetchTemperature = () -> {
    try {
        return weatherClient.getTemperature(city);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
};
CompletableFuture<Integer> temperatureTask = CompletableFuture
        .supplyAsync(fetchTemperature, weather_executor)
        .exceptionally(e -> {
            System.err.println("Failed to fetch temperature for " + city + ": " + e.getMessage());
            return DEFAULT_FALLBACK_VALUE;
        });

// Humidity task: Definition and Submission
Supplier<Integer> fetchHumidity = () -> {
    try {
        return weatherClient.getHumidity(city);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
};
CompletableFuture<Integer> humidityTask = CompletableFuture
        .supplyAsync(fetchHumidity, humidity_executor)
        .exceptionally(e -> {
            System.err.println("Failed to fetch humidity for " + city + ": " + e.getMessage());
            return DEFAULT_FALLBACK_VALUE;
        });

// Compute Score task: Definition
Function<DestinationCandidate, DestinationCandidate> computeScore = destination -> {
    destination.setScore(ScoreHelper.getScore(
            targetTemperature,
            destination.getTemperature(),
            destination.getHumidity()
    ));
    return destination;
};

// Temperature and Humidity task Combination
// Chaining of Compute Score task
CompletableFuture<DestinationCandidate> destinationTask =
        temperatureTask.thenCombineAsync(humidityTask, (temperature, humidity) -> {
            DestinationCandidate destination = new DestinationCandidate(city, airport);
            destination.setTemperature(temperature);
            destination.setHumidity(humidity);
            return destination;
        }).thenApplyAsync(computeScore);
Collecting the results
The above logic is applied to each destination inside a Consumer passed to thenAcceptAsync, itself
chained onto a Supplier wrapping getDestinationsFrom(airport).
I initially made the mistake of explicitly waiting on the Destination Candidate futures only, to wait until the scores were computed. Reasonable, isn’t it?
But without waiting for the destinations to be fetched first, allOf(destinationCandidatesFutures)
was being called on an empty array of futures.
Hence, the allOf method is used twice: once to wait for all destination fetches to be done, and once to wait for all scores to be computed.
Note that allOf() itself is non-blocking: it coordinates and aggregates multiple futures
into a single one but does not wait for them. Hence the explicit wait, destinationsDone.join().
Because allOf() returns a Completable Future of Void, the result is not usable as-is. To collect the final
data generated by the tasks, we perform a join operation on each Destination Candidate Completable Future.
From there, we extract the destination(s) with the minimum score and display them to the user.
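Both points can be sketched in isolation: collecting results out of allOf(), and the empty-array trap. The helper name allAsList is mine, not part of the CompletableFuture API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AllOfCollectSketch {

    // Turns a list of futures into one future holding the list of results, in the same order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // Every future is complete at this point, so join() returns immediately.
                .thenApply(ignored -> futures.stream().map(CompletableFuture::join).toList());
    }

    // The trap: allOf() over no futures at all is already completed, so joining it waits for nothing.
    static boolean allOfNothingIsAlreadyDone() {
        return CompletableFuture.allOf().isDone();
    }

    static List<Integer> demo() {
        List<CompletableFuture<Integer>> futures = List.of(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));
        return allAsList(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(allOfNothingIsAlreadyDone());
    }
}
```

The second method is why the list of candidate futures has to be fully populated before allOf() is called on it.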
package holidaysfinder;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;
import java.net.URLEncoder;

public class DestinationFinder {
    private static final WeatherClient weatherClient = new WeatherClient();
    private static final ExecutorService weather_executor = Executors.newFixedThreadPool(2);
    private static final ExecutorService humidity_executor = Executors.newFixedThreadPool(2);
    private static final int DEFAULT_FALLBACK_VALUE = 100;

    public static void run(String myLocation, int targetTemperature) throws Exception {
        Set<String> airports = AirlineClient.getAirportsFrom(myLocation);
        // Synchronized: several destination-fetch callbacks add to this list concurrently
        List<CompletableFuture<DestinationCandidate>> destinationCandidatesFutures = Collections.synchronizedList(new ArrayList<>());
        Instant begin = Instant.now();
        List<CompletableFuture<Void>> destinationFetchTasks = airports.stream()
                .map(airport ->
                        CompletableFuture.supplyAsync(() -> {
                            try {
                                return AirlineClient.getDestinationsFrom(airport);
                            } catch (Exception e) {
                                throw new RuntimeException("Failed to fetch destinations for airport: " + airport, e);
                            }
                        }).thenAcceptAsync(destinations -> {
                            // No value is returned, but the resulting CompletableFuture<Void>
                            // signals that the side work (populating the futures list) is done
                            for (String cityRaw : destinations) {
                                String city = URLEncoder.encode(cityRaw, StandardCharsets.UTF_8);
                                /*
                                TEMP/HUMID/SCORE TASK LOGIC DEFINED ABOVE
                                */
                                destinationCandidatesFutures.add(destinationTask);
                            }
                        })
                ).toList();
        CompletableFuture<Void> destinationsDone = CompletableFuture.allOf(destinationFetchTasks.toArray(new CompletableFuture[0]));
        destinationsDone.join(); // make sure all destinations are discovered
        CompletableFuture<Void> allDone = CompletableFuture.allOf(destinationCandidatesFutures.toArray(new CompletableFuture[0]));
        List<DestinationCandidate> destinationCandidates = allDone.thenApply(x ->
                destinationCandidatesFutures.stream()
                        .map(CompletableFuture::join)
                        .toList()
        ).join();
        int minScore = destinationCandidates.stream()
                .mapToInt(DestinationCandidate::getScore)
                .min()
                .orElseThrow();
        List<DestinationCandidate> bestDestinations = destinationCandidates.stream()
                .filter(ws -> ws.getScore() == minScore)
                .toList();
        Instant end = Instant.now();
        System.out.println("Best destination(s): " + bestDestinations +
                " | Evaluated in " + Duration.between(begin, end).toMillis() + "ms");
    }

    public static void shutdownExecutors() {
        weather_executor.shutdown();
        humidity_executor.shutdown();
    }
}
The executors' shutdown lives in its own static method, which is called from the main method.
I had initially inserted it at the bottom of the run() method but, in an attempt to successively get the
best destinations from London, Madrid and Paris, I realised the executors were being shut down prematurely after the first
run. Call it a rookie mistake - do I care? I’m being called to Marrakesh 🍹!
Best destination(s) from London: [WeatherStatus{originAirport=STN, temperature=24, humidity=47, score=2, city='Marrakesh'}] | Evaluated in 1180ms
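For the record, the shutdown mistake described above can be reproduced in isolation. In this sketch the pool is created locally and the "run" is replaced by a trivial task, so the names and structure are assumptions that differ from the static executors used in DestinationFinder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class ExecutorLifecycleSketch {

    // Stand-in for one run of the finder; the real work is replaced by a string.
    static String run(String city, ExecutorService executor) {
        return CompletableFuture.supplyAsync(() -> "evaluated " + city, executor).join();
    }

    // Intended lifecycle: one pool, several runs, a single shutdown at the very end.
    static List<String> runAllThenShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<String> results = new ArrayList<>();
        try {
            for (String city : List.of("London", "Madrid", "Paris")) {
                results.add(run(city, executor));
            }
        } finally {
            executor.shutdown();
        }
        return results;
    }

    // The mistake: shutting down after the first run makes the next submission fail.
    static boolean secondRunIsRejectedAfterShutdown() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        run("London", executor);
        executor.shutdown();
        try {
            run("Madrid", executor);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAllThenShutdown());
        System.out.println(secondRunIsRejectedAfterShutdown());
    }
}
```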