The Futures Abstraction
Concurrency has many different abstractions: there are the classic processes and threads, the older but newly popular actor model, CSP-style concurrency (looking at you, Go), and the quite popular future/promise. There are more, but one thing they all have in common is the goal of providing a better way to achieve concurrent computation. That is great, provided we understand what we are abstracting.
When it comes to futures, there are a great number of articles online about how to get started with a particular implementation or framework. However, far fewer articles properly explain what a future is, how it works, and what exactly the abstraction provides you. This article will walk through those details by implementing futures from the ground up.
Before we jump right into code, what is the definition of a future? From Wikipedia we get:
… future, promise, delay, and deferred refer to constructs used for synchronizing program execution in some concurrent programming languages. They describe an object that acts as a proxy for a result that is initially unknown, usually because the computation of its value is yet incomplete.
A future represents the result of a computation that will eventually complete. The "eventually" bit typically implies that the computation is executing asynchronously (in another thread, for example), but it can also imply a deferred value. You can think of a deferred value as a lazy value that is not computed until needed. For this article, we will assume that a future is the result of an async computation. Note that different implementations may use the names "future," "promise," and "deferred" interchangeably, and their terminology may differ from this post.
Great! Now that we understand what a future is, let’s build our abstraction from the ground-up.
The Tools
For this post we'll be using Scala without any additional libraries, and we will only be utilizing threads for our concurrency. We'll also make use of some data structures for sharing values between threads. Even though I am using Scala, I will avoid any advanced syntax so those unfamiliar with the language can still (hopefully) follow along.
Part 1 - Concurrent Computations
Before we can get into the meat of our abstraction, we need to at least be able to execute some code concurrently, which can be done simply with threads.
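Using a new thread for each computation looks roughly like the following sketch (the println body is just a stand-in for whatever work we want to run concurrently):

val concurrentComputation = new Thread(new Runnable {
  def run(): Unit = {
    // the work we want to run concurrently
    println("I'm running concurrently! whoop!")
  }
})

// nothing executes until we call start()
concurrentComputation.start()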
This is fairly simple, but some things are worth pointing out.
- 1) Unit means that we didn't return a value (similar to void in other languages).
- 2) The Thread object does not start executing the run function until we call start() at the end.
Despite its simplicity, this is a lot of code to type for each block of code we want to run concurrently.
Let’s rewrite this as a function that we can pass a block of code into. For this article, all code
will live in the simple.future
namespace.
package simple.future

def future(block: => Unit): Unit = {
  val concurrentComputation = new Thread(new Runnable {
    def run(): Unit = {
      block
    }
  })
  concurrentComputation.start()
}
We can now call our function by passing in any block of code.
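For example, a minimal call (the println body is just a stand-in for real work):

future {
  println("I'm running concurrently! whoop!")
}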
The Scala bits that may throw you here are:

- 1) block: => Unit is a pass-by-name parameter that returns a Unit (it is not executed until it is called).
- 2) block inside the run method evaluates the user-provided code, which happens once the Thread is running.
- 3) { ... } is a block of code that is passed to future as the first parameter to the function.
Just a quick note on point #1: Pass-by-name means that we can pass around blocks of user code without actually executing them. Only once we request the "value" from the block (like we do inside run above) is the code executed. This is essential for how our futures execute code asynchronously.
Fantastic! Now we just need to somehow return a result and we’re done, right? Well, not exactly. While that would get us pretty close to the initial definition, I want to stay a little more grounded and mimic what a future implementation would do in practice. We’re also missing something from our current function.
In a real, multi-threaded application there are often multiple operations happening at once; many of which may be simultaneously using futures. However, not all futures are equally important. For example, pushing work to a queue that came from a user is arguably more important than flushing metrics to your metrics store. Currently, we create a thread for each future. It is up to the threads to fight it out with the JVM/OS scheduler for resources. A better solution would make use of thread pools.
Thread pools are small groups of threads on which work is executed. If there is more work than there are threads in the pool, the extra tasks are queued up and executed once a thread becomes available. We can control priority within our application by creating larger pools for more important, time-sensitive work and smaller pools for less critical work.
Let’s rewrite our future
function to work with a Java thread pool!
package simple.future

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

def future(tp: ThreadPoolExecutor)(block: => Unit): Unit = {
  if (tp.isShutdown || tp.isTerminating || tp.isTerminated) {
    throw new RuntimeException(
      "Cannot execute on threadpool that is not active/running.")
  }

  tp.execute(new Runnable {
    def run(): Unit = {
      block
    }
  })
}

val tp = new ThreadPoolExecutor(4, 4, 0L,
  TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue[Runnable])

future(tp) { print("I'm running concurrently! whoop!") }
We now have a future function that works with thread pools! To explain some things:

- 1) future has multiple parameter lists (...)(...). This allows us to continue to use the block syntax { ... } for user-provided code.
- 2) future now throws a RuntimeException if it is given a ThreadPoolExecutor that is not usable.
- 3) tp.execute(...) adds work to the thread pool's queue and runs it asynchronously.
- 4) The tp value is a thread pool with exactly 4 threads which uses a LinkedBlockingQueue as the backing queue when there is more work than available threads.
Part 2 - Returning the Result
We’re quickly working our way towards a future implementation, but currently we don’t have a way to return a value back to the caller. According to our definition at the beginning, we need to use some sort of “proxy” object that will eventually contain the value. Let’s define what this proxy object could look like.
package simple.future

class Future[T] {
  private var result: Option[T] = None

  // writer functions
  def write(value: T): Unit = {
    this.result = Some(value)
  }

  // read functions
  def isReady: Boolean = result.isDefined

  def get: T = {
    if (!isReady) {
      throw new RuntimeException(
        "Future is not ready yet, no value to get")
    }
    return result.get
  }
}
Some quick notes:

- 1) [T] is a generic type parameter T to be determined by the user.
- 2) Option is a type whose value is either present (Some) or absent (None).
- 3) If the future is not completed, we throw an exception on access.
Now that we have our “proxy” object, we can update our future
function to return a value using our new
Future[T]
class.
package simple.future

def future[T](tp: ThreadPoolExecutor)(block: => T): Future[T] = {
  val fut = new Future[T]

  if (tp.isShutdown || tp.isTerminating || tp.isTerminated) {
    throw new RuntimeException(
      "Cannot execute on threadpool that is not active/running.")
  }

  tp.execute(new Runnable {
    def run(): Unit = {
      // call user-provided code and write result to the future
      val result = block
      fut.write(result)
    }
  })

  return fut
}
Some quick notes:

- 1) The block is now a block of code that evaluates to a value of type T.
- 2) future now has a type parameter T and a return type of Future[T].
- 3) The value of the Future[T] is written inside the run function.
With this newly updated function, we can still call it the same way. The difference is that capturing the result now means something.
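For example, a sketch that captures the result of a slow computation (re-using the tp thread pool from above; the two-second sleep and the value 5 are arbitrary):

val f: Future[Int] = future(tp) {
  Thread.sleep(2000) // pretend this is an expensive computation
  5
}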
Awesome! We have our proxy object. But the next question becomes, "How do I use it?" If we wanted to wait on the call we made above for f, we might busy-wait until the value is ready.
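A crude sketch of that approach, polling isReady until the value arrives:

// spin until the future has been written, then read the value
while (!f.isReady) {
  Thread.sleep(100)
}
println(f.get) // prints 5 once the computation above finishes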
This code is far from ideal, but it works for what we want to do. Later on in this article we’ll get to more advanced (and more idiomatic) techniques for working with future values.
However, even though this works, there are two issues with this setup. The first is that anyone can write a value into the future. Ideally, writing should be hidden as part of the async processing and kept separate from the Future object.
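For example, nothing currently stops a caller from injecting a value from the outside; this is exactly the kind of code that should not be valid (the 42 is just an arbitrary value):

// an external caller writing its own value into the future
f.write(42)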
The second issue is that the value is not immutable. Once it is written, it should not be possible
to change the value. As in our example above, once the future stops sleeping, the value would change
to 5
. A future is always the result of a single computation, and that value must never change.
First let’s address the issue that anyone can write into our Future
. The common way this is solved
is to introduce another type, called the promise. If a future is meant to read the value, then the
promise is meant to write the value. By creating two separate objects, we can hand them to the
appropriate parties. Let’s see what this would look like:
package simple.future

class Future[T](private val promise: Promise[T]) {
  def isReady: Boolean = promise.isReady
  def get: T = promise.get
}

class Promise[T] {
  private var result: Option[T] = None

  lazy val futurePair: Future[T] = new Future[T](this)

  // writer functions
  def write(value: T): Unit = this.synchronized {
    this.result = Some(value)
  }

  // reader functions
  def isReady: Boolean = this.synchronized { result.isDefined }

  def get: T = this.synchronized {
    if (!isReady) {
      throw new RuntimeException(
        "Future is not ready yet, no value to get")
    }
    result.get
  }
}
With this new version we can see that Future
is now just a read-only interface to our new Promise
class, and that each future contains a private reference to its Promise
pair. The new Promise
class
is mostly the same as our original Future
implementation with two exceptions:
- 1) A new futurePair value which creates a Future object that is paired with the current Promise object. This is only generated when needed (thus the lazy part).
- 2) All read and write methods are now wrapped in this.synchronized blocks, which synchronize access to the Promise object. This is good practice for any object that will be accessed from multiple threads.
However, even with this separation of concerns, Promise
can be written to more than once. Since we’ve
already synchronized our methods (ensuring no concurrent updates), we can add a simple check to solve
our problem:
package simple.future

class Promise[T] {
  private var result: Option[T] = None
  private var isWritten: Boolean = false

  // writer functions
  def write(value: T): Unit = this.synchronized {
    if (this.isWritten) {
      throw new RuntimeException(
        "Cannot write value. Already written")
    }
    this.result = Some(value)
    this.isWritten = true
  }

  // other functions ...
}
Part 3 - Handling Failures
We can now return values from futures, but what happens if the computation fails and we throw an exception? How do we handle that? What value is returned for the future?
Well, exceptions are easy; we can just use a try-catch block for that, right?
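A naive attempt might look like this sketch (the thrown RuntimeException stands in for any failing computation):

try {
  val f = future(tp) {
    throw new RuntimeException("whoops!")
  }
} catch {
  case t: Throwable => println("caught it!") // we never get here
}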
I’m sorry, but no. This is never going to work.
Why, you ask? Well, the throw in the above code is actually not called on the current thread. Remember that the code in block is not evaluated until it is called within the Runnable.run method. It is called asynchronously on another thread, and guess what? That exception will not propagate to the thread the above code is executing on. What would actually happen is that the future f would simply never receive a value, because Promise.write would never be called.
Well, the answer seems simple, right? Just put the try-catch inside the Runnable.run function that future creates to execute the code block. But then, if an exception does occur, what value do we write? To handle this, we need to wrap the T value in Future[T] in a success/fail type.
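A minimal sketch of what these types could look like (modeled on Scala's standard Try, Success, and Failure, but defined in our own package to keep the example self-contained):

package simple.future

// Try[T] represents the outcome of a computation that may have failed
sealed trait Try[T]
case class Success[T](value: T) extends Try[T]
case class Failure[T](exception: Throwable) extends Try[T]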
With these new types, we can now represent the result of a future, previously just T
, as a Try[T]
.
Of course with this, we need to update our read and write functions:
package simple.future

class Future[T](private val promise: Promise[T]) {
  def get: Try[T] = promise.get
  // ...
}

class Promise[T] {
  private var result: Option[Try[T]] = None

  def write(value: Try[T]): Unit = // ...
  def get: Try[T] = // ...
  // ...
}
As well as our future
function:
package simple.future

def future[T](tp: ThreadPoolExecutor)(block: => T): Future[T] = {
  val promise = new Promise[T]

  // omitted code

  tp.execute(new Runnable {
    def run(): Unit = {
      try {
        // call user-provided code and write result to the future
        val result = block
        promise.write(Success(result))
      } catch {
        case t: Throwable => promise.write(Failure(t))
      }
    }
  })

  return promise.futurePair
}
Now that we are wrapping values in either a success or failure state and capturing any exceptions, we can write code that handles both outcomes.
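For example, a sketch that randomly succeeds or fails (this assumes Future still exposes isReady alongside the new Try-based get, and re-uses the tp pool from earlier):

val f = future(tp) {
  if (scala.util.Random.nextBoolean()) "I succeeded! whoop!"
  else throw new RuntimeException("whoops!")
}

// wait for the future to complete (we'll improve on this blocking in Part 4)
while (!f.isReady) {
  Thread.sleep(100)
}

f.get match {
  case Success(message) => println(message)
  case Failure(error)   => error.printStackTrace()
}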
Here we are creating a future which may or may not fail, depending on a randomly generated boolean. Once we've waited for it to complete, we access the value with get and then use pattern matching (the match keyword, a popular feature of Scala) to either print the returned string or the error trace, depending on the value returned.
Part 4 - Async Access
To recap what we've created so far:

- A future method that takes a block of code, executes it on a thread pool, and returns a Future.
- A Future class for reading values, and a Promise class for writing values.
- A Try[T] type to represent success and failure values from our asynchronously executed code block.
This is a good start towards a realistic implementation of futures. However, there are still some very big holes. Namely, accessing the final value leaves a lot to be desired: currently, to get the value we have to block the current thread and wait for the future to complete.
The easy way to avoid blocking is to provide callback functions. Callbacks are user-provided blocks of code that are registered with the Future object and called once the Future is ready. We'll define three functions: onComplete, onSuccess, and onFailure. Let's start with onComplete:
package simple.future

class Future[T](private val promise: Promise[T]) {
  // ...

  def onComplete(tp: ThreadPoolExecutor)(f: Try[T] => Unit): Unit = {
    promise.registerOnComplete(tp, f)
  }
}

class Promise[T] {
  // ...

  var onCompleteHandlers: List[(ThreadPoolExecutor, Try[T] => Unit)] = Nil

  def registerOnComplete(tp: ThreadPoolExecutor, f: Try[T] => Unit): Unit = {
    // add 2-tuple of thread-pool and function to list
    onCompleteHandlers ::= (tp, f)
  }

  def callOnCompletes(value: Try[T]): Unit = {
    for (handler <- onCompleteHandlers) {
      val tp: ThreadPoolExecutor = handler._1
      val f: Try[T] => Unit = handler._2
      // run each callback asynchronously on its registered thread pool
      future(tp) {
        f(value)
      }
    }
  }

  def write(value: Try[T]): Unit = this.synchronized {
    if (this.isWritten) {
      throw new RuntimeException(
        "Cannot write value. Already written")
    }
    this.result = Some(value)
    this.isWritten = true
    this.callOnCompletes(value)
  }
}
We now have a new onComplete method on Future which registers a callback handler with Promise. The reason that Promise handles the firing of the callbacks is that it maintains the relationship that Future is the read-only interface and Promise is the write interface. With this, we can fire the callbacks as soon as Promise.write is called.
Some final notes about the above code:
- 1) onCompleteHandlers is a list of 2-tuples containing the thread pool and the block of code to execute on the result. It is initialized to an empty list (Nil).
- 2) The code onCompleteHandlers ::= (tp, f) prepends a 2-tuple of the thread pool and callback to the list onCompleteHandlers.
- 3) Values in 2-tuples are accessed with x._1 and x._2, as in callOnCompletes above.
We can now use our new function to register callbacks.
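For example, a sketch that registers a callback on the tp pool and simply prints the outcome once the computation finishes (the sleep and message are just illustrative):

val f = future(tp) {
  Thread.sleep(2000) // simulate a slow computation
  "I'm done! whoop!"
}

// the callback runs asynchronously on tp once the promise is written
f.onComplete(tp) {
  case Success(message) => println(message)
  case Failure(error)   => error.printStackTrace()
}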
With this, we could easily imagine how to write other functions such as onSuccess
, onFailure
, etc.
Part 5 - Composing Futures
Callbacks are an excellent way to access data retrieved/generated from an asynchronous operation. However, what if we wanted to combine the value from two async operations, or we wanted to transform the result of an async operation, but still maintain a future to that transformed value in the current thread? This is where futures really shine: The ability to combine the result of multiple futures together.
To achieve this, we need callbacks that return yet another Future as a proxy object for the result of the callback. The first function we'll look at is map.
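Its signature, matching the implementation we will write below, looks like:

def map[U](tp: ThreadPoolExecutor)(f: T => U): Future[U]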
map
takes a thread pool and a function and returns another future. The user-provided function, f
,
takes as a parameter a successful value from the current future and returns a value of type U
.
However map
returns a value of type Future[U]
. This allows us to call map
on a Future
that may or
may not be completed. This means that we can’t return the result of our transformation (f
) directly,
but must use another Future
. This is best explained with an example.
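Consider a sketch like the following, where f takes at least two seconds to produce its value and we chain a couple of transformations onto it (the numbers are arbitrary):

val f: Future[Int] = future(tp) {
  Thread.sleep(2000) // an expensive computation
  5
}

// neither map call blocks; each returns a new Future immediately
val doubled: Future[Int] = f.map(tp) { value => value * 2 }
val asString: Future[String] = doubled.map(tp) { value => "result: " + value }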
Because f
takes at least 2 seconds to run, you can see why we can’t immediately apply our
transformations to the value. Instead we use map
to defer the transformations for when the future
has completed, but we can still hold onto a proxy object representing the result of those
calculations.
But what happens if the future being mapped fails? In that case, the error propagates to every future derived from it.
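Continuing the sketch, suppose f itself throws (the exception message and the multiply-by-10 transformation are just illustrative):

val f: Future[Int] = future(tp) {
  Thread.sleep(2000)
  throw new RuntimeException("whoops!")
}

// f10 never sees a successful value; it inherits f's failure
val f10: Future[Int] = f.map(tp) { value => value * 10 }

f10.onComplete(tp) {
  case Success(value) => println(value)
  case Failure(error) => error.printStackTrace()
}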
The above code would print out a stacktrace for f10
. The stacktrace would say “whoops!” and would
be the same exception from f
passed along to f10
. This makes sense, as we cannot transform an
exception as if it were a successful value.
Enough examples, let’s take a stab at implementing map
:
package simple.future

class Future[T](...) {
  def map[U](tp: ThreadPoolExecutor)(f: T => U): Future[U] = {
    promise.map(tp)(f)
  }
}

class Promise[T] {
  def map[U](tp: ThreadPoolExecutor)(f: T => U): Future[U] = {
    val mapPromise = new Promise[U]

    // create onComplete block that calls user-function (f) on success
    // and passes through exceptions on failure
    val block: Try[T] => Unit = { res: Try[T] =>
      res match {
        case Success(v) => futureWithPromise(tp, mapPromise) {
          f(v)
        }
        case Failure(t) => mapPromise.write(Failure(t))
      }
    }

    registerOnComplete(tp, block)
    return mapPromise.futurePair
  }
}
You can see that we are defining this on Promise. This is because Promise is our writable interface and we want to take advantage of the registerOnComplete function we saw earlier. map works by creating a new promise/future pair and returning the future immediately. The callback that is registered lets us call our user-provided function f once the original future completes and write the result to the new promise/future pair we created. For failures, we can see the write happening directly:
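case Failure(t) => mapPromise.write(Failure(t))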
This propagates the failure and bypasses the user-provided code, which expects a successful value. As a result, we can call map without having to worry about additional error handling.
But wait, what is this futureWithPromise
function, and what does it do? Remember future
from earlier
that takes a block of user code and a thread pool and returns a future? Well this is pretty much the
same thing except that it takes a pre-created promise object. With this, we can re-define our
future
method as well (to avoid duplication) and we get:
package simple.future

def futureWithPromise[T](tp: ThreadPoolExecutor, promise: Promise[T])
                        (block: => T): Future[T] = {
  if (tp.isShutdown || tp.isTerminating || tp.isTerminated) {
    throw new RuntimeException(
      "Cannot execute on threadpool that is not active/running.")
  }

  tp.execute(new Runnable {
    def run(): Unit = {
      try {
        // call user-provided code and write result to the future
        val result = block
        promise.write(Success(result))
      } catch {
        case t: Throwable => promise.write(Failure(t))
      }
    }
  })

  return promise.futurePair
}

def future[T](tp: ThreadPoolExecutor)(block: => T): Future[T] = {
  val promise = new Promise[T]
  return futureWithPromise(tp, promise)(block)
}
We can see that futureWithPromise is basically our previous implementation of future, and future is now just a convenience method for when you don't already have your own Promise created.
So much more!
When it comes to composing futures, there is typically a myriad of functions available to transform futures in different ways. For example, these two functions are very common (although the naming may differ across implementations).
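Plausible signatures for them, following the same convention as our map above (the exact parameter shapes are a design choice, not part of this article's code):

// run f on the successful value and flatten the resulting Future
def flatMap[U](tp: ThreadPoolExecutor)(f: T => Future[U]): Future[U]

// turn a failed future back into a successful one by computing a fallback value
def recover(tp: ThreadPoolExecutor)(f: Throwable => T): Future[T]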
flatMap
is very much like our map
, but this allows the user-provided code to return a Future[U]
instead of just a U
. Think of making a web-request which returns a future, and then using the result
of the first web-request to make a second web-request. For this case you would use a flatMap
.
recover is a way to not only handle a failed future, but also provide or calculate some sort of default value. Imagine making a web request and, if it fails, using recover to provide a cached result. When combining recover with other composing functions like map and flatMap, we can define a future with all of our transformations and error handling in a declarative way.
Since you’ve seen the basic format for implementing these type of functions (with the map
example),
I’ll leave it as an exercise to the reader (you) to define flatMap
and recover
.
Conclusion
That was quite a bit of work! This is still a very bare-bones, basic implementation of futures, but it should be enough to give us some solid ground to stand on for understanding futures, and hopefully for understanding why this is a useful (in my humble opinion) and popular abstraction. If you're interested in what a real-world future implementation looks like, check out some of the links below (note that this post most closely mimics the API of the Scala implementation):