The code for all autosources & sample apps can be found on GitHub here.
The aim of this article is to show how scalaz-stream can be plugged into existing Play Iteratees/Enumerators and used in your web projects. I also wanted to evaluate in depth the power of scalaz-stream Processes by trying to write a recursive streaming action: I mean a web endpoint streaming data and re-injecting its own streamed data into itself.
If you want to see now how scalaz-stream is used with Play, go to this paragraph directly.
Why Scalaz-Stream when you have Play Iteratees?
Play Iteratees are powerful & cool but…
I’m a fan of everything dealing with data streaming and realtime management in backends. I’ve worked a lot on Play Framework and naturally I’ve been using the cornerstone behind Play’s reactive nature: Play Iteratees.
Iteratees (with its counterparts, Enumerators and Enumeratees) are great to manipulate/transform linear streams of data chunks in a very reactive (non-blocking & asynchronous) and purely functional way:
- Enumerator identifies the data producer; it can generate finite/infinite/procedural data streams.
- Iteratee is simply a data folder built as a state machine based on 3 states (Continue, Done, Error) which consumes data from an Enumerator to compute a final result.
- Enumeratee is a kind of transducer able to adapt an Enumerator producing one type of data to an Iteratee that expects another type of data. An Enumeratee can be used as both a pipe transformer and an adapter.
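To make those three states concrete, here is a toy model of the Iteratee idea in plain Scala. This is an illustrative sketch only, not the real play.api.libs.iteratee types (which also handle asynchrony through Future and a richer Input type):

```scala
// Toy Iteratee: a state machine with 3 states (illustrative names,
// simplified from Play's Cont/Done/Error)
sealed trait Step[E, A]
case class Done[E, A](result: A)                  extends Step[E, A]
case class Cont[E, A](k: Option[E] => Step[E, A]) extends Step[E, A]
case class Error[E, A](msg: String)               extends Step[E, A]

// Drive the state machine over a finite input; None signals EOF
def run[E, A](it: Step[E, A], input: List[E]): Either[String, A] =
  (it, input) match {
    case (Done(a), _)         => Right(a)
    case (Error(m), _)        => Left(m)
    case (Cont(k), e :: rest) => run(k(Some(e)), rest)
    case (Cont(k), Nil)       => run(k(None), Nil) // feed EOF until Done
  }

// A summing Iteratee: stays in Cont while fed, becomes Done at EOF
def sum(acc: Int): Step[Int, Int] = Cont {
  case Some(e) => sum(acc + e)
  case None    => Done(acc)
}
```

Running `run(sum(0), List(1, 2, 3))` folds the whole stream into the single result 6, which is exactly the "data folder" role described above.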
Iteratees are really powerful but I must say I’ve always found them quite picky to use, practically speaking. In Play, they are used in their best use-case; they were created exactly for that. I’ve been using Iteratees for more than one year now but I still don’t feel fluent with them. Each time I use them, I must spend some time figuring out how to write what I need. It’s not because they are purely functional (piping an Enumerator into an Enumeratee into an Iteratee is quite trivial) but there is something that my brain doesn’t want to catch.
If you want more details about my experience with Iteratees, go to this paragraph
That’s why I wanted to work with other functional streaming tools to see whether they suffer from the same kind of usability toughness or bring something more natural to me. There are lots of other competitors in the field, such as pipes, conduits and machines. As I don’t have time to study all of them in depth, I’ve chosen the one that appealed to me the most, i.e. Machines.
I’m not yet a Haskell coder, even if I can mumble it, so I preferred to evaluate the concept with scalaz-stream, a Scala implementation trying to bring machines to normal coders, focusing on the aspects of IO streaming.
Scratching the surface of the Machine / Process concepts
I’m not going to judge whether Machines are better than Iteratees; this is not my aim. I’m just experimenting with the concept in an objective way.
I won’t explain the concept of Machines in depth because it’s huge and I don’t think I have the theoretical background to do it right now. So, let’s focus on very basic ideas at first:
- Machine is a very generic concept that represents a data processing mechanism with potential multiple inputs, an output and monadic effects (typically Future input chunks, side-effects while transforming, delayed output…)
- To simplify, let’s say a machine is a bit like a Meccano set that you build by plugging together other more generic machines (such as source, transducer, sink, tee, wye) as simply as pipes.
- Building a machine also means planning all the steps you will go through when managing streamed data but it doesn’t do anything until you run it (no side-effect, no resource consumption). You can re-run a machine as many times as you want.
- A machine is a state machine (Emit/Await/Halt) like an Iteratee, but it manages errors in a more explicit way IMHO (fallback/error)
In scalaz-stream, you don’t manipulate machines directly, as they are too abstract for real-life use-cases; you manipulate simpler concepts:
- Process[M, O] is a restricted machine outputting a stream of O. It can be a source if the monadic effect gets input from I/O or generates procedural data, or a sink if you don’t care about the output. Please note that it doesn’t infer the type of potential input at all.
- Wye[L, R, O] is a machine that takes 2 inputs (left L and right R) and outputs chunks of type O (you can read from left or right, or wait for both before outputting).
- Tee[L, R, O] is a Wye that can only read alternately from left or from right, but not from both at the same time.
- Process1[I, O] can be seen as a transducer which accepts inputs of type I and outputs chunks of type O (a bit like Enumeratee).
- Channel[M, I, O] is an effectful channel that accepts input of type I and uses it in a monadic effect M to produce potential values of type O.
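To give these shapes some substance, here is a toy Process1 built from the Emit/Await/Halt states mentioned earlier. It is a deliberately simplified model, not the real scalaz-stream encoding (which unifies all these concepts under a single Process type with fallback and cleanup handling):

```scala
// Toy transducer: the Emit/Await/Halt state machine, simplified
sealed trait Process1[I, O]
case class Emit[I, O](head: List[O], tail: Process1[I, O]) extends Process1[I, O]
case class Await[I, O](recv: I => Process1[I, O])          extends Process1[I, O]
case class Halt[I, O]()                                    extends Process1[I, O]

// Run against a finite input, collecting every emitted chunk
def run1[I, O](p: Process1[I, O], in: List[I]): List[O] = p match {
  case Emit(h, t) => h ++ run1(t, in)
  case Halt()     => Nil
  case Await(recv) => in match {
    case i :: rest => run1(recv(i), rest)
    case Nil       => Nil // real machines would run a fallback here
  }
}

// A transducer doubling every input, a bit like process1.lift(_ * 2)
def doubler: Process1[Int, Int] = Await(i => Emit(List(i * 2), doubler))
```

Unlike an Iteratee, this machine can emit many chunks while consuming: `run1(doubler, List(1, 2, 3))` yields `List(2, 4, 6)`.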
What do I find attractive in Machines?
- A Machine is producer/consumer/transducer all in one place: it can consume/fold like an Iteratee, transform like an Enumeratee and emit like an Enumerator at the same time, which opens lots of possibilities (even if 3 concepts in one can make it more complicated too).
- It feels like playing with Lego, as you plug machines onto machines, and this is quite fun actually.
- Machines manage monadic effects by design and don’t fix the type of effect, so you can use them with I/O, Future and any other monadic effect you can imagine…
- Machines provide out-of-the-box Tees/Wyes to compose streams, interleave them or zip them as you want, without writing crazy code.
- The early code samples I’ve seen were quite easy to read (even the implementation is not so complex). Have a look at the StartHere sample provided by scalaz-stream:

(code sample omitted; see the StartHere example in the scalaz-stream repository)
But don’t think everything is so simple: machines are a complex concept with lots of quite abstract theory behind them. What I find very interesting is that it’s possible to popularize this very abstract concept with simpler concepts such as Process, Source, Sink, Tee, Wye… that you can catch quite easily, as these are concepts you already manipulated when you played in your bathtub as a child (or even now).
After these considerations, I wanted to experiment with scalaz-stream against Play’s streaming capabilities, in order to see how it behaves in a context I know.
Here is what I decided to study:
- Stream data out of a controller action using a scalaz-stream Process
- Call an async WebService & consume the response as a stream of Array[Byte] using a scalaz-stream Process
Here is the existing Play API:
- Action provides a way to stream a result out of an Enumerator
- WS provides a call consuming the response as a stream of data: WS.get(r: ResponseHeader => Iteratee)
As you can see, these APIs depend on Iteratee/Enumerator. As I didn’t want to hack Play too much to begin with, I decided to try & plug scalaz-stream into Play Iteratees (if possible).
The idea is to take a scalaz-stream Source[O] (Process[M, O]) and wrap it into an Enumerator[O] so that it can be used in Play controller actions.
An Enumerator is a data producer which can generate its data using monadic Future effects (Play Iteratees are tightly linked to Future).
Process[Task, O] is a machine outputting a stream of O, so it’s logically the right candidate to be adapted into an Enumerator[O]. Let’s remember that Task is just a scalaz Future[Either[Throwable, A]] with a few helpers, and it’s the effect used throughout scalaz-stream.
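For intuition only, Task can be modeled naively in a few lines; the real scalaz.concurrent.Task also handles asynchrony, trampolining and many combinators, so treat this as a sketch of the error channel, nothing more:

```scala
import scala.util.control.NonFatal

// Naive Task: a suspended computation ending in Either[Throwable, A]
case class Task[A](run: () => Either[Throwable, A]) {
  def map[B](f: A => B): Task[B] = Task(() => run().map(f))
  def flatMap[B](f: A => Task[B]): Task[B] =
    Task(() => run().flatMap(a => f(a).run()))
}

object Task {
  // Suspend a computation, capturing exceptions in Left
  def delay[A](a: => A): Task[A] =
    Task(() => try Right(a) catch { case NonFatal(t) => Left(t) })
}
```

Nothing happens until run() is called, which mirrors the "no side-effect until you run it" property of machines described earlier.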
So I’ve implemented (or at least tried to) an Enumerator[O] that accepts a Process[Task, O]:
(code sample omitted; see the GitHub repository)
The implementation just synchronizes the states of the Iteratee[O, A] consuming the Enumerator with the states of the Process[Task, O] emitting data chunks of O. It’s quite simple actually.
Building an Iteratee[I, O] from a Process1[I, O]
The idea is to drive an Iteratee from a scalaz-stream Process so that it can consume an Enumerator and be used in Play WS.
An Iteratee[I, O] accepts inputs of type I (and nothing else) and will fold the input stream into a single result of type O.
A Process1[I, O] accepts inputs of type I and emits chunks of type O, but not necessarily one single output chunk. So it’s a good candidate for our use-case, but we need to choose which emitted chunk will be the result of the Iteratee[I, O]. Here, totally arbitrarily, I’ve chosen to take the first emitted chunk as the result (but the last would be as good, if not better).
So I implemented the following:
(code sample omitted; see the GitHub repository)
The implementation is really raw, just for experimentation: it goes through the states of the Process1[I, O] and generates the corresponding states of the Iteratee[I, O] until the first emitted value. Nothing more, nothing less…
A few basic action samples
Everything done in those samples could be done more or less simply with Iteratees/Enumeratees. That’s not the point!
Sample 1 : Generates a stream from a Simple Emitter Process
(code sample omitted; see the GitHub repository)
Sample 2 : Generates a stream from a continuous emitter
(code sample omitted; see the GitHub repository)
Sample 3 : Generates a stream whose output frequency is controlled by a tee with numeral generator on left and ticker on right
(code sample omitted; see the GitHub repository)
Please note:
- scalaFuture2scalazTask is just a helper to convert a Scala Future into a scalaz Task
- ticker is quite simple to understand: it awaits a Task[Int], emits this Int and repeats again…
- processes.zipWith((a,b) => a) is a tee (2 inputs left/right) that outputs only the left data, but consumes the right too, to get the delay effect
- .map(_.toString) simply converts the output into something writeable in the HTTP response
- .intersperse(",") simply adds "," between elements
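With the timing effects stripped out, the shape of this tee can be mimicked with plain lazy streams. numbers and ticker below are stand-ins for the article's processes (the real ticker is delayed by a Task; here it is immediate):

```scala
// Left side: the number generator
val numbers = LazyList.from(0)
// Right side: stands in for the delayed ticker (no actual delay here)
val ticker = LazyList.continually(())

// zipWith((a, b) => a): consume both sides, emit only the left value,
// so the right side's pace controls the output frequency
val paced = numbers.zip(ticker).map { case (n, _) => n }

// .map(_.toString) then .intersperse(",") over a finite prefix
val wire = paced.take(5).map(_.toString).toList.mkString(",")
```

wire ends up as "0,1,2,3,4", the same comma-separated shape the action streams out.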
(sample output omitted; see the GitHub repository)
Sample 4 : Generates a stream using side-effect to control output frequency
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
- delayedNumber uses an Akka scheduler to trigger our value after a timeout
- delayedNumerals shows a simple recursive Process[Task, Int] construction which shouldn’t be too hard to understand
(sample output omitted; see the GitHub repository)
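The recursive style of delayedNumerals (emit a value, then define the rest of the stream in terms of a recursive call) is the same trick as a recursive lazy stream, minus the scheduler. A minimal analogue in plain Scala (numerals is an illustrative name, not the article's code):

```scala
// Emit n, then lazily "await" the next step through a recursive call;
// in the article the recursion goes through a delayed Task instead
def numerals(n: Int): LazyList[Int] = n #:: numerals(n + 1)
```

numerals(0).take(5).toList gives List(0, 1, 2, 3, 4); laziness is what keeps the recursion from running away, just as the Await state does for a Process.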
Sample 5 : Generates a stream by consuming completely another stream
(code sample omitted; see the GitHub repository)
- reader is a Process1[Array[Byte], String] that folds all received Array[Byte] chunks into a single String
- iterateeFirstEmit(reader) builds an Iteratee[Array[Byte], String] driven by the reader process, which will fold all chunks of data received from the WS call
- .get(rh => iterateeFirstEmit(reader)) will return a Future[Iteratee[Array[Byte], String]] that is run in .flatMap(_.run) to return the folded result
- Process.wrap(scalaFuture2scalazTask(maybeValues)) is a trick to wrap the folded result into a Process
- Process.emitAll(values.split(",")) splits the resulting string again and emits all chunks outside (stupid, just for demo)
Still there? Let’s dive deeper and be sharper!
Building recursive streaming action consuming itself
Hacking WS to consume & re-emit WS in realtime
WS.executeStream(r: ResponseHeader => Iteratee[Array[Byte], A]) is a cool API because you can build an Iteratee from the ResponseHeader, and then the Iteratee will consume the received Array[Byte] chunks in a reactive way and fold them. The problem is that until the Iteratee has finished, you won’t have any result.
But I’d like to be able to receive chunks of data in realtime and re-emit them immediately, so that I can inject them into realtime data flow processing. The WS API doesn’t allow this, so I decided to hack it a bit. I’ve written WSZ which provides the API:
(code sample omitted; see the GitHub repository)
This API outputs a realtime stream of Array[Byte] whose flow is controlled by promises (Future) being redeemed in the AsyncHttpClient AsyncHandler. I didn’t care about ResponseHeaders for this experimentation, but they should be taken into account in a more serious implementation.
I obtain a Process[Future, Array[Byte]] streaming received chunks in realtime, and I can then take advantage of the power of machines to manipulate the data chunks as I want.
Sample 6 : Generates a stream by forwarding/refolding another stream in realtime
(code sample omitted; see the GitHub repository)
- def splitFold(splitter: String): Process.Process1[String, String] is just a demo showing that coding a Process transducer isn’t so crazy… Look at the comments in the code
- the received Process[Future, Array[Byte]] is translated into a Process[Task, Array[Byte]] using a scalaz Natural Transformation
- p |> splitFold(",") means "pipe the output of process p into the input of splitFold(",")"
(sample output omitted; see the GitHub repository)
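The core of such a splitFold transducer is a pure step function: prepend the leftover buffer to the incoming chunk, split on the separator, emit the complete tokens and keep the trailing remainder for the next chunk. A sketch of that step (splitStep is a hypothetical helper, not the article's code):

```scala
// One step of splitFold: (buffer, chunk) => (new buffer, emitted tokens)
def splitStep(buffer: String, chunk: String, sep: String): (String, List[String]) = {
  // limit -1 keeps trailing empty strings, so the remainder is exact
  val parts = (buffer + chunk).split(java.util.regex.Pattern.quote(sep), -1)
  (parts.last, parts.init.toList)
}
```

Feeding "ab,cd" then ",ef,g" emits List("ab") then List("cd", "ef"), with "g" left in the buffer awaiting the next chunk.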
Let’s finish our trip with a bit of puzzle and mystery.
THE FINAL MYSTERY: recursive stream generating Fibonacci series
As soon as my first experimentations of scalaz-stream with Play were operational, I’ve imagined an interesting case:
Is it possible to build an action generating a stream of data fed by itself: a kind of recursive stream?
With an Iteratee, it’s not really possible, since it can’t emit data before finishing its iteration. It would certainly be possible with an Enumeratee, but the API doesn’t exist, and I find it much more obvious with the scalaz-stream API!
The mystery isn’t in the answer to my question: YES it is possible!
The idea is simple:
- Create a simple action
- Create a first process emitting a few initialization data
- Create a second process which consumes the WS calling my own action and re-emits the received chunks in realtime
- Append first process output and second process output
- Stream the global output as the result of the action, which will be back-propagated over time to the action itself…
Naturally, if it consumes its own data, it will call itself again and again until you reach the connection or open file descriptor limit. As a consequence, you must limit the depth of recursion.
I performed different experiments to explore this use-case, zipping the stream with itself, adding elements to themselves, etc. And after a few tries, I implemented the following code quite fortuitously:
(code sample omitted; see the GitHub repository)
WTF??? This is the Fibonacci series?
Just to remind you about it:
fib(0) = 0
fib(1) = 1
fib(n) = fib(n-1) + fib(n-2)
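As a reminder, the Fibonacci series is the textbook example of a stream fed by itself; in plain Scala it can be written in-memory as (a classic definition, not the article's code):

```scala
// The stream is defined by zipping itself with its own tail:
// each element re-consumes what the stream already produced
lazy val fibs: LazyList[Long] =
  0L #:: 1L #:: fibs.zip(fibs.tail).map { case (a, b) => a + b }
```

fibs.take(8).toList gives List(0, 1, 1, 2, 3, 5, 8, 13); the recursive action does something morally similar, except its self-feeding loop goes through HTTP.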
Here is the mystery!!!
How does it work???
I won’t tell the answer to this puzzling side-effect and let you think about it and discover why it works XD
But this sample shows exactly what I wanted: Yes, it’s possible to feed an action with its own feed! Victory!
OK, all of that was really funky, but is it useful in real projects? I don’t really know yet, but it provides great proof of the very reactive nature of scalaz-stream, and of Play too!
I tend to like scalaz-stream, and I feel more comfortable, more natural, using Process than Iteratee right now… Maybe this is just an impression, so I’ll stay cautious about my conclusions for now…
All of this code is just experimental, so be aware of that. If you like it and think it could be useful, tell me, so that we can create a real library from it!
Have Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun, Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun, Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,!
Here are a few things that bother me when I use Play Iteratees (you don’t have to agree; this is very subjective):
- Enumeratees are really powerful (maybe the most powerful part of the API) but they can be tricky: for example, defining a new Enumeratee from scratch isn’t easy at first sight due to the signature of Enumeratee itself, and an Enumeratee composes differently on the left (with Enumerators) and on the right (with Iteratees), which can seem strange at the beginning…
- Enumerators are not defined (when not using helpers) in terms of the data they produce but with respect to the way an Iteratee will consume the data they produce. You must somewhat reverse your way of thinking, which is not so natural.
- Iteratees are great for producing one result by folding a stream of data, but if you want to consume/cut/aggregate/re-emit the chunks, the code you write based on Iteratee/Enumeratee quickly becomes complex and hard to re-read, and edge cases (errors, end of stream) are hard to handle.
- When you want to manipulate multiple streams together, zip/interleave them, you must write very complex code too.
- End-of-iteration and error management with Iteratees isn’t really clear IMHO, and when you begin to compose Iteratees together, it becomes hard to know what will happen…
- If you want to manipulate a stream with side effects, you can do it with Enumeratees, but it’s not so obvious…