The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for real applications…
Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.
So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:
Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
Train Spark-ML recommendation-like model using NIO client/server (Part 3)
Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)
[Part 2/3] From Scalaz-Stream NIO client & server to Spark DStream
Scalaz-Stream NIO Client
What is a client?
Something sending some data `W` (for Write) to a server
Something reading some data `I` (for Input) from a server
Client seen as Process
A client could be represented as:
a Process[Task, I] for input channel (receiving from server)
a Process[Task, W] for output channel (sending to server)
In scalaz-stream, a new structure has recently been added: `Exchange[I, W]`.
Now, let’s consider that we work in NIO mode with everything non-blocking, asynchronous etc…
In this context, a client can be seen as something generating, sooner or later, one (or more) Exchange[I, W], i.e.:
```scala
Client[I, W] === Process[Task, Exchange[I, W]]
```
In the case of a pure TCP client, I and W are often Bytes.
Creating a client
Scalaz-Stream now provides a helper to create a TCP binary NIO client:
```scala
// the address of the server to connect to
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a client
val client: Process[Task, Exchange[Bytes, Bytes]] = nio.connect(address)

client map { ex: Exchange[Bytes, Bytes] =>
  // read data sent by server in ex.read
  ???
  // write data to the server with ex.write
  ???
}
```
Plug your own data source on Exchange
To plug your own data source to write to the server, Scalaz-Stream provides one more API:
```scala
case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
  /**
   * Runs supplied Process of `W` values by sending them to remote system.
   * Any replies from remote system are received as `I` values of the resulting process.
   */
  def run(p: Process[Task, W]): Process[Task, I]
  // the W are sent to the server and we retrieve only the received data
}
```
With this API, we can send data through the client and output the received data.
```scala
// some data to be sent by client
val data: Process[Task, W] = ...

// send data and retrieve only responses received by client
val output: Process[Task, I] = client flatMap { ex =>
  ex.run(data)
}

val receivedData: Seq[Bytes] = output.runLog.run
```
Yet, in general, we need to:
send custom data to the server
expect its response
do some business logic
send more data
etc…
So we need to be able to gather received & emitted data in the same piece of code.
Managing client/server business logic with Wye
Scalaz-stream can help us with the following API:
```scala
case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
  ...
  /**
   * Transform this Exchange to another Exchange where queueing, and transformation of this `I` and `W`
   * is controlled by supplied WyeW.
   */
  def wye[I2, W2](w: Wye[Task, I, W2, W \/ I2]): Exchange[I2, W2]
  ...
  // It transforms the original Exchange[I, W] into an Exchange[I2, W2]
}
```
Whoaaaaa complex isn’t it? Actually not so much…
Wye is a fantastic tool that can:
read from a left and/or right branch (in a non-deterministic way: left or right or left+right),
perform computation on left/right received data,
emit data in output.
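To get an intuition for this behaviour, here is a minimal plain-Scala sketch. The function `wyeLike` and its parameters are hypothetical stand-ins, not the real scalaz-stream API; a real wye is driven asynchronously, whereas this sketch simply drains whichever side has data.

```scala
// Interleave two sources "non-deterministically" (simulated here by taking
// from whichever side still has elements), applying a computation per side.
def wyeLike[A, B, O](left: Iterator[A], right: Iterator[B])
                    (onLeft: A => O, onRight: B => O): Vector[O] = {
  val out = Vector.newBuilder[O]
  while (left.hasNext || right.hasNext) {
    if (left.hasNext)  out += onLeft(left.next())   // left branch
    if (right.hasNext) out += onRight(right.next()) // right branch
  }
  out.result()
}
```

One possible interleaving: `wyeLike(Iterator(1, 2), Iterator("a"))(i => s"L$i", s => s"R$s")` yields `Vector("L1", "Ra", "L2")`.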
I love ASCII art schema:
```
    I (left)       I2 (right)
       v               v
       |               |
  ---------------------------
  |   Wye[Task, I, I2, W]   |
  ---------------------------
               |
               v
               W
```
`\/` is the Scalaz disjunction, also called `Either` in the Scala world.
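Since `\/` behaves like `Either`, the routing that a `W \/ I2` output expresses can be sketched in plain Scala (the `route` helper is a hypothetical illustration, not part of scalaz-stream):

```scala
// `-\/` (Left) models the `W` branch sent to the server;
// `\/-` (Right) models the `I2` branch emitted locally.
def route(x: Either[String, Int]): String = x match {
  case Left(w)   => s"to server: $w" // W side
  case Right(i2) => s"to local: $i2" // I2 side
}
```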
So Wye[Task, I, W2, W \/ I2] can be seen as:
```
       I               W2
       v               v
       |               |
  -------------------------------
  |  Wye[Task, I, W2, W \/ I2]  |
  -------------------------------
       |               |
       v               v
       W               I2
```
So what does this Exchange.wye API do?
It plugs the original Exchange.write: Sink[Task, W] to the W output of the Wye[Task, I, W2, W \/ I2] for sending data to the server.
It plugs the Exchange.read: Process[Task, I], receiving data from the server, to the left input of the Wye[Task, I, W2, W \/ I2].
The right input W2 branch provides a plug for an external source of data in the shape of Process[Task, W2].
The right output I2 can be used to pipe data from the client to an external local process (like streaming out data received from the server).
Finally it returns an Exchange[I2, W2].
In summary:
```
  (ex: Exchange[I, W]).wye(w: Wye[Task, I, W2, W \/ I2])

    ex.read
       |
       v
       I               W2
       v               v
       |               |
  ---------------------------------
  |  w: Wye[Task, I, W2, W \/ I2] |
  ---------------------------------
       |               |
       v               v
       W               I2
       |
       v
    ex.write

  ======> Returns Exchange[I2, W2]
```
As a conclusion, Exchange.wye combines the original Exchange[I, W] with your custom Wye[Task, I, W2, W \/ I2], which represents the business logic of data exchange between client & server, and finally returns an Exchange[I2, W2] on which you can plug your own data source and retrieve output data.
Implement the client with wye/run
```scala
// The source of data to send to server
val data2Send: Process[Task, Bytes] = ...

// The logic of your exchange between client & server
val clientWye: Wye[Task, Bytes, Bytes, Bytes \/ Bytes] = ...
// Scary, there are so many Bytes

val clientReceived: Process[Task, Bytes] = for {
  // client connects to the server & returns Exchange
  ex <- nio.connect(address)
  // Exchange is customized with clientWye
  // Data are injected in it with run
  output <- ex.wye(clientWye).run(data2Send)
} yield (output)
```
Implement simple client/server business logic?
Please note, I simply reuse the basic echo example provided in scalaz-stream ;)
```scala
def clientEcho(address: InetSocketAddress, data: Bytes): Process[Task, Bytes] = {

  // the custom Wye managing business logic
  def echoLogic: WyeW[Bytes, Bytes, Bytes, Bytes] = {

    def go(collected: Int): WyeW[Bytes, Bytes, Bytes, Bytes] = {
      // Create a Wye that can receive on both sides
      receiveBoth {
        // Receive on left == receive from server
        case ReceiveL(rcvd) =>
          // `emitO` outputs on `I2` branch and then...
          emitO(rcvd) fby
            // if we have received everything sent, halt
            (if (collected + rcvd.size >= data.size) halt
            // else go on collecting
            else go(collected + rcvd.size))

        // Receive on right == receive on `W2` branch == your external data source
        case ReceiveR(data) =>
          // `emitW` outputs on `W` branch == sending to server
          // and loops
          emitW(data) fby go(collected)

        // When server closes
        case HaltL(rsn) => Halt(rsn)
        // When client closes, we go on collecting echoes
        case HaltR(_)   => go(collected)
      }
    }

    // Init
    go(0)
  }

  // Finally wiring all...
  for {
    ex   <- nio.connect(address)
    rslt <- ex.wye(echoLogic).run(emit(data))
  } yield {
    rslt
  }
}
```
This might seem hard to grasp for some people because of scalaz-stream notations and wye Left/Right/Both or wye.emitO/emitW. But actually, you'll get used to it quite quickly as soon as you understand wye. Keep in mind that this code uses the low-level scalaz-stream API without anything else, and it remains pretty simple and straightforward.
Run the client for its output
```scala
// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// consumes all received data... (it should contain dataArray)
val result = clientOutput.runLog.run

println("Client received: " + result)
```
Whoooohoooo, a server is just a stream of streams!!!!
Writing a server
Scalaz-Stream now provides a helper to create a TCP binary NIO server:
```scala
// the address of the server
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a server
val server: Process[Task, Process[Task, Exchange[Bytes, Bytes]]] = nio.server(address)

server map { client =>
  // for each client
  client flatMap { ex: Exchange[Bytes, Bytes] =>
    // read data sent by client in ex.read
    ???
    // write data to the client with ex.write
    ???
  }
}
```
Don’t you find that quite elegant? ;)
Managing client/server interaction business logic
There we simply reuse the Exchange described above, so you can use exactly the same APIs as the ones for the client. Here is another API that can be useful:
```scala
type Writer1[W, I, I2] = Process1[I, W \/ I2]

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
  ...
  /**
   * Transforms this exchange to another exchange, that for every received `I` will consult supplied Writer1
   * and eventually transforms `I` to `I2` or to `W` that is sent to remote system.
   */
  def readThrough[I2](w: Writer1[W, I, I2])(implicit S: Strategy = Strategy.DefaultStrategy): Exchange[I2, W]
  ...
}
```

A small schema?

```
    ex.read
       |
       v
       I
  ---------------------------
  |    Writer1[W, I, I2]    |
  ---------------------------
       |               |
       v               v
       W               I2
       |
       v
    ex.write

  ======> Returns Exchange[I2, W]
```
With this API, you can compute some business logic on the received data from client.
Let’s write the echo server corresponding to the previous client (you can find this sample in scalaz-stream too):
```scala
def serverEcho(address: InetSocketAddress): Process[Task, Process[Task, Bytes]] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loops (fby = followed by)
      emitSeq(Seq(\/-(i), -\/(i))) fby echoAll
    }

  // The server that echoes everything
  val receivedData: Process[Task, Process[Task, Bytes]] =
    nio.server(address) map { client =>
      for {
        ex  <- client
        rcv <- ex.readThrough(echoAll).run()
      } yield rcv
    }

  receivedData
}
```
receivedData is a Process[Task, Process[Task, Bytes]], which is not so practical: we would prefer to gather all data received by clients in one single Process[Task, Bytes] to stream to another module.
Scalaz-Stream has the solution again:
```scala
package object merge {
  /**
   * Merges non-deterministically processes that are output of the `source` process.
   */
  def mergeN[A](source: Process[Task, Process[Task, A]])
               (implicit S: Strategy = Strategy.DefaultStrategy): Process[Task, A]
}
```
Please note the Strategy, which determines how Tasks are executed and can be compared to Scala's ExecutionContext.
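The observable effect of mergeN can be sketched in plain Scala. The name `mergeAllRoundRobin` is a hypothetical stand-in: the real mergeN interleaves inner streams non-deterministically according to the Strategy, and round-robin is just one possible ordering.

```scala
// Flatten a collection of inner sequences into one, interleaving round-robin.
def mergeAllRoundRobin[A](sources: Vector[Vector[A]]): Vector[A] = {
  val its = sources.map(_.iterator)
  val out = Vector.newBuilder[A]
  while (its.exists(_.hasNext))
    its.foreach(it => if (it.hasNext) out += it.next())
  out.result()
}
```

For example, `mergeAllRoundRobin(Vector(Vector(1, 2), Vector(10)))` yields `Vector(1, 10, 2)` in this particular interleaving.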
Fantastic, let’s plug it on our server:
```scala
// The server that echoes everything
def serverEcho(address: InetSocketAddress): Process[Task, Bytes] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loops (fby = followed by)
      emitSeq(Seq(\/-(i), -\/(i))) fby echoAll
    }

  // The server that echoes everything
  val receivedData: Process[Task, Process[Task, Bytes]] =
    nio.server(address) map { client =>
      for {
        ex  <- client
        rcv <- ex.readThrough(echoAll).run()
      } yield rcv
    }

  // Merges all client streams
  merge.mergeN(receivedData)
}
```
Finally, we have a server and a client!!!!!
Let’s plug them all together
Run a server
First of all, we need to create a server that can be stopped when required.
Let’s do it the scalaz-stream way using:
wye.interrupt :
```scala
/**
 * Let through the right branch as long as the left branch is `false`,
 * listening asynchronously for the left branch to become `true`.
 * This halts as soon as the right branch halts.
 */
def interrupt[I]: Wye[Boolean, I, I]
```
async.signal, which is a value that can be changed asynchronously, based on two APIs:
```scala
/**
 * Sets the value of this `Signal`.
 */
def set(a: A): Task[Unit]

/**
 * Returns the discrete version of this signal, updated only when `value`
 * is changed ...
 */
def discrete: Process[Task, A]
```
Without needing much imagination, we can use a Signal[Boolean].discrete to obtain a Process[Task, Boolean] and wye it with the previous server process using wye.interrupt. Then, to stop the server, you just have to call:
```scala
signal.set(true).run
```
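The semantics of wye.interrupt can be pictured with a plain-Scala stand-in (the `interruptible` helper is hypothetical; the real combinator listens to the Boolean side asynchronously instead of polling a flag):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Let elements through until the stop flag becomes true.
def interruptible[A](source: Iterator[A], stop: AtomicBoolean): Vector[A] = {
  val out = Vector.newBuilder[A]
  while (source.hasNext && !stop.get()) out += source.next()
  out.result()
}
```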
Here is the full code:
```scala
// local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync(_ => ())

// DO OTHER THINGS

// stop server
stop.set(true).run
```
Run server & client in the same code
```scala
// local bind address
val addr = localAddress(12345)

// the stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync(_ => ())

// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// Consume all received data in a blocking way...
val result = clientOutput.runLog.run

// stop server
stop.set(true).run
```
Naturally, you rarely run the client & server in the same code, but it is fun to see how easily you can do that with scalaz-stream, as you just manipulate Processes run on a provided Strategy.
Finally, we can go back to our subject: feeding a DStream using a scalaz-stream NIO client/server
Pipe server output to DStream
clientEcho/serverEcho are simple samples but not very useful.
Now we are going to use a custom client/server I’ve written for this article:
NioClient.sendAndCheckSize is a client streaming all emitted data of a Process[Task, Bytes] to the server and checking that the global size has been ack’ed by server.
NioServer.ackSize is a server acknowledging all received packets by their size (as a 4-byte Int)
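The 4-byte ack itself can be sketched in plain Scala (the `encodeAck`/`decodeAck` helpers are hypothetical illustrations of such a wire format, assuming the big-endian encoding that java.nio.ByteBuffer produces by default):

```scala
import java.nio.ByteBuffer

// Encode the received size as a 4-byte big-endian Int, as an ack packet.
def encodeAck(size: Int): Array[Byte] =
  ByteBuffer.allocate(4).putInt(size).array()

// Decode an ack packet back into the acknowledged size.
def decodeAck(bytes: Array[Byte]): Int =
  ByteBuffer.wrap(bytes).getInt
```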
Now let’s write a client/server dstreamizing data to Spark:
```scala
// First create a streaming context
val ssc = new StreamingContext(clusterUrl, "SparkStreamStuff", Seconds(1))

// Local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye NioServer.ackSize(addr))(wye.interrupt)

// Create a client that sends a natural integer every 50ms as a string (until reaching 100)
val clientData: Process[Task, Bytes] =
  naturalsEvery(50 milliseconds).take(100).map(i => Bytes.of(i.toString.getBytes))
val clientOutput = NioClient.sendAndCheckSize(addr, clientData)

// Dstreamize the server into the streaming context
val (consumer, dstream) = dstreamize(stoppableServer, ssc)

// Prepare dstream output
dstream.map(bytes => new String(bytes.toArray)).print()

// Start the streaming context
ssc.start()

// Run the server just for its effects
consumer.run.runAsync(_ => ())

// Run the client in a blocking way
clientOutput.runLog.run

// Await SSC termination a bit
ssc.awaitTermination(1000)

// stop server
stop.set(true).run

// stop the streaming context
ssc.stop()
```
I spent this second part of my triptych mainly explaining a few concepts of the brand new scalaz-stream NIO API. With it, a client becomes just a stream of exchanges, Process[Task, Exchange[I, W]], and a server becomes a stream of streams of exchanges, Process[Task, Process[Task, Exchange[I, W]]].
As soon as you manipulate Process, you can then use the dstreamize API exposed in Part 1 to pipe streamed data into Spark.
Let’s go to Part 3 now in which we’re going to do some fancy Machine Learning training with these new tools.