michael wolfendale

I'm a software developer based in Newcastle (UK). I have a particular interest in Scala and functional programming.

Getting Started With Akka Streams

posted on 2021-05-31

Akka is a suite of libraries for Scala and Java which provides tools for organising asynchronous programs.

Here I’ll be focusing on using akka with Scala.

What is akka streams?

Akka streams is a part of the Akka ecosystem which implements the concept of reactive streaming. It is fully compatible with other implementations of the reactive streams specification.

Why akka streams?

There are lots of good reasons to use akka streams. I’ll go through a few of them here but this is by no means an exhaustive list:

Graphs

At the core of akka streams is the concept of a graph. Like in the mathematical sense, a graph is a set of nodes connected by edges. In our case, each node is a processing step in our computation, and an edge indicates data flowing from one node to another.

Each Graph in akka is a blueprint, or a piece of a blueprint of a program that we can run.

Let’s dig in and take a look at Graph in more detail.

type Graph[+S <: Shape, +M]

There are two type parameters here that are going to take some breaking down.

S Shape

The Shape type parameter indicates the external interface of the Graph.

To get a feel for what this means let’s take a look at some of its implementations.

object ClosedShape extends Shape
final case class SourceShape[+T](out: Outlet[T]) extends Shape
final case class SinkShape[-T](in: Inlet[T]) extends Shape
final case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape

Note: These aren’t all of the implementations included with akka streams but they’re illustrative. In fact, you can even create your own Shape to suit your needs.

Essentially, a Shape has a number of inlets and outlets which determine how it can be combined with other Graphs. Let’s examine each of these implementations in turn:

M Materialized Value

The M type parameter is known as the materialized value of the graph. This represents the type of value that is produced when the stream is run (a.k.a. materialized).

Materialized values can be anything but there are common types that crop up often:

Running streams

In order to run a graph we need an instance of a Materializer. There is an implicit conversion from akka’s ActorSystem to Materializer.

Note: If you use akka streams in a Play framework project, there is a Materializer created from the default ActorSystem available by default through dependency injection.

Rather than representing a computation itself, each Graph is a blueprint of a computation which can be materialized any number of times.

In the following sections we’ll need to materialize our own graphs so I have included the below boilerplate which can be used to run examples and to experiment. For brevity I will omit the boilerplate from the examples themselves.

import akka.NotUsed
import akka.actor.ActorSystem

// imports for common akka streams methods
import akka.stream._
import akka.stream.scaladsl._

// imports for evaluating `Future`s
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AkkaApp extends App {

  implicit val system: ActorSystem = ActorSystem()
 
  try {

    // our code here

  } finally system.terminate()
}

Closed graphs

In order to materialize a graph, it must be closed (i.e. have no open inlets or outlets).

val materializer: Materializer = implicitly
val graph: Graph[ClosedShape, Future[Int]] = Source.single(1).toMat(Sink.head)(Keep.right)
val future: Future[Int] = materializer.materialize(graph)
val result: Int = Await.result(future, 1.second)
println(result)

Don’t worry about the graph creation syntax, we’ll get to that in a moment.

First, let’s step through this: 1) Fetch our materializer from implicit scope. 2) Create a Graph[ClosedShape, Future[Int]]. 3) Materialize the graph into a Future[Int]. 4) Await the result of the Future[Int]. 5) Print the result to the console.

This should print 1 to the console.

This is a perfectly acceptable way to materialize a graph, however, we can tidy up this code a little:

val graph: RunnableGraph[Future[Int]] = Source.single(1).toMat(Sink.head)(Keep.right)
val future: Future[Int] = graph.run()
val result: Int = Await.result(future, 1.second)
println(result)

Most of the methods that return a closed graph will specifically return a RunnableGraph[M] which gives us access to the .run() method.

final case class RunnableGraph[+M](...) extends Graph[ClosedShape, M] {
  def run()(implicit materializer: Materializer): M = materializer.materialize(this)
}

Composing linear graphs

Let’s get started building our own graphs!

In akka streams, individual graph nodes are often called operators and they come in 3 kinds:

These match the Shapes that we talked about earlier. I’ve labelled the type parameters here to match the flow of data, for example: Data flows out of a source and in to a sink. None of these graphs are closed so they can’t be materialized by themselves.

