Online reactive Bayesian machine learning in Haskell
In this blog post,
we will learn how to include Bayesian machine learning in a reactive rhine application.
Imagine a program that reacts in real time to a sensor input:
It might be a robot, an app using GPS data on a phone,
or a device to track an object’s location.
Also assume that we have some prior knowledge about how this data is produced,
so it makes sense to express this knowledge as a Bayesian model,
and use Bayesian machine learning to infer the best estimate of the state of the robot, phone, or object.
Then our program should be able to react continuously in real time to this estimate.
There exist numerous libraries for Bayesian modelling and inference, such as Stan, PyMC, and (in Haskell) monad-bayes,
but it is not straightforward to incorporate them into a real time application.
There is theory to solve this problem,
and “inference in the loop” has been implemented in the domain-specific synchronous language ProbZelus.
A solution integrating well into a general purpose programming language is still missing, though.
In this blog post, I will outline how to write reactive machine learning programs with a new Haskell library, rhine-bayes,
which I developed during my Open Source Fellowship which was generously sponsored by Tweag.
Introduction to the main dependency libraries
Probability monads and inference: monad-bayes
monad-bayes is a
well-documented, modular Haskell library for probabilistic
programming, backed up by research. It is maintained by Tweag.
Different standard representations of probabilities arise by composing the appropriate monad transformers from the library.
For example, the discrete finite probability monad arises from the PopulationT transformer in monad-bayes.
A monad stack built from such transformers typically has instances of the two major type classes that the library introduces:
MonadDistribution,
which supplies the effect of randomly sampling values,
and MonadFactor,
which allows for conditioning on the relative probability of observations.
(The class MonadMeasure is simply the combination of both aforementioned classes.)
For example, assume that an object is randomly placed on a coordinate axis, according to a normal distribution with, say, mean 23 and standard deviation 42. We are able to observe the object with an unbiased but noisy sensor, modelling the noise as, again, a normal distribution with standard deviation 1. The goal now is to write a probabilistic program that generates a hypothesis as to where the object might be, and at the same time score the likelihood of this hypothesis, given one measurement.
Let us represent the sampled and measured positions with a Double each.
Then this program models the situation:
type Measurement1D = Double
type Position1D = Double
randomObjectPosition ::
  MonadMeasure m =>
  -- | The noisy measurement of the position
  Measurement1D ->
  m Position1D
randomObjectPosition measurement = do
  -- Randomly draw an actual ("latent") position
  position <- normal 23 42
  -- Change the probability by how likely the measurement is, given the position
  score $ normalPdf measurement 1 position
  -- Return the sampled position
  return positionNote how sampling treats the position argument differently from conditioning.
Both normal,
and normalPdf represent the normal distribution,
and they both expect an argument for mean and standard deviation.
But their type signatures show that they are used differently:
- normal :: MonadDistribution m => Double -> Double -> m Double“samples” this distribution and thus returns the sampled position as an output in- m.
- normalPdf :: Double -> Double -> Double -> Log Doubleis a “probability density function” (pdf), expects a position as an input, and returns its probability density.
This will be important later to distinguish the components that simulate (sample) the effects of a random variable,
while others observe (score) them.
Scoring is achieved with score :: MonadFactor m => Log Double -> m (),
which records a likelihood of an observation or measurement.
Since monad-bayes is a Haskell library, and not a separate language (such as Stan),
it can easily be integrated into applications that have other effects than sampling and conditioning.
m could be created by a diverse choice of transformers from monad-bayes on top of a stack of other monads,
corresponding to robotics, backend servers, database access, or file reading.
You could give an IoT backend the ability to do machine learning on a batch of measurement data you receive from devices,
by including a number of implemented inference methods.
There is even an implementation of Sequential Monte Carlo (SMC), with a tutorial. Unfortunately, it is not usable for real time inference, only batch inference. This means that you can space-efficiently pass a big amount of data through it and collect the results at the end, but you cannot observe the current state of inference as you input the data incrementally, so it is impossible to build an interactive system that reacts while new data is ingested.
Such a framework would often be called to support “online reactive machine learning”, or “inference in the loop”. This is what we are going to look at in this blog post.
Functional Reactive Programming: rhine
rhine is a library for Functional Reactive Programming (FRP)
featuring type level clocks.
Like monad-bayes, it is polymorphic over monads,
which makes it ideal for combination with probabilistic programming.
A comprehensive introduction explaining the design choices and inner workings can be found in the original article,
but note that the API has since been simplified.
Clocked signal functions
The basic building block of a rhine program is the type ClSF m cl a b,
short for “clocked signal function”.
It is a signal processor that receives input of type a and emits output of type b.
A processing step can produce effects in a monad m,
and occurs every time a Clock of type cl ticks.
Usually we would construct a ClSF in one of the following three basic ways.
First, we construct them from pure functions and effectful functions into a monad (“Kleisli arrows”):
arr    :: (a ->   b) -> ClSF m cl a b
arrMCl :: (a -> m b) -> ClSF m cl a bSecond, we can store internal state in a ClSF:
feedback ::
  c ->
  ClSF m cl (a, c) (b, c) ->
  ClSF m cl  a      bIn feedback c clsf, the internal state is initialized with c.
