Up until now we haven’t really taken advantage of backpressure to do anything useful. In our previous example we were able to rationally apply a timeout to a command, but this did nothing to backpressure the source and slow down the rate of incoming commands. Let’s learn some strategies for dealing with overload situations and apply them to our example.
Before we get started, let’s get some terminology out of the way. You see, there are a few synonyms that are used interchangeably to indicate the direction of data flow, and they can be confusing at first:

- Observable: represents the “source”, “upstream”, or “producer”. It is the thing that sends messages and can be back-pressured.
- Observer: represents the “sink”, “downstream”, or “consumer”. It is the thing that receives messages and can deliver back-pressure.

To add to this confusion there is an intermediate concept: a type that is both an Observable and an Observer. Many of the common library-supplied operators (e.g. map, flatMap, filter, etc.) operate this way. As a result, they can both deliver back-pressure upstream and respond to back-pressure downstream.
An Observer provides back-pressure through the return value of onNext. It can signal a few things back to the Observable that are really important:

- Is the Observer synchronous or asynchronous?
- Did the Observer result in some kind of error?
- Does the Observer indicate that the stream should continue or stop?

All of these elements are crucial to a fully formed idea of back-pressure because they allow for some important and significant use cases to be implemented in an Observer. Some examples that may be relevant to systems you work with:
- An operator (both an Observer and an Observable) may want to decouple its downstream and upstream paths by means of buffering and asynchronous processing.
- An Observer may represent some kind of network hop, IPC boundary, database query, or other action that is fundamentally asynchronous in nature.
- An Observer may do error checking or input validation and desire to cancel the stream early to avoid continuing to compute bad data.
- An Observable may represent a source that cannot be back-pressured and, therefore, buffering (along with some kind of management strategy such as tail drop) is required to avoid data loss.

In each of these cases our onNext call can return different values to signify different conditions in the downstream Observer.
- If a Future is returned then the Observable must asynchronously await the result.
- If the Future is Cancelable then it also indicates that the downstream computation can be asynchronously shut down before it emits a result.
- An Ack.Continue result indicates that processing should continue as normal.
- An Ack.Stop result indicates that downstream is no longer interested in receiving events.

In addition to these cases the returned Future may also supply an exception indicating that an unhandled error has occurred downstream.
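To make these acknowledgement semantics concrete, here is a minimal sketch of a hand-written Observer. The TakeObserver name, the element limit, and the println calls are illustrative assumptions, not part of any library:

```scala
import monix.execution.Ack
import monix.reactive.Observer
import scala.concurrent.Future

// A hypothetical Observer that consumes at most `limit` elements.
// Ack.Continue asks the Observable for more data; Ack.Stop tells it
// that downstream is no longer interested, shutting the stream down.
final class TakeObserver(limit: Int) extends Observer[Long] {
  private var seen = 0

  override def onNext(elem: Long): Future[Ack] = {
    seen += 1
    if (seen < limit) Ack.Continue else Ack.Stop
  }

  override def onError(ex: Throwable): Unit = println(s"error: $ex")
  override def onComplete(): Unit = println("complete")
}
```

Because onNext returns a Future[Ack], this same Observer could just as easily return an asynchronous acknowledgement, e.g. a Future that completes only after a database write finishes.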
So, that’s a lot to take in. It turns out, though, that it’s pretty straightforward to implement some code using these semantics. Let’s consider the map operator. Such an operator would have a few requirements:

- Given a pure function (e.g. x * 2) it should apply that transformation to every element received from the operator’s upstream and send it to the operator’s downstream.
- It should forward error and completion signals from upstream to downstream, and pass each downstream acknowledgement (and therefore its back-pressure) back upstream.

It turns out that this isn’t such a difficult thing to build and use.
import monix.execution.{Ack, Cancelable}
import monix.reactive.{Observable, Observer}
import monix.reactive.observers.Subscriber
import scala.concurrent.Future
final class MapOperator[A, B](upstream: Observable[A], mapFunc: A => B) extends Observable[B] {
  override def unsafeSubscribeFn(downstream: Subscriber[B]): Cancelable =
    upstream.subscribe(new MapObserver(downstream))(downstream.scheduler)

  private class MapObserver(downstream: Subscriber[B]) extends Observer[A] {
    override def onNext(elem: A): Future[Ack] =
      downstream.onNext(mapFunc(elem))

    override def onError(ex: Throwable): Unit =
      downstream.onError(ex)

    override def onComplete(): Unit =
      downstream.onComplete()
  }
}
Of course, under normal circumstances you would never reinvent this particular wheel. You would, instead,
use the built-in map
functionality.
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
Observable(1, 2, 3)
  .map(_ * 2)
  .toListL
  .runSyncUnsafe()
// List(2, 4, 6)
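The back-pressure plumbing that map forwards along becomes more visible with an asynchronous consumer. As a sketch, mapEval evaluates a Task per element, and upstream is not asked for the next element until that Task completes (the 10-millisecond delay is an arbitrary assumption for illustration):

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import scala.concurrent.duration._

// Each element is processed asynchronously; upstream waits for the
// returned Task to complete before emitting the next element.
Observable.range(1, 4)
  .mapEval(i => Task(i * 2).delayExecution(10.millis))
  .toListL
  .runSyncUnsafe()
// List(2, 4, 6)
```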
Now, let’s take on the use case of a data source that just keeps sending data even when the system becomes overloaded. That’s what we are all here for, right? To deal with this we will employ a few strategies.
To kick this off, let’s build a particularly rude data source for the CommandProcessor we built previously. To be clear, the goal here is not to provide an example of what to do but, perhaps, what not to do and/or how to deal with sources that act in this manner.

Our RudeCommandSource uses a tight loop to continuously send commands forward without respect to back-pressure. There are analogies to this in the real world. Completely normal data sources can begin to look like this when downstream systems experience issues. Perhaps the database is under an unusual amount of load from another source, or perhaps a disk has died and your backing RAID array is rebuilding. The possibilities are endless.
import monix.eval.Task
import monix.execution.{Ack, Cancelable}
import monix.execution.ChannelType.SingleProducer
import monix.execution.atomic.AtomicBoolean
import monix.reactive.{Observable, OverflowStrategy}
import monix.reactive.observers.{BufferedSubscriber, Subscriber}
import scala.util.{Failure, Success}
final class RudeCommandSource(commandsRepeated: List[String], overflowStrategy: OverflowStrategy[String])
  extends Observable[String] {

  override def unsafeSubscribeFn(subscriber: Subscriber[String]): Cancelable = {
    // A "Subscriber" is just an "Observer" with some extra magic attached
    val buffered = BufferedSubscriber(subscriber, overflowStrategy, SingleProducer)
    sendCommands(buffered, AtomicBoolean(false)).runToFuture(buffered.scheduler)
  }

  // This is _not_ the kind of code you would write in the real world. It's just a hack
  // to build a particularly rude command source.
  private def sendCommands(subscriber: Subscriber[String], stopped: AtomicBoolean): Task[Unit] =
    Task.deferAction { implicit sched =>
      commandsRepeated.foreach { cmd =>
        if (!stopped.get) {
          // Deliberately ignore the returned Future[Ack] instead of awaiting
          // it before sending more. This is what makes the source "rude".
          subscriber.onNext(cmd).onComplete {
            case Success(Ack.Stop) | Failure(_) => stopped.set(true)
            case Success(Ack.Continue)          => ()
          }
        }
      }
      if (!stopped.get) sendCommands(subscriber, stopped)
      else Task.unit
    }
}
Let’s use our fancy new RudeCommandSource
in an example and talk about what happens.
import monix.eval.Coeval
import monix.execution.Scheduler.Implicits.global
import monix.reactive.OverflowStrategy
import examples.{CommandProcessor, RudeCommandSource}

val overflowStrategy =
  OverflowStrategy.DropNewAndSignal(8, dropped => Coeval.delay(Some(s"echo dropped ${dropped}")))
val commandSource = new RudeCommandSource(List("echo before", "sleep", "echo after"), overflowStrategy)

CommandProcessor.processCommands(commandSource)
  .take(10)
  .foreachL(println)
  .runSyncUnsafe()
Something like the following eventually gets printed to the console:
echo before
echo dropped 87
echo dropped 456
echo dropped 166
echo dropped 211
echo dropped 255
echo dropped 112
echo dropped 95
echo dropped 295
echo dropped 110
From that output it’s clear that we dropped quite a few events. After all, the second command issued was “sleep”, which shuts down the system for a full second while it processes. Meanwhile our source is just pounding away with new commands. Let’s dive a bit deeper into each step to see what happened, though.
- We wrapped our downstream subscriber in a BufferedSubscriber.
- We used an OverflowStrategy to manage the buffer and set the buffer size to 8. We signalled the number of dropped events downstream to the user by an “echo” command.
- We used take(10) to make the stream finite in nature.
- We printed each result using println.
So, for not a ton of effort we now have a system that rationally handles overload situations. Success! Don’t stop here, though; there are a ton of alternative strategies that can be employed. The Monix documentation contains some great information about strategies you can employ in your applications, even if you aren’t a Scala developer.
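As a sketch of one such alternative, the built-in asyncBoundary operator applies an OverflowStrategy without writing a custom source. DropOld discards the oldest buffered elements instead of the newest; the buffer size of 16 here is an arbitrary assumption:

```scala
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Observable, OverflowStrategy}

// Insert an asynchronous boundary with a bounded buffer. When the
// consumer falls behind, the oldest buffered elements are dropped
// rather than the newest ones.
val survivors = Observable.range(1, 1000)
  .asyncBoundary(OverflowStrategy.DropOld(16))
  .toListL
  .runSyncUnsafe()
```

Other built-in strategies include OverflowStrategy.BackPressure, which holds the producer back when the buffer fills, and OverflowStrategy.ClearBuffer, which empties the whole buffer on overflow.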