One of the challenges of implementing streaming applications is the need to gain access to the application's current in-memory state. General approaches for obtaining this execution information include the following:
Although both of these approaches work and are often used, they typically suffer from the following limitations:
To overcome these drawbacks, version 0.10.1 of Kafka Streams introduced Interactive Queries.
Interactive Queries allow you to treat the stream processing layer as a lightweight embedded database and to directly query the latest state of your stream processing application, without needing to materialize that state to external storage first.
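To make the idea concrete, here is a toy sketch in plain Scala - emphatically not Kafka Streams code, all names are illustrative. The processing layer maintains its latest aggregate in memory, and a query reads that state directly rather than from an external database:

```scala
import scala.collection.mutable

// Toy illustration of the Interactive Queries idea (not Kafka Streams code):
// the stream processor keeps its latest aggregate in an in-memory "state
// store", and queries read that state directly, with no external database.
val accessCounts = mutable.Map.empty[String, Long] // the "state store"

// Streaming side: each incoming (host, logLine) record updates the store.
def process(record: (String, String)): Unit = {
  val (host, _) = record
  accessCounts(host) = accessCounts.getOrElse(host, 0L) + 1
}

// Query side: read the latest state directly from memory.
def queryLatest(host: String): Option[Long] = accessCounts.get(host)
```

In real Kafka Streams applications the state store is managed, fault-tolerant, and partitioned across instances, which is exactly what makes exposing it for querying non-trivial.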
Kafka Streams’ Interactive Queries leverage the local state information on the node where the application is running. If the application runs in distributed mode on multiple nodes, each node contains its share of the state. Kafka Streams does not publish a unifying API that lets you query state across all the nodes. However, it offers a set of infrastructure components that can be used to implement a query service based on your favorite endpoints.
Although Kafka Streams provides all of the machinery for accessing state information locally inside a Kafka Streams process, exposing this information via HTTP is left as an exercise for the developer.
At Lightbend, we have been using Kafka Streams for quite some time now, and we have been developing utilities that make using the streams library idiomatic and expressive from Scala. As part of this initiative, we are open sourcing kafka-streams-query, which implements an HTTP-based query layer on top of Interactive Queries. This is the second Kafka Streams Scala library that we are open sourcing, after kafka-streams-scala, a Scala API for Kafka Streams.
The HTTP functionality is implemented on top of the Akka module akka-http, which is built on akka-streams and akka-actor and provides the right level of abstraction for implementing and consuming HTTP-based services.
kafka-streams-query is published and cross-built for Scala 2.11 and 2.12, so you can just add the following to your SBT build:
val kafka_streams_query_version = "0.1.0"
libraryDependencies ++= Seq("com.lightbend" %% "kafka-streams-query" % kafka_streams_query_version)
For Maven, assuming Scala 2.12:
<dependency>
  <groupId>com.lightbend</groupId>
  <artifactId>kafka-streams-query_2.12</artifactId>
  <version>0.1.0</version>
</dependency>
For Gradle builds, assuming Scala 2.12:
compile 'com.lightbend:kafka-streams-query_2.12:0.1.0'
Note: kafka-streams-query requires Kafka Streams 1.0.0.
The API docs for kafka-streams-query are available here for Scala 2.12 and here for Scala 2.11.
The library is organized around three main packages containing the following:

- http: The main endpoint implementation. It includes the class InteractiveQueryHttpService, which provides methods for starting and stopping the HTTP service. The other classes provided are HttpRequester, which handles the request, does some validations, and forwards the request to KeyValueFetcher, which invokes the actual service for fetching the state information.
- services: This layer interacts with the underlying Kafka Streams API to fetch data from the local state. The two classes in this layer are (a) MetadataService, which uses the Kafka Streams API to fetch the metadata for the state, and (b) LocalStateStoreQuery, which does the actual query for the state.
- serializers: A collection of serializers useful for application development that help you serialize your model structures.

For sample usages of these abstractions, the library comes bundled with a couple of examples - one that uses the Kafka Streams DSL and another that uses the lower-level Processor-based APIs. In the sections below we provide some basic usage patterns of these abstractions.
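To illustrate what such a serializer is for (this is a standalone sketch, not the library's API), here is a round-trip serializer for Long values using java.nio - Kafka's Serde interface follows the same bytes-in/bytes-out contract:

```scala
import java.nio.ByteBuffer

// Illustrative sketch of what a state-store serializer does: convert model
// values to bytes and back. These standalone functions are hypothetical;
// the library ships its own ready-made serializers for this purpose.
def serializeLong(value: Long): Array[Byte] =
  ByteBuffer.allocate(java.lang.Long.BYTES).putLong(value).array()

def deserializeLong(bytes: Array[Byte]): Long =
  ByteBuffer.wrap(bytes).getLong()
```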
Here are examples of how each layer is used.
This layer offers an abstract class InteractiveQueryHttpService that takes care of starting and stopping the HTTP service. The implementation is based on akka-http. Since this is an abstract class, for a concrete implementation the routes need to be defined in a derived class. Here’s an example:
class MyHTTPService(
  hostInfo: HostInfo,
  actorSystem: ActorSystem,
  actorMaterializer: ActorMaterializer,
  ec: ExecutionContext
) extends InteractiveQueryHttpService(hostInfo, actorSystem, actorMaterializer, ec) {

  // define the routes
  val routes = handleExceptions(myExceptionHandler) {
    pathPrefix("...") {
      //.. impl
    }
  }
}
Another key abstraction that this layer provides is the KeyValueFetcher, which allows querying the key/value store using HTTP-based requests. The package also includes an implementation of HttpRequester that provides a generic API over HTTP to query from a host and a store. The result is returned as a Future. This implementation is again based on akka-http and akka-streams.
Using the Kafka Streams query class, KeyValueFetcher, from within your application is easy - you just have to define your own implementation and delegate calls to the fetch methods of the base class. Here’s a snippet from the bundled example application:
class SummaryInfoFetcher(kvf: KeyValueFetcher) {

  // fetch
  def fetchAccessCountSummary(hostKey: String): Future[Long] =
    kvf.fetch(hostKey, WeblogProcessing.ACCESS_COUNT_PER_HOST_STORE, "/weblog/access/" + hostKey)

  // windowed fetch
  def fetchWindowedAccessCountSummary(hostKey: String, fromTime: Long, toTime: Long): Future[List[(Long, Long)]] =
    kvf.fetchWindowed(hostKey, WeblogProcessing.WINDOWED_ACCESS_COUNT_PER_HOST_STORE, "/weblog/access/win/", fromTime, toTime)
}
Note: The library offers a fetcher implementation for key/value-based stores only. However, you can define your own store as well and integrate it with the rest of the library components. In our example using the Kafka Streams Processor-based APIs, we define a custom state store based on Bloom filters and implement the fetcher accordingly.
This layer uses the abstractions from the HTTP layer and builds higher-level services. The principal abstraction in this layer is LocalStateStoreQuery[K, V], which offers a generic query service from a local Kafka Streams state store.
Besides providing the basic query service, the implementation also takes care of some lower-level issues, to shield users from the distribution semantics of Kafka Streams. For an application deployed in distributed mode, if the user issues a query to the state store while Kafka Streams is rebalancing partitions, the query may fail because state may be in the process of migrating across nodes. The API for LocalStateStoreQuery implements retry semantics to take care of such situations. The section Handling Rebalancing of Partitions below explains this in more detail.
The API also handles the situation where the state information cannot be located on the node where the query is issued. The section Distributed Query below explains this situation in detail.
Here’s a sample of how to use the service LocalStateStoreQuery when defining a custom HTTP-based application service:
// Step 1: service for fetching metadata information
val metadataService = new MetadataService(streams)

// Step 2: service for fetching from local state store
val localStateStoreQuery = new LocalStateStoreQuery[String, Long]

// Step 3: http service for request handling
val httpRequester = new HttpRequester(system, materializer, executionContext)

// Step 4: create custom http application service
val restService = new MyAppHttpService(
  hostInfo,
  new MyKeyValueFetcher(new KeyValueFetcher(metadataService, ..)),
  system, materializer, executionContext
)

// enjoy!
restService.start()
If the application runs in distributed mode across multiple physical nodes, local state information is spread across all the nodes. The http services that the library offers can handle this and provide a unified view of the global application state.
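That unified view boils down to a per-request decision: consult the metadata to find which node owns the key, then either answer locally or forward the query. Here is a plain-Scala sketch of that decision; all names here (Node, lookupOwner, fetchLocal, fetchRemote) are hypothetical, not the library's API:

```scala
import scala.concurrent.Future

// Hypothetical sketch of the per-request decision behind the unified view;
// the real library implements this via MetadataService and HttpRequester.
final case class Node(host: String, port: Int)

def query[V](
  key: String,
  thisNode: Node,
  lookupOwner: String => Node,              // metadata lookup: which node owns the key?
  fetchLocal: String => Future[V],          // read from the local state store
  fetchRemote: (Node, String) => Future[V]  // forward the query over HTTP
): Future[V] = {
  val owner = lookupOwner(key)
  if (owner == thisNode) fetchLocal(key)    // key lives here: answer locally
  else fetchRemote(owner, key)              // otherwise forward to the owning node
}
```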
Consider the following scenario:

- The application runs on 3 nodes with IP addresses ip1, ip2 and ip3. Assuming the application uses this library, the HTTP services run on port 7070 on each of the nodes.
- A user issues a query of the form http://ip1:7070/<path>/<to>/<key>.

It may so happen that the <key> that she is looking for does not reside on host ip1. The query service handles this situation by interacting with the MetadataService as follows:

- The service on ip1 queries the MetadataService to get information about the key that the user is looking for.
- If the key resides on ip1, then we are done. Return the query result.
- Otherwise, the query is forwarded over HTTP to the host that holds the key, and its result is returned to the user.

It may also happen that when the user issues the query, Kafka Streams is rebalancing partitions, during which state may migrate from one store (node) to another. In such a situation, Kafka Streams throws InvalidStateStoreException.
Migration typically happens when new instances of the application come up or Kafka Streams does a rebalancing. The library handles such situations through retry semantics: the query API will continue to retry until the rebalancing is complete or the retry count is exhausted.
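A minimal sketch of such retry semantics in plain Scala (the names and parameters here are illustrative, not the library's API): re-attempt the operation until it succeeds or the retry count is exhausted, pausing between attempts to give the rebalance time to finish.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Illustrative retry loop: re-attempt `op` up to `maxRetries` more times,
// sleeping `delayMillis` between attempts - the kind of loop a query layer
// needs while Kafka Streams rebalances and throws InvalidStateStoreException.
@tailrec
def retry[T](maxRetries: Int, delayMillis: Long)(op: () => T): Try[T] =
  Try(op()) match {
    case success @ Success(_) => success
    case Failure(_) if maxRetries > 0 =>
      Thread.sleep(delayMillis)
      retry(maxRetries - 1, delayMillis)(op)
    case failure => failure // retries exhausted: surface the last failure
  }
```

A production version would typically retry only on InvalidStateStoreException and use a Future-based delay rather than blocking the thread.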
We hope you will find this library and our Scala API for Kafka Streams useful. We look forward to receiving your feedback!