Subsequently, this state is used as the second part of the input of clsf,
and updated by its output.
Finally, to distinguish it from mere stream programming,
ClSFs have builtin access to time:
timeInfo :: ClSF m cl a (TimeInfo cl)TimeInfo contains information about the current tick rate,
the time since the start of the program,
and additional information the clock may emit.
Clocked signal functions can be composed using the Category and Arrow type classes,
for example with the >>> combinator:
(>>>) ::
  ClSF m cl a b ->
  ClSF m cl   b c ->
  ClSF m cl a   cUsing these ingredients, classic signal processing units such as numeric integration or moving average have been implemented in the library.
Our previous example of a randomly chosen position with measurement becomes:
{-# LANGUAGE Arrows #-}
randomObjectPositionS ::
  MonadMeasure m =>
  ClSF m cl Measurement1D Position1D
randomObjectPositionS = proc measurement -> do
  -- Randomly draw an actual ("latent") position
  position <- arrMCl $ uncurry normal -< (23, 42)
  -- Change the probability by how likely the measurement is, given the position
  arrMCl $ score . uncurry (uncurry normalPdf) -< ((measurement, 1), position)
  -- Return the sampled position
  returnA -< positionWe have used the Arrows syntax extension to keep the previously established variable names.
The few changes we needed to do were:
- Use procand-<to explicitly declare the input for aClSF, which is distinguished from a regular function input.
- uncurryfunctions with several arguments to a function with one argument.
- Use arrMClto lift our effectful functions toClSFs.
- Use returnAinstead ofreturnto produce output.
Clock-driven integration
The central insight in rhine is that reactive programs are best divided into components by establishing when each component is active.
This is achieved with type level clocks.
A clock is, for our purposes, a stream of timestamps Time cl, which may depend on the clock type cl.
Since the clocked signal functions are also tagged by cl,
the framework can ensure through the type checker that they only be sampled at this particular rate.
Thus, one cannot accidentally intermingle, say, the audio and video stream of a multimedia application.
Some ClSFs can be polymorphic in the clock, however,
meaning that they can be run at any rate,
as long as the type of timestamps Time cl is given.
These are called “behaviour functions”:
type BehaviourF m time a b = forall cl. time ~ Time cl => ClSF m cl a bTo let components of different clocks exchange data,
there exists a further, asynchronous, building block:
The resampling buffer.
It has type ResamplingBuffer m cla clb a b where again a is the input type, b the output type, and m the monad of possible side effects.
Unlike a ClSF, it has two clocks: cla for the rate of incoming data, and clb for the rate of outgoing data.
To let several clocks run concurrently,
they are scheduled using monad-schedule,
requiring a MonadSchedule m constraint.
Complete applications of type Rhine m cl a b are then composed with a variety of combinators.
For example, let us run randomObjectPositionS at a particular speed, say, every 10 milliseconds:
randomObjectPositionR ::
  MonadMeasure m =>
  Rhine m (Millisecond 10) Measurement1D Position1D
randomObjectPositionR = randomObjectPositionS @@ waitClockWith this type signature, the component will always be sampled at the rate of 10 milliseconds.
waitClock is a particular strategy to achieve this rate in IO,
and the @@ combinator is used to attach this particular clock value to the signal function,
resulting in a complete Rhine component.
To supply the measurement values as input, we will use the standard input for now:
measurement :: Rhine m StdinClock () Measurement1D
measurement = tagS >-> readLn @@ StdinClock(In general, tagS gives further information that the clock may emit. In the case of StdinClock, it is the line of input that arrived.)
These Rhines now tick at different times,
and they need to be connected by a component that ticks at both.
This is called a ResamplingBuffer,
and there exist several standard choices in the library.
For example, keepLast implements a “zero order hold”,
which simply means that it keeps the last value of input and forwards it:
mainRhine = measurement >-- keepLast 0 --> randomObjectPositionRIn a similar way, big applications with very diverse concurrent components can be built up in rhine,
be they interactive GUIs or console applications, or streaming and webserver backends.
Even a proof-of-concept browser game has been implemented.
However, our example is not yet very interesting: We can sample values and score them, but we cannot learn anything from this process yet. We lack inference.
Combining monad-bayes and rhine: rhine-bayes
The library rhine-bayes allows us to express the components of our probabilistic model as stochastic processes.
We will take a dive in the library following an interactive graphical example you can find in https://github.com/turion/rhine/blob/master/rhine-bayes/app/Main.hs. You can try it yourself by:
- Cloning https://github.com/turion/rhine
- Running cabal run rhine-bayes-gloss
Stochastic processes as signal functions
A stochastic process is a time-varying random variable.
For our purposes, every stochastic process will have the Markov property:
It is implemented as a random state variable that progresses in time with probabilistic changes,
and emits outputs which also depend probabilistically on the state.
This is modelled in rhine-bayes as a behaviour function that can draw effects from MonadDistribution for probabilistic sampling:
type StochasticProcessF time a b
  = forall m . MonadDistribution m => BehaviourF m time a bWe have generalised the notion slightly and included an input a.
The randomly drawn position from the example is implemented as a library function whiteNoise,
but more complex stochastic processes are possible, such as Brownian motion:
brownianMotionVarying ::
  (Diff time ~ Double) =>
  StochasticProcessF time Double DoubleIn one-dimensional Brownian motion, a particle moves in one-dimensional space, while white noise governs its speed.
Here, the variance of the noise, which can be thought as the temperature of a heat bath,
can be given as a live input.
(This incurs the additional constraint Diff time ~ Double which ensures that the time step is of the same type as the variance, Double.)
Stochastic processes can also be combined with standard rhine library functions,
creating even more complex processes:
type Temperature = Double
-- | Harmonic oscillator with white noise
prior1d ::
  (MonadDistribution m, Diff td ~ Double) =>
  -- | Starting position
  Double ->
  -- | Starting velocity
  Double ->
  StochasticProcessF td Temperature Double
prior1d initialPosition initialVelocity = feedback 0 $ proc (temperature, position') -> do
  impulse <- whiteNoiseVarying -< temperature
  let springConstant = 3
      acceleration = (- springConstant) * position' + impulse
  -- Integral over roughly the last 10 seconds, dying off exponentially, as to model a small friction term
  velocity <- arr (+ initialVelocity) <<< decayIntegral 10 -< acceleration
  position <- integralFrom initialPosition -< velocity
  returnA -< (position, position)This process, suggestively called prior1d since we will extend it to two dimensions and use it as the prior in our inference model,
represents a pointlike object moving around in a harmonic potential,
while being additionally accelerated randomly all the time.
Let us quickly make it two-dimensional, and visualise it using the package gloss:
prior = prior1d 10 0 &&& prior1d 0 10
As expected, the point initially moves around on a circle, but then is quickly pushed into some random direction. We could imagine, for example, that the point represents a skateboarder in a skateboard bowl! They will skate up and down the bowl, and randomly make little adjustments to their speed.
Continuous-time Markov processes
In many fields of data science, it is assumed that a continuously varying quantity such as our point can be indirectly observed, but the sensor is noisy and does not report the accurate position. Instead, it adds a random error to the actual position, for example white noise.
It will then be the job of an inference algorithm to estimate the actual, “latent” position, from the noisy measurements. In order to achieve this, we first need to completely model how these measurements arise. The behaviour of the latent position was already described in the last section, it remains to model the sensor noise. We choose a very simple model, additive white noise with constant variance:
type Pos = (Double, Double)
type Sensor = Pos
-- | A generative model of the sensor noise
noise :: StochasticProcess td Pos
noise = whiteNoise 1 &&& whiteNoise 1
-- | A generative model of the sensor position, given the noise
generativeModel :: (Diff td ~ Double) => StochasticProcessF td Pos Sensor
generativeModel = proc latent -> do
  noiseNow <- noise -< ()
  returnA -< latent ^+^ noiseNowIn our visualization, the output of the sensor is displayed as a red dot:

We can imagine that the skater in the bowl has a GPS measurement device on them, which measures the position, but is off by an error every time.
Particle filter: Online Sequential Monte Carlo inference
We can now simulate noisy measurements from positions, but given only the measurements how do we infer the latent position from that? Bayesian machine learning allows us to do exactly that by turning our model around.
Like the generative model, the inference takes the same two ingredients:
- The random walk in the harmonic oscillator is used as a prior to create hypotheses where the latent position might be.
- The sensor model is used as a likelihood to condition how, well, likely a certain hypothesized position is, given a measurement. This requires us to reformulate the sensor as a probability density function:
{- | This remodels the distribution defined by `noise` as a PDF,
   as to be used in the inference later.
-}
sensorLikelihood :: Pos -> Sensor -> Log Double
sensorLikelihood (posX, posY) (sensorX, sensorY)
  = normalPdf posX sensorNoiseTemperature sensorX
  * normalPdf posY sensorNoiseTemperature sensorYA standard approach for this inference problem is the “particle filter”, or “Sequential Monte Carlo”.
It proceeds as follows:
- We let the generativeModelrun to create a continuously updated true latent position, and a noisy measurement.
- For inference, we continuously sample from a number of copies (say, 100) of prior. These are called our “particles”, and together form a “population”. Each of these copies runs independently from the others and can make its own random choices.
- Using the given measurements, we condition each particle individually on the likelihood of producing this measurement given its hypothetical position.
The function sensorLikelihoodis used for this.
- The most unlikely particles are removed, and the most likely ones are duplicated
(since they appear to have been the best guesses so far).
There is a lot of choice in the exact details of this step (known as “resampling”).
For example, when or how often it takes places, which particles exactly are removed and replicated, and how this replication is carried out.
Some standard choices are defined in monad-bayes.
Populations are implemented in monad-bayes as the monad transformer PopulationT,
and consequently resamplers have the type forall x. PopulationT m x -> PopulationT m x.
The implementation of the particle filter in rhine-bayes has the following type signature:
-- | Run the Sequential Monte Carlo algorithm continuously on a 'ClSF'.
runPopulationCl ::
  Monad m =>
  -- | Number of particles
  Int ->
  -- | Resampler (see 'Control.Monad.Bayes.Population' for some standard choices)
  (forall x. PopulationT m x -> PopulationT m x) ->
  -- | A signal function modelling the stochastic process on which to perform inference.
  --   @a@ represents observations upon which the model should condition, using e.g. 'score'.
  --   It can also additionally contain hyperparameters.
  --   @b@ is the type of estimated current state.
  ClSF (PopulationT m) cl a b ->
  ClSF m cl a [(b, Log Double)]In our example, a would represent the measurements, while b is the estimated position.
Apart from parameters such as the number of particles and a resampler,
we give runPopulationCl a signal function that contains a population.
The result is a signal function that takes the same input,
but the inference has been carried out and thus it returns a population of guesses for the latent variable, together with a probability for each.
The latent variable is the position in our case,
so the particle filter produces a cloud of 100 position guesses, visualized as little white dots:

Rest assured, these estimates really only depend on the simulated sensor output! Isn’t it a bit magical how well they approximate the actual position?
Extending particle filters: estimating parameters
In order to make the example richer, we add a knob to control the temperature, that is, the variance of the random walk of the green point. The hotter it becomes, the more it will be bumped in one direction or the other. One would expect that with a very high temperature, the algorithm will have a harder time guessing the position, since it will change unexpectedly a lot. Indeed, a high temperature results in a bigger variance, as one can see in the size of the particle cloud. Which opens the question whether we can estimate the temperature itself only from the measurements. After all, from a high randomness in the movement in the point, one might want to infer a higher temperature. And in fact, we can! But it is not quite as straightforward as it may seem.
The first naive attempt might be to treat the temperature like the initial state of the point: Guess a value from a prior, and then condition the guess based on the observations. But this has the problem that we will forever be restricted to the 100 guesses we made at the beginning, and if the true value is not among them, we will never estimate it. Even worse, through particle resampling, we constantly remove some particles with low probability. The highly probable particles are replicated then, and while the resulting replicated point positions then move on individually, the temperature guess does not evolve with time since it is a constant. Eventually, we end up with only a single temperature value shared by all particles, our estimate has collapsed.

The actual temperature is represented by a red bar, and the inference guesses by white markers above the bar. It is possible to change the temperature, and to reset the temperature inference state. Pretty soon after we reset, the more unlikely guesses disappear completely. Sometimes we are lucky and an estimate close to the actual value is the sole survivor, but if we have bad luck, a bad estimate might remain forever (or until we reset again). This surprisingly hard problem is well-known, and not yet settled decisively.
Isn’t it strange that we don’t have the same problem for the position estimate? The difference here is that the position follows a stochastic process, and thus rejuvenates itself throughout inference: If a single particle is replicated too often, risking collapse, all its copies have a random behaviour on their own, slowly moving away from each other again.
And in fact, a workaround for the constant parameter guess is to treat the parameter as a state as well that evolves according to some stochastic process. In this example, this is a better description of the situation, in fact! The user can change the temperature at any time, and from the viewpoint of the model, this is a random behaviour.
So let us model the temperature probabilistically. We assume a Poisson process that counts the changes the user makes, and multiply the temperature by a value from a log-normal distribution on every change:
-- | We assume the user changes the temperature randomly every 3 seconds.
temperatureProcess ::
  (MonadDistribution m, Diff td ~ Double) =>
  BehaviourF m td () Temperature
temperatureProcess
  -- Draw events from a Poisson process with a rate of one event per 3 seconds
  =   poissonHomogeneous 3
  -- For every event, draw a number from a normal distribution
  >>> arrMCl (flip replicateM $ normal 0 0.2)
  -- Sum the numbers and log-transform then into the positive reals
  >>> arr (exp . sum)
  -- Multiply original temperature with the random temperature changes
  >>> accumulateWith (*) initialTemperature(The >>> operator directly composes signal functions.)
We can now estimate the temperature, and this estimate reacts (somewhat slowly) to user changes:

The estimate will never converge to the true value, though, since it cannot rule out that the user has changed the temperature recently.
Summary and outlook
We combined the probabilistic programming library monad-bayes and the FRP library rhine,
to express stochastic processes and perform Sequential-Monte-Carlo inference, or particle filters, on them.
The resulting library rhine-bayes is neatly embedded in Haskell and allows for easy interaction between the machine learning parts and other components,
such as user interaction and visualization.
You can find the code for the complete application in https://github.com/turion/rhine/tree/master/rhine-bayes.
The integration of monad-bayes into rhine was relatively straightforward,
but the runtime performance turns out not yet to be optimal.
rhine, and its main dependency dunai introduce some machinery to make everything reactive,
and it seems that some of it causes momentary space leaks.
There is no worry about a program running out of space in the long run,
but at every step of the inference algorithm,
an unnecessarily large amount of thunks is built up and garbage collected again.
This slows down performance and has to be optimized.
While rhine-bayes already contains quite a few standard stochastic processes,
there are many more that could be implemented, for example Gaussian processes.
Also, more inference algorithms (such as Particle-Marginal-Metropolis-Hastings, Resample-Move-SMC, streaming delayed sampling, or smoothing) are in the works.
If you enjoyed this article, you might be interested in joining the Tweag team.