In order to make runnable graph, we can compose different operators.

I’ve omitted the materialized values in the examples below to simplify them. We’ll cover what happens to the type of materialized values once we know how to combine graphs.

Source ~> Sink

The simplest runnable graphs we can make are by connecting a Source directly to a Sink. We can do this with the .to method.

val source: Source[Int, _] = Source(List(1, 2, 3))
val sink: Sink[Int, _] = Sink.fold[Int](0)(_ + _)
val graph: RunnableGraph[_] = source.to(sink)

When combining operators like this, the types of the inlets/outlets must match up. In this case the outlet of the Source and the inlet of the Sink both have the type Int.

Source ~> Flow

Not all graphs compose to make RunnableGraphs, composing a Source with a Flow gives us a Source. To connect a Source and a Flow we use the .via method.

val source: Source[Int, _] = Source(List(1, 2, 3))
val flow: Flow[Int, String, _] = Flow[Int].map(_.toString)
val graph: Source[String, _] = source.via(flow)

Here we combine a Source which produces Ints with a Flow that turns each Int into a String. This returns a Source that produces Strings. This makes sense because although we’ve paired up the Source’s outlet with the Flow’s inlet, the Flow’s outlet hasn’t been attached to anything. That leaves us with a graph with a single outlet of String, a graph with a single outlet is a Source!

Flow ~> Sink

As with the above we can create a Sink by combining a Flow and a Sink. Take note of the types of the elements flowing through each operator. Because we are attaching something to a Sink we use .to.

val flow: Flow[Int, String, _] = Flow.fromFunction(_.toString)
val sink: Sink[String, _] = Sink.ignore
val graph: Sink[Int, _] = flow.to(sink)

Flow ~> Flow

Finally, we can chain together Flows to create another flow, this works in the same way that function composition does:

val f: Int => String = _.toString
val g: String => Char = _.head
val h: Int => Char = f.andThen(g)

val ff: Flow[Int, String, _] = Flow.fromFunction(_.toString)
val gg: Flow[String, Char, _] = Flow.fromFunction(_.head)
val hh: Flow[Int, Char, _] = ff.via(gg)

These methods compose in data flow order, from source to flow to Sink.

.via is used to connect most operators, but when you connect an operator to a Sink you use .to.

For example:

source
	.via(flow1)
	.via(flow2.via(flow3))
	.to(sink)

Selecting materialized values

Up to now, when combining graphs we haven’t considered what happens to the materialized value. Let’s look at .to and .via in more detail to see how they work:

final class Source[+Out, +M] extends Graph[SourceShape[Out], M] {
	def to[M2](sink: Graph[SinkShape[Out], M2]): RunnableGraph[M]
	
	def via[T, M2](flow: Graph[FlowShape[Out, T], M2]): Source[T, M]
}

Note: This is simplified slightly from the code in akka streams itself.

So, when you compose graphs with these methods, the resulting graphs always have a materialized value of the graph from the left hand side.

This means that if you composed a bunch of graphs together in data flow order you’d end up with the materialized value of the initial source. But what if you want to keep the materialized value on the right? Or what if you want to keep both?

final class Source[+Out, +M] extends Graph[SourceShape[Out], M] {
	def toMat[M2, M3](sink: Graph[SinkShape[Out], M2])(combine: (M, M2) => M3): RunnableGraph[M]
	
	def viaMat[T, M2, M3](flow: Graph[FlowShape[Out, T], M2])(combine: (M, M2) => M3): Source[T, M]
}

All the methods for composing operators have two forms:

The combine function can be anything, but there are a few cases that are extremely common:

These cases are so common that akka has optimised function instances that do this! Keep.left, Keep.right, Keep.both, and Keep.none respectively. As with NotUsed these have the added convenience of conveying intent and being available in both Java and Scala.

Although these are the most common ways of composing operators, it’s good to remember that you can use arbitrary functions to produce the new materialized value. For example, imagine we have two graphs which both materialize into Futures. we can use a function to combine them in the graph definition itself:

def combine[A, B](fl: Future[A], fr: Future[B]): Future[(A, B)] =
  for {
    l <- fl
    r <- fr
  } yield (l, r)

