Over a million developers have joined DZone.

Iteratees for Imperative Programmers

DZone 's Guide to

Iteratees for Imperative Programmers

· Java Zone ·
Free Resource

When I first heard the word iteratee, I thought it was a joke. Turns out, it wasn't a joke, in fact there are also enumerators (that's ok) and enumeratees (you're killing me). If you're an imperative programmer, or rather a programmer who feels more comfortable writing imperative code than functional code, then you may be a little overwhelmed by all the introductions to iteratees out there, because they all assume that you think from a functional perspective. Well I just learnt iteratees, and although I'm feeling more and more comfortable with functional programming every day, I still think like an imperative programmer at heart. This made learning iteratees very difficult for me. So while I'm still in the imperative mindset, I thought this a very good opportunity to explain iteratees from an imperative programmers perspective, taking no functional knowledge for granted. If you're an imperative programmer who wants to learn iteratees, this is the blog post for you. I'm going to specifically be looking at Play's Iteratee API, but the concepts learnt here will apply to all Iteratees in general.

So let's start off with explaining what iteratees, and their counterparts, are trying to achieve. An iteratee is a method of reactively handling streams of data that is very easily composable. By reactive, I mean non blocking, ie you react to data being available to read, and react to the opportunity to write data. By composable, I mean you write simple iteratees that do one small thing well, then you use those as the building blocks to write iteratees that do bigger things, and you use those as the building blocks to write iteratees to do even bigger things, and so on. At each stage, everything is simple and easy to reason about.

Reactive stream handling

If you're looking for information about iteratees, then I'm guessing you already know a bit about what reactive stream handling is. Let's contrast it to synchronous IO code:

trait InputStream {
  def read(): Byte

So this should be very familiar, if you want to read a byte, you call read. If no byte is currently available to be read, that call will block, and your thread will wait until a byte is available. With reactive streams, obviously it's the other way around, you pass a callback to the stream you want to receive data from, and it will call that when it's ready to give data to you. So typically you might implement a trait that looks like this:

trait InputStreamHandler {
  def onByte(byte: Byte)

So before we go on, let's look at how the same thing would be achieved in a pure functional world. At this point I don't want you to ask why we want to do things this way, you will see that later on, but if you know anything about functional programming, you know that everything tends to be immutable, and functions have no side effects. The trait above has to have side effects, because unless you are ignoring the bytes passed to onByte, you must be changing your state (or something elses state) somehow in that function. So, how do we handle data without changing our state? The answer is the same way other immutable data structures work, we return a copy of ourselves, updated with the new state. So if the InputStreamHandler were to be functional, it might look like this:

trait InputStreamHandler {
  def onByte(byte: Byte): InputStreamHandler

And an example implementation of one, that reads input into a seq, might look like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler {
  def onByte(byte: Byte) = new Consume(data :+ byte)

And so here we have a functional solution for reactive stream handling. But it's not a very good one, for a start, there's no way for the handlers to communicate to the code that uses them that they don't want to receive any more input, or if they've encountered an error (exceptions are frowned upon in functional programming). We could add things to handle this, but very soon our interface would become quite complex, hard to break up into small pieces that can be composed, etc. I'm not going to justify this now, I think you'll see it later when I show you just how easy iteratees are to compose.

So, by this stage I hope you have understood two important points. Firstly, reactive stream handling means twofold reacting, both your code has to react to the stream being ready, and the stream has to react to you being ready. Secondly, when I say that we want a functional solution, I mean a solution where everything is immutable, and that is achieved by our stream handlers producing copies of themselves each time they receive/send data. If you've understood those two important points, then now we can move on to introducing iteratees.


There are a few things that our interface hasn't yet addressed. The first is, how does the stream communicate to us that it is finished, that is, that it has no more data for us? To do this, instead of passing in a byte directly, we're going to abstract our byte to be something of type Input[Byte], and that type can have three possible implementations, EOF, an element, or empty. Let's not worry about why we need empty just yet, but assume for some reason we might want to pass empty. So this is what Input looks like:

sealed trait Input[+E]
object Input {
  case object EOF extends Input[Nothing]
  case object Empty extends Input[Nothing]
  case class El[+E](e: E) extends Input[E]

Updating our InputStreamHandler, we now get something that looks like this:

trait InputStreamHandler[E] {
  def onInput(in: Input[E]): Future[InputStreamHandler[E]]

Now updating our Consumer from before to handle this, it might look like this:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler[Byte] {
  def onInput(in: Input[Byte]) = in match {
    case El(byte) => Future.successful(new Consume(data :+ byte))
    case _ => Future.successful(this)

You can see that when we get EOF or Empty, there's nothing for us to do to change our state, so we just return ourselves again. If we were writing to another stream, we might, when we receive EOF, close that stream (or rather, send it an EOF).

The next thing we're going to do is make it easier for our handler to consume input immediately without having to create a future. To do this, rather than passing the byte directly, we'll pass a function, that takes a function as a parameter, and that function will take the byte as a parameter. So, our handler, when it's ready, will create a function to handle the byte, and then invoke the function that was passed to it, with that function. We'll call the first function the cont function, which is short for continue, and means when you're ready to continue receiving input invoke me. Too many functions? Let's look at the code:

trait InputStreamHandler[E] {
  def onByte[B](cont: (Input[E] => InputStreamHandler[E]) => Future[B]): Future[B]

Now where did this Future[B] come from? B is just the mechanism that the stream uses to pass state back to itself. As the handler, we don't have to worry about what it is, we just have to make sure that we eventually invoke the cont function, and eventually make sure that the B it returns makes it back to our caller. And what does this look like in our Consume iteratee? Let's have a look:

class Consume(data: IndexedSeq[Byte]) extends InputStreamHandler {
  def onByte(cont: (Input[Byte] => InputStreamHandler) => Future[B]) = cont {
    case Input.El(byte) => new Consume(data :+ byte)
    case _ => this

You can see in our simple case of being ready to handle input immediately, we just immediately invoke cont, we no longer need to worry about creating futures. If we want to handle the input asynchronously, it is a little more complex, but we'll take a look at that later.

Now we have one final step in producing our iteratee API. How does the handler communicate back to the stream that it is finished receiving data? There could be two reasons for this, one is that it's finished receiving data. For example, if our handler is a JSON parser, it might have reached the end of the object it was parsing, and so doesn't want to receive anymore. The other reason is that it's encountered an error, for a JSON parser, this might be a syntax error, or if it's sending data through to another stream, it might be an IO error on that stream.

To allow our iteratee to communicate with the stream, we're going to create a trait that represents its state. We'll call this trait Step, and the three states that the iteratee can be in will be Cont, Done and Error. Our Cont state is going to contain our Input[Byte] => InputStreamHandler function, so that the stream can invoke it. Our Done state will contain our result (in the case of Consume, a Seq[Byte]) and our Error state will contain an error message.

In addition to this, both our Done and Error states need to contain the left over input that they didn't consume. This will be important for when we are composing iteratees together, so that once one iteratee has finished consuming input from a stream, the next can pick up where the first left off. This is one reason why we need Input.Empty, because if we did consume all the input, then we need some way to indicate that.

So, here's our Step trait:

sealed trait Step[E, +A]
object Step {
  case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A]
  case class Cont[E, +A](k: Input[E] => InputStreamHandler[E, A]) extends Step[E, A]
  case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing]

The type parameter E is the type of input our iteratee wants to accept, and A is what it's producing. So our handler trait now looks like this:

trait InputStreamHandler[E, A] {
  def onInput[B](step: Step[E, A] => Future[B]): Future[B]

And our consumer is implemented like this:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte, Seq[Byte]] {
  def onInput(step: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Cont({
    case Input.El(byte) => new Consume(data :+ byte)
    case Input.EOF => new InputStreamHandler[Byte, Seq[Byte]] {
      def onInput(cont: Step[Byte, Seq[Byte]] => Future[B]) = step(Step.Done(data, Input.Empty))
    case Input.Empty => this

One big difference here that you now notice is when we receive EOF, we actually pass Done into the step function, to say we are done consuming the input.

And so now we've built our iteratee interface. Our naming isn't quite right though, so we'll rename the trait obviously to Iteratee, and we'll rename onInput to fold, since we are folding our state into one result. And so now we get our interface:

trait Iteratee[E, +A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B]

Iteratees in practice

So far we've started with the requirements of a traditional imperative input stream, and described what an iteratee is in constrast to that. But looking at the above code, you might think that using them is really difficult. They seem like they are far more complex than they need to be, at least conceptually, to implement reactive streams. Well, it turns out that although so far we've shown the basics of the iteratee interface, there is a lot more that a full iteratee API has to offer, and once we start understanding this, and using it, you will start to see how powerful, simple and useful iteratees are.

So remember how iteratees are immutable? And remember how iteratees can be in one of three states, cont, done and error, and depending on which state it's in, it will pass its corresponding step class to the folder function? Well, if an iteratee is immutable and it can be in one of three states, then it can only ever be in that state that it's in, and therefore it will only ever pass that corresponding step to the folder function. If an iteratee is done, it's done, it doesn't matter how many times you call its fold function, it will never become cont or error, and its done value will never change, it will only ever pass the Done step to the folder function with the same A value and the same left over input. Because of this, there is only one implementation of a done iteratee that we'll ever need, it looks like this:

case class Done[E, A](a: A, e: Input[E] = Input.Empty) extends Iteratee[E, A] {
  def fold[B](folder: Step[E, A] => Future[B]): Future[B] = folder(Step.Done(a, e))

This is the only done iteratee you'll ever need to indicate that you're done. In the Consume iteratee above, when we reached EOF, we created a done iteratee using an anonymous inner class, we didn't need to do this, we could have just used the Done iteratee above. The exact same thing holds for error iteratees:


Published at DZone with permission of

Opinions expressed by DZone contributors are their own.

{{ parent.title || parent.header.title}}

{{ parent.tldr }}

{{ parent.urlSource.name }}