Synopsis
The code & sample apps can be found on GitHub.
The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep things simple, but without them it was quite useless for real applications…
Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.
So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:
- Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
- Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
- Train Spark-ML recommendation-like model using NIO client/server (Part 3)
- Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)
[Part 1/3] From Scalaz-Stream Process to Spark DStream
Reminders on Process[Task, T]
A scalaz-stream Process[Task, T] is a stream of T elements that can interleave Task effects (each representing some external side-effecting computation). Process[Task, T] is built as a state machine that you need to run to execute all the Task effects and emit the stream of T. It can manage continuous or discrete, finite or infinite streams.
I restricted myself to Task for the purpose of this article but it can be any F[_].
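For instance, here is a minimal sketch of such a process (nothing runs until the last line):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// An infinite Process[Task, Long] interleaving a Task effect
// (reading the clock) before each emitted element
val clock: Process[Task, Long] =
  Process.eval(Task.delay(System.currentTimeMillis)).repeat

// Nothing has happened yet: run the state machine to play
// the effects & gather the first 5 emitted values
val first5: IndexedSeq[Long] = clock.take(5).runLog.run
```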
Reminders on DStream[T]
A Spark DStream[T] is a stream of RDD[T] built by discretizing a continuous stream of T. An RDD[T] is a Resilient Distributed Dataset, the core data structure Spark uses to distribute in-memory batch/map/reduce operations to a cluster of nodes with fault-tolerance & persistence.
In summary, DStream slices a continuous stream of T into windows of time and gathers all the T in the same window into one RDD[T]. So it discretizes the continuous stream into a stream of RDD[T]. Once built, those RDD[T] are distributed to the Spark cluster. Spark can perform transform/union/map/reduce/… operations on RDD[T], so DStream[T] takes advantage of the same operations.
Spark-Streaming also persists all operations & relations between DStreams in a graph. Thus, in case of failure of a remote node while performing operations on DStreams, the whole transformation can be replayed (it also means streamed data are persisted).
Finally, the resulting DStream obtained after map/reduce operations can be output to a file, a console, a DB, etc…
Please note that DStream[T] is built with respect to a StreamingContext which manages its distribution in the Spark cluster and all operations performed on it. Moreover, DStream map/reduce operations & outputs must be scheduled before starting the StreamingContext. It could be compared to a state machine that you build statically and run later.
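To make that scheduling constraint concrete, here is a minimal sketch (the socket source, master & app name are mere placeholders):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}

// a context discretizing streams into windows of 1 second
val ssc = new StreamingContext("local[2]", "demo", Seconds(1))

// a DStream[String] fed by a socket source
val lines = ssc.socketTextStream("localhost", 9999)

// operations & outputs are declared BEFORE starting the context...
lines.count().print()

// ...and only run once it is started
ssc.start()
```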
From Process[Task, T] to RDD[T]
You may ask: why not simply build an RDD[T] from a Process[Task, T]?
Yes, sure, we can do it.
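Here is a sketch of the idea (processToRDD is just a name chosen for illustration):

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scalaz.concurrent.Task
import scalaz.stream.Process

// run ALL the Task effects, gather every emitted T in memory,
// then distribute the collected values as a single RDD
def processToRDD[T: ClassTag](sc: SparkContext)(p: Process[Task, T]): RDD[T] =
  sc.parallelize(p.runLog.run)
```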
This works, but what if this Process[Task, T] emits a huge quantity of data or is infinite? You'll end up with an OutOfMemoryError…
So yes, you can do it, but it's not that interesting. DStream seems more natural since it can manage a stream of data as long as it can discretize it over time.
From Process[Task, T] to DStream[T]
Pull from Process[Task, T], Push to DStream[T] with LocalInputDStream
To build a DStream[T] from a Process[Task, T], the idea is to:
- Consume/pull the T emitted by the Process[Task, T],
- Gather the emitted T during a window of time & generate an RDD[T] with them,
- Inject the RDD[T] into the DStream[T],
- Go to the next window of time…
The Spark-Streaming library provides different helpers to create a DStream from different sources of data, like files (local/HDFS) or sockets…
The helper that seemed the most appropriate is the NetworkInputDStream:
- It provides a NetworkReceiver based on an Akka actor to which we can push streamed data.
- NetworkReceiver gathers streamed data over windows of time and builds a BlockRDD[T] for each window.
- Each BlockRDD[T] is registered to the global Spark BlockManager (responsible for data persistence).
- Each BlockRDD[T] is injected into the DStream[T].
So basically, NetworkInputDStream builds a stream of BlockRDD[T].
It's important to note that NetworkReceiver is also meant to be sent to remote workers so that data can be gathered on several nodes at the same time.
But in my case, the data source Process[Task, T] runs on the Spark driver node (at least for now), so instead of a NetworkInputDStream, a LocalInputDStream would be better. It would provide a LocalReceiver based on an actor to which we can push the data emitted by the process in an async way.
LocalInputDStream doesn't exist in the Spark-Streaming library (or I haven't looked hard enough), so I've implemented it as I needed. It does exactly the same as NetworkInputDStream without the remoting aspect. The current code is there…
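Conceptually, the contract needed from that receiver is tiny; here is a sketch (names & signatures are illustrative assumptions, the real implementation is in the repo):

```scala
// An actor-backed receiver buffering pushed elements until the
// current window of time closes and becomes a BlockRDD[T]
trait LocalReceiver[T] {
  // hand one element to the underlying actor without blocking the caller
  def push(t: T): Unit
}
```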
Process vs DStream?
There is a common point between DStream and Process: both are built as state machines that are passive until run.
- In the case of Process, it is run by playing all the Task effects while gathering the emitted values or ignoring them, in blocking or non-blocking mode, etc…
- In the case of DStream, it is built and registered in the context of a StreamingContext. Then you must also declare some outputs for the DStream, like a simple print, an HDFS file output, etc… Finally, you start the StreamingContext, which manages everything for you until you stop it.
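As a quick side-by-side sketch (p & dstream standing for any Process[Task, T] and DStream[T] built in a StreamingContext ssc):

```scala
// Process: pick an interpreter to play the effects
p.run.run                 // run all effects, discard emitted values (blocking)
p.runLog.run              // run all effects & gather emitted values (blocking)
p.run.runAsync(_ => ())   // non-blocking variant

// DStream: declare outputs, then start the context that owns it
dstream.print()
ssc.start()
// ...and later
ssc.stop()
```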
So if we want to adapt a Process[Task, T] to a DStream[T], we must perform 4 steps (on the Spark driver node):
- Build a DStream[T] using a LocalInputDStream[T] providing a Receiver into which we'll be able to push T asynchronously.
- Build a custom scalaz-stream Sink[Task, T, Unit] in charge of consuming all the data emitted by the Process[Task, T] and pushing them using the previous Receiver.
- Pipe the Process[Task, T] to this Sink[Task, T, Unit] and, when the Process[Task, T] has halted, stop the previous DStream[T]: the result of this pipe operation is a Process[Task, Unit], a pure effectful process responsible for pushing T into the dstream without emitting anything.
- Return the previous DStream[T] and the effectful consumer Process[Task, Unit].
dstreamize implementation
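Here is a sketch of those 4 steps (LocalInputDStream, its receiver & stop are the custom pieces described above, so their exact signatures here are assumptions):

```scala
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink, io}

def dstreamize[T: ClassTag](
  p: Process[Task, T],
  ssc: StreamingContext
): (Process[Task, Unit], LocalInputDStream[T]) = {

  // 1- build the custom input dstream exposing a local receiver
  val dstream = new LocalInputDStream[T](ssc, StorageLevel.MEMORY_ONLY_SER)

  // 2- build a Sink pushing every emitted T into the receiver, asynchronously
  val sink: Sink[Task, T] =
    io.channel { t: T => Task.delay(dstream.receiver.push(t)) }

  // 3- pipe the process to the sink & stop the dstream once it has halted
  val consumer: Process[Task, Unit] =
    (p to sink) onComplete Process.eval_(Task.delay(dstream.stop()))

  // 4- return both; nothing has run yet
  (consumer, dstream)
}
```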
Please note that this builds a Process[Task, Unit] and a DStream[T], but nothing has happened yet in terms of data consumption & streaming. Both need to be run now.
Use it…
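A sketch of typical usage (one element every 50ms with 1-second windows, matching the run below):

```scala
import scala.concurrent.duration._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalaz.concurrent.Task
import scalaz.stream.Process

// a streaming context discretizing over windows of 1 second
val ssc = new StreamingContext("local[2]", "zpark", Seconds(1))

// a process emitting one element every 50ms, 100 elements in total
val p: Process[Task, Duration] = Process.awakeEvery(50.milliseconds).take(100)

// build the dstream & the effectful consumer (nothing runs yet)
val (consumer, dstream) = dstreamize(p, ssc)

// schedule dstream operations/output before starting the context
dstream.count().print()
```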
Please note that you have to:
- schedule your dstream operations/output before starting the streaming context.
- start the streaming context before running the consumer.
Run it…
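A sketch of the run:

```scala
// start the streaming context before running the consumer
ssc.start()

// run the consumer: it plays all the effects, pushing each element
// into the dstream & blocking until the process has halted (~5 sec)
consumer.run.run

// let the last window be flushed, then stop everything
Thread.sleep(2000)
ssc.stop()
```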
Ok cool, we can see a warmup phase at the beginning and then windows of 1 sec counting 20 elements each, which is expected since one element every 50ms gives 20 elements in 1 sec.
Part 1’s conclusion
Now we can pipe a Process[Task, T] into a DStream[T].
Please note that as we run the Process[Task, T] on the Spark driver node, if this node fails, there is no real way to restore the lost data. Yet, LocalInputDStream relies on DStreamGraph & BlockRDDs, which persist all DStream relations & all received blocks. Moreover, DStream has exactly the same problem with respect to the driver node for now.
That was fun but what can we do with that?
In part 2, I propose to have more fun and stream data to a DStream using the brand new Scalaz-Stream NIO API to create cool NIO client/server streams…
——————————————————————————————————-> GO TO PART2