For the purposes of this talk we are going to focus on using an observable to build a system which uses backpressure to respond to overload situations. This is the solution provided as part of the reactive extensions specification and is, as a result, available in a wide variety of languages.
So, what is an Observable
? At its core it’s basically an interface for an Observer
to subscribe to a data source along with some contractual details about how that
interface will be used.
In general, an Observer
has a few important methods:
onNext
method called for each data element in the Observable
. In reactive implementations
that support backpressure (such as Monix) the return of this method indicates the synchronous or
asynchronous completion of processing on the work item with success or an error.onComplete
method that is called when the Observable
contains no more data elements.onError
method called when the Observable
encountered some kind of error is encountered
which results in the stream terminating.The interface looks something like this:
import monix.execution.Ack
import monix.reactive.Observer
import scala.concurrent.Future
final class MyObserver[-T] extends Observer[T] {
override def onNext(elem: T): Future[Ack] = {
println(elem)
Ack.Continue
}
override def onError(ex: Throwable): Unit =
println(ex.getMessage)
override def onComplete(): Unit =
println("Finished!")
}
See: Monix Documentation
Now, it’s useful to understand how these methods will be called. There are several rules to be followed
when interacting with an Observer
.
onNext
will be called 0 or more times. It will never be called more than once for the
same event. In reactive implementations supporting backpressure, it is also important to ensure
that the onNext
will never be called until its previous invocation ends either synchronously
or asynchronously. As a result, there is no need for expensive locking or synchronization of this
method.onComplete
, onError
, and onError
will never be called concurrently. As a result, there is
no need for possible expensive locking or synchronization in these calls when implementing an observer.NOTE: Your framework of choice may have rules that differ slightly from this, are more explicit in some situations, etc. Please refer to the documentation of the library you are using for a more complete listing.
On the other side is an Observable
which is basically just an interface that allows for an Observer
to subscribe to the data source. The exact details of this differ a bit between platforms. When using
scala and monix this is accomplished by using the factory methods available on the Observable
object.
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
val source = Observable("Alpha", "Beta", "Gamme", "Delta", "Epsilon")
source
.foreachL(name => println(name))
.runSyncUnsafe()
See: Monix Documentation
In most cases the details of these low level Observer
and Observable
contracts are transparent to
code authors. Why? Because a huge amount of higher-order operations are available for operating on
Observable
streams of data without having ever manually write an Observer
. You’ve already seen one
example above when using the foreachL
operator in Scala.
Let’s take an example use case to try this out. Let’s build an application that accepts requests from a client as stream of text lines where each line is a “command”. With that, let’s implement the following commands:
In each case the commands should be executed in-order. If a command cannot be processed within 1 second (e.g. because the ‘sleep’ command takes to long) then it should immediately echo the line “try again”.
In the following example we implement this simple “app” in scala by taking advantage of a few operators.
mapEval
operator executes a given Task
for each element
of the source observable.Task
itself which times out any command processing
after 1 second, cancels the execution of that task, and returns instead a message for the user indicating
that they should try again.import cats.effect.ExitCode
import monix.eval.{Task, TaskApp}
import monix.reactive.Observable
import scala.concurrent.duration._
object CommandProcessor extends TaskApp {
private val processingFinishedMsg: String =
"No more commands to process. Shutting down!"
private val processingTimeoutMsg: String =
"Your command could not be processed. Please try again later."
private val processingTimeout: FiniteDuration = 1.second
private val sleepTime: FiniteDuration = 10.second
def processCommand(cmd: String): Task[String] =
if(cmd.startsWith("echo")) {
Task.pure(cmd)
} else if(cmd.startsWith("sleep")) {
Task.delay("awake").delayExecution(sleepTime)
} else {
Task.pure(s"An unknown command $cmd was received.")
}
def processCommands(commands: Observable[String]): Observable[String] =
commands
.mapEval { cmd =>
processCommand(cmd).timeoutTo(processingTimeout, Task.pure(processingTimeoutMsg))
}
.append(processingFinishedMsg)
override def run(args: List[String]): Task[ExitCode] = {
val commands = Observable("echo before", "sleep", "echo after")
processCommands(commands)
.foreachL(println)
.map(_ => ExitCode.Success)
}
}
When we put all of this together we get a system which provides the following output when given the commands:
echo before
Your command could not be processed. Please try again later.
echo after
No more commands to process. Shutting down!