The code & sample apps can be found on Github
The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for a real application…
So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:
- Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
- Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
- Train Spark-ML recommendation-like model using NIO client/server (Part 3)
- Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)
[Part 1/3] From Scalaz-Stream Process to Spark DStream
Reminders on Process[Task, T]
Process[Task, T] is a stream of T elements that can interleave some Tasks (each Task representing an effectful computation interacting with the external world).

Process[Task, T] is built as a state machine that you need to run to play all the Task effects and emit the stream of T. It can manage both continuous or discrete, finite or infinite streams.
I restricted myself to Task for the purpose of this article but it can be any effectful monad F[_].
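For instance, here is a minimal sketch of such an effectful stream (illustrative, using the system clock as the external world):

```scala
import scalaz.concurrent.Task
import scalaz.stream.Process

// an infinite effectful stream: each step runs a Task reading the clock
val clock: Process[Task, Long] =
  Process.repeatEval(Task.delay(System.currentTimeMillis))

// nothing has run so far: runLog plays the Task effects & gathers emitted values
val ticks: IndexedSeq[Long] = clock.take(3).runLog.run
```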
Reminders on DStream[T]
DStream[T] is a stream of RDD[T] built by discretizing a continuous stream of T.

RDD[T] is a resilient distributed dataset, the ground data-structure behind Spark for distributing in-memory batch/map/reduce operations to a cluster of nodes with fault-tolerance & persistence.
To summarize, DStream slices a continuous stream of T into windows of time and gathers all the Ts of the same window into one RDD[T]. So it discretizes the continuous stream into a stream of RDD[T]. Once built, those RDD[T]s are distributed to the Spark cluster. Spark allows performing transform/union/map/reduce/… operations on RDD[T], and DStream[T] takes advantage of the same operations.
Spark-Streaming also persists all operations & relations between DStreams in a graph. Thus, in case of failure of a remote node while performing operations on DStreams, the whole transformation chain can be replayed (it also means streamed data are persisted).
Finally, the resulting DStream obtained after map/reduce operations can be output to a file, a console, a DB, etc…
Please note that DStream[T] is built with respect to a StreamingContext which manages its distribution in the Spark cluster and all operations performed on it. Moreover, DStream map/reduce operations & outputs must be scheduled before starting the StreamingContext. It could be somewhat compared to a state machine that you build statically and run later.
From Process[Task, T] to RDD[T]
You may ask: why not simply run the Process[Task, T], gather all the emitted values and build an RDD[T] out of them?

Yes, sure, we can do it.
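A naive sketch (processToRDD is an illustrative name): run all the effects, gather everything in memory, and parallelize.

```scala
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import scalaz.concurrent.Task
import scalaz.stream.Process

// naive: run ALL Task effects, gather ALL emitted values in driver memory,
// then distribute them as one RDD
def processToRDD[T: ClassTag](sc: SparkContext)(p: Process[Task, T]): RDD[T] =
  sc.parallelize(p.runLog.run)
```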
This works, but what if this Process[Task, T] emits a huge quantity of data or is infinite? You'll end in an OutOfMemoryError since everything must be gathered in driver memory before building the RDD[T].

So yes you can do it, but it's not so interesting.
DStream seems more natural since it can manage a stream of data as long as it can discretize it over time.
From Process[Task, T] to DStream[T]
Pull from Process[Task, T], Push to DStream[T]
To build a DStream[T] from a Process[Task, T], the idea is to:

- Consume/pull the T emitted by the Process[Task, T],
- Gather the emitted Ts during a window of time & generate an RDD[T] for this window,
- Go to next window of time…
The Spark-Streaming library provides different helpers to create a DStream from different sources of data, like files (local/HDFS) or sockets…

The helper that seemed the most appropriate is the NetworkInputDStream:
- It provides a NetworkReceiver based on an Akka actor to which we can push streamed data.
- NetworkReceiver gathers streamed data over windows of time and builds a BlockRDD[T] for each window.
- Each BlockRDD[T] is registered to the global Spark BlockManager (responsible for data persistence).
- Each BlockRDD[T] is injected into the DStream[T].
- So NetworkInputDStream builds a stream of BlockRDD[T].
It’s important to note that NetworkReceiver is also meant to be sent to remote workers so that data can be gathered on several nodes at the same time.
But in my case, the data source Process[Task, T] runs on the Spark driver node (at least for now), so instead of NetworkInputDStream, a LocalInputDStream would be better. It would provide a LocalReceiver based on an actor to which we can push the data emitted by the process in an async way.

LocalInputDStream doesn't exist in the Spark-Streaming library (or I haven't looked well) so I've implemented it as I needed. It does exactly the same as NetworkInputDStream without the remoting aspect. The current code is there…
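To give the general shape, here is a drastically simplified sketch (the real implementation pushes to a receiver actor and registers proper BlockRDDs with the BlockManager; this one merely buffers pushed elements and parallelizes each batch):

```scala
import scala.reflect.ClassTag
import java.util.concurrent.ConcurrentLinkedQueue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{ StreamingContext, Time }
import org.apache.spark.streaming.dstream.InputDStream

class LocalInputDStream[T: ClassTag](ssc_ : StreamingContext)
    extends InputDStream[T](ssc_) {

  private val buffer = new ConcurrentLinkedQueue[T]()

  // called (asynchronously) by the producer to hand over one element
  def push(t: T): Unit = buffer.add(t)

  override def start(): Unit = ()
  override def stop(): Unit = ()

  // for each batch window, drain everything received since the last one
  override def compute(validTime: Time): Option[RDD[T]] = {
    val batch = Iterator.continually(buffer.poll()).takeWhile(_ != null).toSeq
    Some(ssc_.sparkContext.parallelize(batch))
  }
}
```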
There is a common point between DStream and Process: both are built as state machines that stay passive until run.
In the case of Process, it is run by playing all the Task effects, while gathering the emitted values or not caring about them, in blocking or non-blocking mode, etc…
In the case of DStream, it is built and registered in the context of a StreamingContext. Then you must also declare some outputs for the DStream, like a simple print, an HDFS file output, etc… Finally, you start the StreamingContext which manages everything for you until you stop it.
So if we want to adapt a Process[Task, T] to a DStream[T], we must perform 4 steps (on the Spark driver node), as shown in the sketch below:

- build a Receiver to which we'll be able to push the emitted Ts asynchronously, and the DStream[T] backed by it,
- build a custom scalaz-stream Sink[Task, T] in charge of consuming all the data emitted by the Process[Task, T] and pushing them using the previous Receiver,
- pipe the Process[Task, T] to this Sink[Task, T] & when the Process[Task, T] has halted, stop the previous DStream[T]: the result of this pipe operation is a Process[Task, Unit], a pure effectful process responsible for pushing the Ts into the DStream without emitting anything,
- return the previous DStream[T] and the effectful consumer.
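Here is a simplified sketch of these 4 steps (it assumes the LocalInputDStream sketched above, with its push/stop methods):

```scala
import scala.reflect.ClassTag
import org.apache.spark.streaming.StreamingContext
import scalaz.concurrent.Task
import scalaz.stream.{ Process, Sink }

def dstreamize[T: ClassTag](
    p: Process[Task, T],
    ssc: StreamingContext
): (Process[Task, Unit], LocalInputDStream[T]) = {

  // 1 - build the receiver-backed dstream we can push to
  val dstream = new LocalInputDStream[T](ssc)

  // 2 - a sink pushing every emitted T to the receiver
  val sink: Sink[Task, T] =
    Process.constant { (t: T) => Task.delay(dstream.push(t)) }

  // 3 - pipe the process to the sink & stop the dstream when it halts;
  //     the result is a pure effectful Process[Task, Unit]
  val consumer: Process[Task, Unit] =
    (p to sink).onComplete(Process.eval_(Task.delay(dstream.stop())))

  // 4 - return both the effectful consumer and the dstream
  (consumer, dstream)
}
```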
Note that this builds a Process[Task, Unit] and a DStream[T], but nothing has happened yet in terms of data consumption & streaming. Both need to be run now.
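A simplified sketch of such a run (illustrative values: one element every 50ms, 1s batches; awakeEvery relies on scalaz-stream's default scheduler):

```scala
import scala.concurrent.duration._
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import scalaz.concurrent.Task
import scalaz.stream.Process

val ssc = new StreamingContext("local[2]", "zpark", Seconds(1))

// a finite process emitting one element every 50ms (100 elements ~ 5s)
val p: Process[Task, Duration] =
  Process.awakeEvery(50.milliseconds).take(100)

val (consumer, dstream) = dstreamize(p, ssc)

// schedule the dstream output BEFORE starting the streaming context
dstream.count().print()

// start the streaming context BEFORE running the consumer
ssc.start()

// run the effectful consumer (blocking the current thread until it halts)
consumer.run.run

ssc.stop()
```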
Please note that you have to:
- schedule your dstream operations/output before starting the streaming context.
- start the streaming context before running the consumer.
Ok cool, when you run it, you can see a warmup phase at the beginning and then windows of 1 sec counting 20 elements, which is expected since one element every 50ms gives 20 elements per second.
Part 1’s conclusion
Now we can pipe a Process[Task, T] into a DStream[T].
Please note that as we run the Process[Task, T] on the Spark driver node, if this node fails, there is no real way to restore the lost data. Yet, LocalInputDStream relies on BlockRDDs which persist all DStream relations & all received blocks. Moreover, DStream has exactly the same problem with respect to the driver node for now.
That was fun but what can we do with that?
In Part 2, I propose to have more fun and stream data to DStream using the brand new Scalaz-Stream NIO API to create cool NIO client/server streams…
-> GO TO PART2