val source: Source[Int, NotUsed] =
  Source(List(1, 2, 3))

val countSink: Sink[Int, Future[Int]] =
  Sink.fold(0)((m, _) => m + 1)

val collectSink: Sink[Int, Future[List[Int]]] =
  Sink.collection[Int, List[Int]]

val graph2: RunnableGraph[Future[(Int, List[Int])]] =
  source
    .alsoToMat(countSink)(Keep.right)
    .toMat(collectSink)(combine)

println(Await.result(graph2.run(), 1.second))

This graph outputs a single future of a tuple containing the count of all the elements that have passed through it and a List of the collected elements at the end.

Demand and back pressure

Now that we have a good understanding of graphs, we should talk about back pressure - the reactive part of reactive streams.

Let’s look at a few examples

Slower Source

Imagine we have a Source which produces data slowly. For example, a weather report updated twice a day, and a Sink which prints that report to the console.

Even without reactive streaming, most systems would deal with this scenario. For example, if we represented our Source and Sink as actors, the sink actor can always handle the amount of data that the source is sending to it - everything is fine!

Faster Source

Imagine we have a Source which will produce data super quickly. For example, a stream of tweets from a particularly vocal user, and a Sink which would send each tweet to a physical printer one at a time.

In this case we would start to see problems if we tried to model this with actors. As the source is producing data faster than the sink can consume it, the sink’s mailbox will start to fill up and eventually overflow. Depending on the configuration of the system this could result in a loss of data, or the entire system could fail - That’s pretty bad!

Demand

In akka streams, we handle these cases explicitly with the concept of demand:

Demand is also referred to as back pressure. Think of it like this, a Sink will push back against a source until it’s ready to consume data and a Source will not emit while it is pushed against.

Note: We say back pressure as this mechanism is similar to pressure in fluid dynamics.

How demand is implemented depends on the specifics of the Source or Sink in question. Let’s look at how this would apply to our above examples.

Demand for Sources

Our super-fast tweet source would need to make sure it only emits when there is demand. To do this we need to decide how we would turn a stream of lots of tweets into a stream of fewer tweets.

One way to do this would be to only poll for tweets when we receive demand. However, to prevent us emitting duplicate elements we’d need to maintain a buffer of the last element to compare in care there have been no updates since since demand was last signalled.

Another way would be to use a buffer to store tweets waiting to be emitted once demand is signalled. However, what happens when that buffer fills up? We only have finite memory to deal with so just adding elements to a buffer isn’t really enough. We’ll cover how to deal with that shortcoming in a moment.

Demand for Sinks

Our sloth-like printer sink would need to be implemented to make sure that we only signal demand once we’re sure we can handle the next element in the stream.

The specifics of this are only really relevant when implementing a new graph stage (which I won’t talk about in this article) however, it’s good to think about this conceptually.

In blocking, imperative pseudocode we could think of an implementation something like this:

setup()
while(!upstreamCompleted) {
	demand()
	getNextElement()
	print()
}
shutdown()

This kind of implementation is generally preferable to using buffers as it doesn’t have a failure case to consider, it always back pressures until it can handle a new element

Buffers

When defining akka streams graphs we can add explicit buffers to specify how demand should be handled for certain parts of our graphs, for example:

val source: Source[_, _] = ???
source.buffer(10, OverflowStrategy.fail)

As you can see, the buffer method takes 2 parameters:

In my experience, this explicit boundedness and error handling forces us to think about these aspects of a system at the design stage which leads to more predictable and maintainable systems.

The different values for OverflowStrategy are:

Earlier, I mentioned that only closed graphs can be materialized. With our new understanding of demand hopefully that makes more sense:

A graph with an open inlet would have nowhere to pull data from, and a graph with an open outlet would never receive demand. In either case no data can flow, so it makes sense that akka streams’ API only lets you materialize closed graphs.

Conclusion

So far we’ve only covered the basics of working with akka streams but hopefully this has been useful for anyone trying to understand the basic concepts of akka streams and reactive streaming.

In the future I’d like to cover some more advanced topics such as:

But this article has already gotten too long! Feel free to leave any comments or corrections in the issues section on this repo