by Viktor Klang
Futures are a neat way to use multiple cores without having to worry about managing specific threads or tasks. If you have some work that you want to get done while you get on with something else, then you simply create one or more Future(s) and give them the work to do in the form of a function that returns a result. Later you can test the Future for completion and pick up the result. An elegant way to get work done that you need in the Future!
Akka, part of the Typesafe Stack, has had a Future implementation for a long time. It’s so useful we decided to put a major effort into enhancing the capability, making it faster, more scalable and more intuitive to use. Key parts of the underpinning code used locks and blocking to ensure the integrity of the multi-threaded execution. These do not scale well in multi processor environments, as processors must wait while contentions are resolved. So we spent a lot of time to create a not-so-obvious non-blocking alternative. With the new implementation, things scale better and execution time is faster.
Comprehensions are the way we Scala programmers tend to think about doing things on collections. It is one of the function styles that leads to concise and easy to read code. We re-thought the Futures library to facilitate using Comprehensions and the more typical Scala “foreach” pattern, You can now handle sequences in an intuitive non-blocking way. We have found it a whole lot more powerful way to express our intentions.
Now let’s take a detailed look how the changes look at the code level:
akka.dispatch.Futures
there are new, Java API methods named "future
" to use Callables
that are executed on another thread to produce the result in a Future
. This means that they are non-blocking and you have the option to specify which Dispatcher
will execute your callable.Futures.awaitAll
has been deprecated in favor of: futures.foreach(_.await)
Futures.awaitOne
has been deprecated in favor of: firstCompletedOf(futures).await
. Because firstCompletedOf
is non-blocking, and await
is blocking, it's better if this is not encouraged by the APIFutures.awaitMap
has been deprecated in favor of: futures map { f => fun(f.await) }
. This is because Future
now supports a non-blocking map, which wasn't the case before, so you'd have to await before mappingFutures.reduce
.Futures.sequence
method takes a Traversable[Future[T]]
and non-blockingly returns a Future[Traversable[T]]
.Futures.traverse
method transforms a Traversable[A]
to a Future[Traversable[B]]
using a provided function from A
to Future[B]
. This is a great way of performing "map" in parallel.DataFlowVariable
to Future
. This includes:
apply: val f: Future[Int] = ... val i = f() // Logically f.await.resultOrException.get <<: val f: CompletableFuture[Int] = ... // Write side of Future, compare with the concept of Promise, or DataFlowVariable f << 5 // Sets the value 5 into f, since CompletableFutures can only be written once, they act like dataFlowVariables f << otherF // You can also set the value to the value of another Future, this will be done when that Future is completed // same behavior as CompletableFuture.completeWith(f: Future[...])
Future.flow
method, like this:
import Future.flow def add(a: Future[Int], b: Future[Int]): Future[Int] = flow { a() + b() }
Future
now sports a couple of new methods:
get: //Warning! Blocking val f: Future[Int] = ... f.get // Semantically f.await.resultOrException.get but for use in a non "flow" context value: val f: Future[Int] = ... val v: Option[Either[Throwable, Int]] = f.value // The current value of the Future, None if no value, and Left(error) or Right(result) otherwise onResult: val f: Future[Any] = ... f onResult { case "foo" => doSomething case 6 => doSomethingElse case SomeRegex(param) => doSomethingOther case _ => doAnything } // Applies the specified partial function to the result of the future when it is completed with a result recover: val f: Future[Any] = ... val result = f recover { case n: NumberFormatException => 0 } // Returns a new future that when the first future has been completed with an exception, will contain the transformed result" onException: val f: Future[Any] = ... f onException { case npe: NullPointerExcep => doSomething case 6 => doSomethingElse case SomeRegex(param) => doSomethingOther case _ => doAnything } // Applies the specified partial function to the result of the future when it is completed with an exception onTimeout: val f: Future[Any] = ... f onTimeout { future => doSomethingWhenTimeout(future) }
Future
is now fully monadic so it can be used in for-comprehensions; all methods: map, flatMap, filter and foreach are non-blockingThese are exciting times. With Typesafe and Akka, the Future is here!