In my recent models serving book I introduced treating models as data and showed how this approach can significantly simplify model serving, including real-time model updates. The main limitation of the solution presented in the book is that it serves a single model per data type, which is rarely the case in real-life deployments. As Ted Dunning describes in detail in his Machine Learning Logistics book, real-life deployments typically run an ensemble of models that score the same data item in parallel, with a decision block then deciding which result to use. In this blog post, I describe how to extend the solution from the models serving book to support speculative model serving, and I provide an implementation based on Akka Streams. The complete code of the implementation described in this post is available on GitHub.
According to Wikipedia, speculative execution is:
an optimization technique where a computer system performs some task that may not be needed. Work is done before it is known whether it is actually needed, so as to prevent a delay that would have to be incurred by doing the work after it is known that it is needed. If it turns out the work was not needed after all, most changes made by the work are reverted and the results are ignored.
The objective is to provide more concurrency if extra resources are available. This approach is employed in a variety of areas, including branch prediction in pipelined processors, value prediction for exploiting value locality, prefetching memory and files, etc.
In the case of model serving, speculative execution means scoring data in parallel across a set of models and then selecting the best result based on some metric. This becomes important when there are several models whose performance or result quality differs from one case to another.
The importance of speculative execution in model serving stems from the ability of such an approach to bound overall serving latency (through timeouts) while improving result quality by choosing among the results of multiple models.
The overall architecture for speculative model serving is presented below:
Here, incoming data is fed into both the Model Serving Controller and the Model Learning (training) component. The Model Learning component is used for periodic recalculation of models, after which updates are pushed into Kafka for ingestion by the Model Servers. At the heart of our implementation is the Model Serving Controller, responsible for orchestrating the execution of the individual Model Servers and deciding on the final model serving result.
Once replies from all of the individual Model Servers are received, or the wait time expires, the Model Serving Controller chooses a reply based on some defined criteria and propagates it downstream.
Individual Model Servers process the input data based on their current model and return the results to the Model Serving Controller. Additionally, the Model Servers listen to the Kafka topics carrying model updates.
The model serving result can be accompanied by a confidence level. This information is optional and, if present, can be used by the Model Serving Controller to evaluate the result. Additional optional fields can be added to the Model Server replies for use when evaluating the results.
An additional aspect of the overall system is configuration of the speculative server. Another Kafka topic is used to change the model ensemble for a given data type, as well as some of the decision-making configuration.
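Such a configuration message might, for example, carry the data type, a timeout, and the list of model servers in the ensemble. Here is a minimal sketch, with field names inferred from how the message is used later in this post (the actual definition in the repository may differ):

// Hedged sketch of the configuration message; field names are inferred
// from the usage shown later in this post and may not match the
// repository exactly.
case class SpeculativeDescriptor(
  datatype: String,     // data type whose ensemble is being configured
  tmout: Long,          // time (ms) to wait for individual model servers
  models: Seq[String])  // names of the model servers in the ensemble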
So that the Model Serving Controller knows when a reply has been received for a particular datum, a “handshake” in our implementation is provided by a unique id (GUID), which is passed to the Model Servers and returned to the controller along with the serving result.
With this in place, messages for invoking a Model Server and its reply can be represented as follows (we are using Protocol Buffers for encoding messages):
syntax = "proto3";
option java_package = "com.lightbend.speculative";
// Description of the model serving request.
message ServingRequest {
string uuid = 1;
bytes data = 2;
}
// Description of the model serving response.
message ServingResponse {
string uuid = 1;
bytes data = 2;
double confidence = 3;
repeated ServingQualifier qualifiers = 4;
}
// Description of the model serving qualifier.
message ServingQualifier{
string key = 1;
string value = 2;
}
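For illustration, here is how a request might be constructed and serialized, assuming ScalaPB-generated case classes for the messages above (the wire format is plain Protocol Buffers either way; the example bytes are a stand-in):

import java.util.UUID
import com.google.protobuf.ByteString
import com.lightbend.speculative.ServingRequest // assumes ScalaPB codegen

object ProtoExample extends App {
  // Stand-in for an encoded data record (e.g., a serialized WineRecord).
  val recordBytes: Array[Byte] = Array[Byte](1, 2, 3)

  // Wrap the record into a uniquely identified serving request.
  val request = ServingRequest(
    uuid = UUID.randomUUID().toString,
    data = ByteString.copyFrom(recordBytes))

  // Serialize for Kafka/actor transport; parse on the receiving side.
  val wire: Array[Byte] = request.toByteArray
  val parsed = ServingRequest.parseFrom(wire)
  assert(parsed.uuid == request.uuid)
}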
Although there are many different options for implementing this architecture, we will show how to implement it leveraging Akka Streams integrated with actors. We will implement the individual model servers and the model serving controller as actors, use Akka Streams for overall orchestration of the execution, and use Akka HTTP to provide HTTP access to our implementation.
In addition to the speculative model serving functionality, our implementation includes two additional components:
- Custom persistence (file-based) allowing us to save and restore the state of the participating actors, including the currently running models and the overall configuration of speculative execution - the model ensembles for a given data type, some of the decision-making configuration, etc.
- Queryable state (compare to queryable state in Flink and Kafka Streams) allowing us to observe execution - statistics for individual and speculative model processing.

When creating an Akka actors application, we first need to design the actor hierarchy. Following this design documentation, we decided on the following actor hierarchy.
The actors in this hierarchy are described below.
The Model serving manager actor is a singleton that provides the entry point into the whole actor system. It is responsible for the overall management of our application's actors and for routing messages across them. It supports all the methods provided by the rest of the actors by forwarding client requests to the appropriate executors (ModelManager or DataManager). The implementation of this actor is fairly straightforward:
class ModelServingManager extends Actor {

  implicit val askTimeout = Timeout(100, TimeUnit.MILLISECONDS)
  implicit val ec = context.dispatcher

  println(s"Creating Model Serving manager")

  // Create support actors
  val modelManager = context.actorOf(ModelManager.props, "modelManager")
  val dataManager = context.actorOf(DataManager.props, "dataManager")

  override def receive = {
    // Model methods
    // Update model
    case model: ModelWithDescriptor => modelManager forward model
    // Get list of model servers
    case getModels: GetModels => modelManager forward getModels
    // Get state of the model
    case getState: GetModelServerState => modelManager forward getState

    // Data methods
    // Configure Data actor
    case configuration: SpeculativeDescriptor =>
      ask(modelManager, GetModelActors(configuration.models)).mapTo[GetModelActorsResult]
        .map(actors => SetSpeculativeServer(configuration.datatype, configuration.tmout, actors.models.toList))
        .pipeTo(dataManager)(sender())
    // Process data
    case record: WineRecord => dataManager forward record
    // Get state of speculative executor
    case getState: GetSpeculativeServerState => dataManager forward getState
    // Get list of data processors
    case getProcessors: GetDataProcessors => dataManager forward getProcessors
  }
}
The Model manager is also a singleton, responsible for managing all model servers and for routing model updates to them. The code for this class is presented below:
class ModelManager extends Actor {

  // This is just for testing
  def gen = ThreadLocalRandom.current()

  private def getModelServer(modelID: String): ActorRef =
    context.child(modelID).getOrElse(context.actorOf(ModelServingActor.props(modelID), modelID))

  private def getInstances: GetModelsResult =
    GetModelsResult(context.children.map(_.path.name).toSeq)

  override def receive = {
    // Redirect to model update
    case model: ModelWithDescriptor =>
      // Picking a random server is just for testing
      val models = getInstances.models
      val modelServer = getModelServer(models(gen.nextInt(models.size)))
      modelServer forward model
    // Get state of a model server
    case getState: GetModelServerState =>
      context.child(getState.ModelID) match {
        case Some(actorRef) => actorRef forward getState
        case _ => sender() ! ModelToServeStats.empty
      }
    // Get current list of existing models
    case getModels: GetModels => sender() ! getInstances
    // Create actors from names. Support method for data processor configuration
    case createList: GetModelActors => sender() ! GetModelActorsResult(createList.models.map(getModelServer(_)))
  }
}
The Model serving actor is an implementation of a model server responsible for serving an individual model. The implementation of this actor is based on the models serving book, with the addition of local persistence to make model updates durable in the event of restarts.
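For context, here is a hedged sketch of the Model abstraction the actor relies on (the trait in the book also carries serialization-related methods; only the parts used below are shown, and exact signatures may differ):

// Hedged sketch of the Model abstraction from the models serving book;
// only the methods used by the actor below are shown.
trait Model {
  def score(input: Any): Any  // score a single data record
  def cleanup(): Unit         // release model resources on replacement
}

With this abstraction in mind, the actor looks as follows: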
class ModelServingActor(modelID: String) extends Actor {

  println(s"Creating model serving actor $modelID")

  private var currentModel: Option[Model] = None
  private var newModel: Option[Model] = None
  private var currentState: Option[ModelToServeStats] = None
  private var newState: Option[ModelToServeStats] = None

  // For testing
  def gen = ThreadLocalRandom.current()

  override def preStart: Unit = {
    // Restore the model and its state from persistence
    val state = FilePersistence.restoreModelState(modelID)
    newState = state._2
    newModel = state._1
  }

  override def receive = {
    // Update the model. This only works for local (in-memory) invocation,
    // because ModelWithDescriptor is not serializable
    case model: ModelWithDescriptor =>
      println(s"Model Server $modelID has a new model $model")
      newState = Some(ModelToServeStats(model.descriptor))
      newModel = Some(model.model)
      FilePersistence.saveModelState(modelID, newModel.get, newState.get)
      sender() ! "Done"
    // Process data
    case record: ServingRequest =>
      // If a new model is waiting, close the current one and swap it in
      newModel.foreach { _ =>
        currentModel.foreach(_.cleanup())
        currentModel = newModel
        currentState = newState
        newModel = None
      }
      currentModel match {
        case Some(model) =>
          val start = System.nanoTime()
          val quality = model.score(record.data.asInstanceOf[WineRecord]).asInstanceOf[Double]
          // Just for testing
          Thread.sleep(gen.nextInt(20) * 10L)
          val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
          currentState = currentState.map(_.incrementUsage(duration))
          sender() ! ServingResponse(record.GUID, ServingResult(modelID, quality, duration))
        case _ =>
          sender() ! ServingResponse(record.GUID, ServingResult.noModel)
      }
    // State query
    case request: GetModelServerState =>
      sender() ! currentState.getOrElse(ModelToServeStats.empty)
  }
}
Persistence here uses a custom implementation, which can be found in com.lightbend.akka.speculative.persistence.FilePersistence.
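To give a feel for what such file-based persistence involves, here is a heavily simplified sketch (the actual FilePersistence class in the repository also saves and restores models and their statistics; the file layout here is illustrative only):

import java.io._

// Heavily simplified sketch of file-based persistence for the speculative
// configuration; the actual FilePersistence does more. The file layout
// used here is illustrative, not the repository's.
object FilePersistenceSketch {

  private val basePath = "persistence" // hypothetical location
  new File(basePath).mkdirs()

  def saveDataState(dataType: String, tmout: Long, models: List[String]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(new File(basePath, s"data_$dataType")))
    try {
      out.writeLong(tmout)
      out.writeInt(models.size)
      models.foreach(out.writeUTF)
    } finally out.close()
  }

  def restoreDataState(dataType: String): (Option[Long], Option[List[String]]) = {
    val file = new File(basePath, s"data_$dataType")
    if (!file.exists()) (None, None)
    else {
      val in = new DataInputStream(new FileInputStream(file))
      try {
        val tmout = in.readLong()
        val models = (0 until in.readInt()).map(_ => in.readUTF()).toList
        (Some(tmout), Some(models))
      } finally in.close()
    }
  }
}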
The Data manager is also a singleton, responsible for managing all speculative model serving controllers, including their creation and configuration, and for routing model serving data requests to them.
class DataManager extends Actor {

  println(s"Creating Data manager")

  private def getDataServer(dataType: String): Option[ActorRef] = context.child(dataType)

  private def createDataServer(dataType: String, tmout: Long, models: List[ActorRef]): ActorRef =
    context.actorOf(SpeculativeModelServingActor.props(dataType, tmout, models), dataType)

  private def getInstances: GetDataProcessorsResult =
    GetDataProcessorsResult(context.children.map(_.path.name).toSeq)

  override def receive = {
    // Configuration update
    case configuration: SetSpeculativeServer =>
      getDataServer(configuration.datatype) match {
        case Some(actor) => actor forward configuration // Update the existing one
        case _ =>
          createDataServer(configuration.datatype, configuration.tmout, configuration.models) // Create a new one
          sender() ! "Done"
      }
    // Process data record
    case record: WineRecord => getDataServer(record.dataType) match {
      case Some(actor) => actor forward record
      case _ => sender() ! ServingResult.noModel
    }
    // Get current state
    case getState: GetSpeculativeServerState =>
      getDataServer(getState.dataType) match {
        case Some(actorRef) => actorRef forward getState
        case _ => sender() ! SpeculativeExecutionStats.empty
      }
    // Get list of data processors
    case getProcessors: GetDataProcessors => sender() ! getInstances
  }
}
The Speculative model serving actor is an implementation of the speculative model serving controller, responsible for coordinating the individual model servers and deciding on the result. Its behavior is driven by the following parameters:
- List of model servers - the ActorRefs used for the actual model serving.
- Timeout - the time to wait for responses from the individual model servers (by increasing this you can ensure that all model servers have a chance to deliver a result).
- Decider - a custom class implementing the Decider interface. This class needs to implement a single method, decideResult, choosing one of the results collected by the speculative model server in the given time interval. Different implementations of this class can provide different speculative model processing policies; a sketch follows this list.
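As a hedged illustration of such a policy (the exact trait and the repository's SimpleDesider implementation may differ), a decider that simply takes the first collected response might look like this:

// Hedged sketch of a Decider; the signature is inferred from how
// decideResult is invoked in the actor below.
trait Decider {
  // Pick one result from the responses collected within the time interval
  def decideResult(results: List[ServingResponse]): Any
}

object FirstResponseDecider extends Decider {
  override def decideResult(results: List[ServingResponse]): Any =
    results.headOption
      .map(_.data)                       // unwrap the payload; field name assumed
      .getOrElse(ServingResult.noModel)  // nothing arrived in time
}

This is the most complex actor; it is implemented as follows: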
class SpeculativeModelServingActor(dataType: String, tmout: Long, models: List[ActorRef]) extends Actor {

  val ACTORTIMEOUT = new FiniteDuration(100, TimeUnit.MILLISECONDS)
  val SERVERTIMEOUT = 100L

  private val modelProcessors = models.to[ListBuffer]
  implicit var askTimeout = Timeout(if (tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)
  val decider = SimpleDesider
  implicit val ec = context.dispatcher

  var state = SpeculativeExecutionStats(dataType, decider.getClass.getName, askTimeout.duration.length, getModelsNames())

  override def preStart: Unit = {
    // Restore state from persistence
    val restored = FilePersistence.restoreDataState(dataType)
    restored._1.foreach(tmout => askTimeout = Timeout(if (tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS))
    restored._2.foreach { models =>
      modelProcessors.clear()
      models.foreach(path => context.system.actorSelection(path).resolveOne(ACTORTIMEOUT).onComplete {
        case Success(ref) => modelProcessors += ref
        case _ =>
      })
    }
  }

  override def receive = {
    // Model serving request
    case record: WineRecord =>
      val request = ServingRequest(UUID.randomUUID().toString, record)
      val start = System.nanoTime()
      Future.sequence(
        // Invoke every available model server, lifting each result to a Try
        modelProcessors.toList.map(
          ask(_, request)(askTimeout).mapTo[ServingResponse]).map(f => f.map(Success(_)).recover { case e => Failure(e) }))
        // Collect all successful serving results
        .map(_.collect { case Success(x) => x })
        // Invoke the decider
        .map(decider.decideResult(_)).mapTo[ServingResult]
        // Update stats
        .map { servingResult =>
          if (servingResult.processed)
            state = state.incrementUsage(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), servingResult.actor)
          servingResult
        }
        // Respond
        .pipeTo(sender())
    // Current state request
    case request: GetSpeculativeServerState => sender() ! state
    // Configuration update
    case configuration: SetSpeculativeServer =>
      askTimeout = Timeout(if (configuration.tmout > 0) configuration.tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)
      modelProcessors.clear()
      modelProcessors ++= configuration.models
      state = state.updateConfig(askTimeout.duration.length, getModelsNames())
      FilePersistence.saveDataState(dataType, configuration.tmout, configuration.models)
      sender() ! "Done"
  }

  private def getModelsNames(): List[String] = modelProcessors.toList.map(_.path.name)
}
Execution of the individual actors is orchestrated by an Akka Streams implementation. The overall code for wine quality evaluation (from the models serving book) is presented below:
object AkkaModelServer {

  // Set up the Akka execution environment
  implicit val system = ActorSystem("ModelServing")
  implicit val materializer = ActorMaterializer()
  implicit val executionContext = system.dispatcher
  implicit val askTimeout = Timeout(30.seconds)

  // Define Kafka reader settings
  val dataConsumerSettings = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
    .withBootstrapServers(KAFKA_BROKER)
    .withGroupId(DATA_GROUP)
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  val modelConsumerSettings = ….
  val speculativeConsumerSettings = ….

  def main(args: Array[String]): Unit = {

    // Create the necessary actors
    val models = List("model1", "model2", "model3")
    val modelserver = system.actorOf(ModelServingManager.props)
    ask(modelserver, SpeculativeDescriptor("wine", 100, models)).onComplete {
      case Success(r) => println("Data Server initialized")
      case _ =>
    }

    // Speculative configuration stream processing
    Consumer.atMostOnceSource(speculativeConsumerSettings, Subscriptions.topics(SPECULATIVE_TOPIC))
      .map(record => SpeculativeConverter.fromByteArray(record.value)).collect { case Success(a) => a }
      .mapAsync(1)(elem => modelserver ? elem)
      .runWith(Sink.ignore) // run the stream, we do not read the results directly

    // Model stream processing
    Consumer.atMostOnceSource(modelConsumerSettings, Subscriptions.topics(MODELS_TOPIC))
      .map(record => ModelToServe.fromByteArray(record.value)).collect { case Success(a) => a }
      .map(record => ModelWithDescriptor.fromModelToServe(record)).collect { case Success(a) => a }
      .mapAsync(1)(elem => modelserver ? elem)
      .runWith(Sink.ignore) // run the stream, we do not read the results directly

    // Data stream processing
    Consumer.atMostOnceSource(dataConsumerSettings, Subscriptions.topics(DATA_TOPIC))
      .map(record => DataRecord.fromByteArray(record.value)).collect { case Success(a) => a }
      .mapAsync(1)(elem => (modelserver ? elem).mapTo[ServingResult])
      .runForeach(result =>
        if (result.processed)
          println(s"Calculated quality - ${result.result} by actor ${result.actor}. Calculated in ${result.duration} ms")
        else
          println("No model available - skipping"))

    // REST server
    startRest(modelserver)
  }
}
This implementation relies on a REST server implementing queryable state, similar to the implementation described in the models serving book.
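As a rough sketch of what such an endpoint might look like with Akka HTTP (the route path, port, and response rendering here are illustrative, not the repository's exact code):

import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
import scala.concurrent.duration._

// Hedged sketch of a queryable-state endpoint; paths and port are
// illustrative only.
object RestServiceSketch {
  def startRest(manager: ActorRef)(implicit system: ActorSystem, materializer: ActorMaterializer): Unit = {
    implicit val timeout = Timeout(10.seconds)
    val routes =
      path("state" / Segment) { dataType =>
        get {
          // GetSpeculativeServerState is the message used earlier to
          // query a speculative server's statistics
          onSuccess(manager ? GetSpeculativeServerState(dataType)) { state =>
            complete(state.toString) // real code would render JSON
          }
        }
      }
    Http().bindAndHandle(routes, "localhost", 5500)
  }
}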
Although the implementation described above works fine, the actor logic for the speculative model serving might be hard to read and debug. An alternative implementation, splitting this actor into a speculative execution starter and a collector, is provided as well. The overall architecture for such an implementation is presented below:
This architecture is implemented with the following actors. Here we introduce two additional actors - a starter/collector pair.
The Speculative Execution Starter is responsible only for starting speculative model serving for a given datum. It notifies the Speculative Execution Collector about the start of a specific speculative execution (identified by GUID) and forwards the model serving request to all model servers (defined in the speculative model server configuration), specifying the Speculative Execution Collector as the destination for the serving results. The implementation of the actor is presented below:
class SpeculativeModelServingStarterActor(dataType: String, models: List[ActorRef], collector: ActorRef) extends Actor {

  implicit val askTimeout = Timeout(100, TimeUnit.MILLISECONDS)
  private val modelProcessors = models.to[ListBuffer]
  implicit val ec = context.dispatcher

  override def preStart: Unit = {
    // Restore the list of model servers from persistence
    val state = FilePersistence.restoreDataState(dataType)
    state._2 match {
      case Some(models) =>
        modelProcessors.clear()
        ask(AkkaModelServer.modelserver, GetModelActors(models)).mapTo[GetModelActorsResult].onComplete {
          case Success(servers) => modelProcessors ++= servers.models
          case _ =>
        }
      case _ => // Do nothing
    }
  }

  override def receive = {
    // Model serving request
    case record: WineRecord =>
      val request = ServingRequest(UUID.randomUUID().toString, record)
      // Tell the collector that a speculative execution has started, then
      // send the request to every model server with the collector as the
      // reply destination
      collector ! StartSpeculative(request.GUID, System.nanoTime(), sender(), modelProcessors.size)
      modelProcessors.foreach(_.tell(request, collector))
    // Configuration update
    case configuration: SetSpeculativeServerStarter =>
      modelProcessors.clear()
      modelProcessors ++= configuration.models
  }

  private def getModelsNames(): List[String] = modelProcessors.toList.map(_.path.name)
}
The Speculative Execution Collector is responsible for collecting and processing the results of model serving requests. It collects all serving results for a given GUID and calculates the final response based on the following parameters:
- Timeout - the time to wait for responses from the individual model servers (by increasing this you can ensure that all model servers have a chance to deliver a result).
- Decider - a custom class implementing the Decider interface, as sketched earlier. Its single method decideResult chooses one of the results collected by the speculative model server in the given time interval; different implementations can provide different speculative model processing policies.

The actor relies on two helper messages, sketched below, and is implemented as follows.
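For readability, here is a hedged sketch of those helper messages, with fields inferred from their usage in the collector (the repository's definitions may differ):

import akka.actor.ActorRef
import scala.collection.mutable.ListBuffer

// Hedged sketches; field names and types are inferred from usage below.
// StartSpeculative announces a new speculative execution to the collector.
case class StartSpeculative(GUID: String, start: Long, reply: ActorRef, models: Int)

// CurrentProcessing tracks an in-flight execution: how many model servers
// were asked, when it started, whom to reply to, and the results so far.
case class CurrentProcessing(models: Int, start: Long, reply: ActorRef, results: ListBuffer[ServingResponse])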
class SpeculativeModelServingCollectorActor(dataType: String, tmout: Long, models: List[String]) extends Actor {

  val SERVERTIMEOUT = 100L
  val decider = SimpleDesider
  var timeout = new FiniteDuration(if (tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)

  private val modelProcessors = models.to[ListBuffer]
  var state = SpeculativeExecutionStats(dataType, decider.getClass.getName, timeout.length, models)
  val currentProcessing = collection.mutable.Map[String, CurrentProcessing]()
  implicit val ec = context.dispatcher

  override def preStart: Unit = {
    // Restore timeout and model list from persistence
    val restored = FilePersistence.restoreDataState(dataType)
    restored._1.foreach(tmout => timeout = new FiniteDuration(if (tmout > 0) tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS))
    restored._2.foreach { models =>
      modelProcessors.clear()
      modelProcessors ++= models
    }
  }

  override def receive = {
    // Start speculative request
    case start: StartSpeculative =>
      // Add to the watch list
      currentProcessing += (start.GUID -> CurrentProcessing(start.models, start.start, start.reply, new ListBuffer[ServingResponse]()))
      // Schedule timeout
      context.system.scheduler.scheduleOnce(timeout, self, start.GUID)
    // Result of an individual model serving
    case servingResponse: ServingResponse =>
      currentProcessing.get(servingResponse.GUID) match {
        case Some(processingResults) =>
          // We are still waiting for this GUID
          val current = CurrentProcessing(processingResults.models, processingResults.start, processingResults.reply, processingResults.results += servingResponse)
          if (current.results.size >= current.models)
            processResult(servingResponse.GUID, current) // We are done
          else
            currentProcessing += (servingResponse.GUID -> current) // Keep going
        case _ => // Timed out
      }
    // Speculative execution completion (timeout message carrying the GUID)
    case stop: String =>
      if (currentProcessing.contains(stop))
        processResult(stop, currentProcessing(stop))
      // otherwise it is already done
    // Current state request
    case request: GetSpeculativeServerState => sender() ! state
    // Configuration update
    case configuration: SpeculativeDescriptor =>
      timeout = new FiniteDuration(if (configuration.tmout > 0) configuration.tmout else SERVERTIMEOUT, TimeUnit.MILLISECONDS)
      modelProcessors.clear()
      modelProcessors ++= configuration.models
      state = state.updateConfig(configuration.tmout, modelProcessors.toList)
      FilePersistence.saveDataState(dataType, configuration.tmout, configuration.models.toList)
      sender() ! "Done"
  }

  // Complete speculative execution
  private def processResult(GUID: String, results: CurrentProcessing): Unit = {
    val servingResult = decider.decideResult(results.results.toList).asInstanceOf[ServingResult]
    results.reply ! servingResult
    if (servingResult.processed)
      state = state.incrementUsage(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - results.start), servingResult.actor)
    currentProcessing -= GUID
  }
}
Splitting the speculative model serving actor in two, although it increases the number of actors, simplifies their implementation and can make them easier to debug and test.
Execution of the individual actors is orchestrated by an Akka Streams implementation similar to the one presented above.
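As a hedged sketch of how the pair might be wired together (the props factory methods are assumed to mirror the actors' constructors shown above):

// Illustrative wiring of the starter/collector pair; the props factory
// methods and actor names here are assumptions.
val modelNames = List("model1", "model2", "model3")
val modelActors = modelNames.map(name => system.actorOf(ModelServingActor.props(name), name))

// The collector is created first so it can be handed to the starter as
// the destination for all model server replies.
val collector = system.actorOf(
  SpeculativeModelServingCollectorActor.props("wine", 100, modelNames), "wineCollector")
val starter = system.actorOf(
  SpeculativeModelServingStarterActor.props("wine", modelActors, collector), "wineStarter")

// Data records are then sent to the starter instead of the combined actor.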
In this post, I have described how to extend a model serving approach based on the representation of models as data (described in the models serving book) by incorporating speculative model serving as defined in the Machine Learning Logistics book. I have outlined the advantages of speculative model serving and presented an Akka-based implementation of it.