The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark Streaming. I had deliberately decided not to deal with fault tolerance & stream-graph persistence to keep things simple, but without them it was quite useless for real applications…
Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.
So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:
Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
Train a Spark-ML recommendation-like model using the NIO client/server (Part 3)
Stream data from multiple NIO clients to the previously trained ML model, mixing everything together (Part 3)
Let me remind you that I'm not an ML expert but more of a student. So if I say or do stupid ML things, be indulgent ;)
Here is what I propose:
Train a collaborative-filtering rating model for a recommendation system (as explained in the Spark documentation) using a first NIO server and a client as presented in Part 2.
Once the model is trained, create a second server that will accept client connections to receive data.
Stream/merge all received data into one single stream, dstreamize it (using the dstreamize function from Part 1, recalled just below) and perform streamed predictions using the previously trained model.
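Before diving in, a quick reminder of the Part 1 helper this whole article relies on. This is only a sketch of its shape as I use it here (the signature is assumed from Part 1, the body is elided):

import scala.reflect.ClassTag

import scalaz.concurrent.Task
import scalaz.stream.Process

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Shape of the Part 1 helper (sketch only, body elided):
// - plugs a Process[Task, T] into the given StreamingContext
// - returns the Process to run (the sink actually feeding Spark)
//   plus the resulting DStream[T] to consume with the Spark API
def dstreamize[T : ClassTag](
  p: Process[Task, T],
  ssc: StreamingContext
): (Process[Task, Unit], DStream[T]) = ???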
Train collaborative filtering model
Training client
As explained in the Spark documentation about collaborative filtering, we first need some data to train the model. I want to send this data using a NIO client.
Here is a function doing this:
//////////////////////////////////////////////////
// TRAINING CLIENT
def trainingClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // naturally you could provide much more data
  val basicTrainingData = Seq(
    // user ID, item ID, Rating
    "1,1,5.0",
    "1,2,1.0",
    "1,3,5.0",
    "1,4,1.0",
    "2,4,1.0",
    "2,2,5.0"
  )

  val trainingProcess = Process.emitAll(
    basicTrainingData map (s => Bytes.of((s + "\n").getBytes))
  )

  // sendAndCheckSize is a special client sending all data emitted
  // by the process and verifying the server received all data
  // by acknowledging all data size
  val client = NioClient.sendAndCheckSize(addr, trainingProcess)

  client
}
Training server
Now we need the training NIO server that waits for the training client to connect and pipes the received data to the model.
Here is a useful helper to create such a server, as described in the previous article (Part 2):
def server(addr: InetSocketAddress): (Process[Task, Bytes], Signal[Boolean]) = {

  val stop = async.signal[Boolean]
  stop.set(false).run

  // this is a server that is controlled by a stop signal
  // and that acknowledges all received data by their size
  val server = (stop.discrete wye NioServer.ackSize(addr))(wye.interrupt)

  // returns a stream of received data & a signal to stop server
  (server, stop)
}
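For the record (as far as I understand the scalaz-stream API), stop.discrete is the stream of successive values taken by the signal, and wye.interrupt lets the right side (the server) flow as long as the left side emits false, halting everything as soon as true arrives. This is what makes trainingStop.set(true).run stop the server later on.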
We can create the training server with it:
val trainingAddr = NioUtils.localAddress(11100)

//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)
trainingServer is a Process[Task, Bytes] streaming the training data received from the training client. We are going to train the rating model with this data.
Training model
To train a model, we can use the following API:
// the rating with user ID, product ID & rating
case class Rating(val user: Int, val product: Int, val rating: Double)

// A RDD of ratings
val ratings: RDD[Rating] = ...

// train the model with it
val model: MatrixFactorizationModel = ALS.train(ratings, 1, 20, 0.01)
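For reference, the ALS.train parameters after the ratings RDD are the rank of the factorization (1 here), the number of iterations (20) and the regularization factor lambda (0.01). A rank of 1 is ridiculously small, but enough for this toy dataset.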
Building RDD[Rating] from server stream
Imagine that we have a continuous, potentially very long flow of training data, and that we want to train the model with just a slice of this flow. To do this, we can: dstreamize the server stream, run the server and the training client for a while, stop them, slice the DStream to retrieve the RDDs received during that period, union those RDDs, parse the triplets into Ratings and finally train the model:
val trainingAddr = NioUtils.localAddress(11100)

//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

//////////////////////////////////////////////////
// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

//////////////////////////////////////////////////
// DStreamize server received data
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
    // converts bytes to String (doesn't care about encoding, it shall be UTF8)
    .map(bytes => new String(bytes.toArray))
    // rechunk received strings based on a separator \n
    // to keep only triplets: "USER_ID,PROD_ID,RATING"
    .pipe(NioUtils.rechunk { s: String => (s.split("\n").toVector, s.last == '\n') }),
  ssc
)

//////////////////////////////////////////////////
// Prepare dstream output
// (here we print to know what has been received)
trainingDstream.print()

//////////////////////////////////////////////////
// RUN
// Note the time before
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch server
trainingServerSink.run.runAsync(_ => ())

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launches client and awaits until it ends
tclient.run.run

// Stop server
trainingStop.set(true).run

// Note the time after
val after = new Time(System.currentTimeMillis)

// retrieves all dstreamized RDD during this period
val rdds = trainingDstream.slice(
  before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000))
)

// unions them (this operation can be expensive)
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

// converts "USER_ID,PROD_ID,RATING" triplets into Ratings
val ratings = union map { e =>
  e.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// finally train the model with it
val model = ALS.train(ratings, 1, 20, 0.01)

// Predict
println("Prediction(1,3)=" + model.predict(1, 3))

//////////////////////////////////////////////////
// Stop SSC
ssc.stop()
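Note that DStream.slice(from, to) returns the RDDs of all batches generated between the two instants; flooring both bounds to Milliseconds(1000) simply aligns them on the batch interval (presumably the 1 s batch duration used to build ssc) so that no partial batch is requested.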
Fantastic, we have trained our model in a very fancy way, haven’t we?
Personally, I find it interesting that we can take advantage of both APIs…
Predict Ratings
Now that we have a trained model, we can create a new server to receive data from clients for rating prediction.
Prediction client
Firstly, let’s generate some random data to send for prediction.
//////////////////////////////////////////////////
// PREDICTION CLIENT
def predictionClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // PREDICTION DATA
  // "USER_ID,PRODUCT_ID" couples (note the ',' separator expected
  // by the prediction parser on the server side)
  def rndData =
    // userID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    "," +
    // productID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    "\n"

  val rndDataProcess = Process.eval(Task.delay { rndData }).repeat

  // a 1000 elements process emitting every 10ms
  val predictDataProcess =
    (Process.awakeEvery(10 milliseconds) zipWith rndDataProcess) {
      (_, s) => Bytes.of(s.getBytes)
    }.take(1000)

  val client = NioClient.sendAndCheckSize(addr, predictDataProcess)

  client
}
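A side note on throttling: awakeEvery(10 milliseconds) ticks every 10 ms and zipWith pairs each tick with one random element, so the client sends roughly 100 messages per second; with take(1000), a full run lasts about 10 seconds.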
predictServer is the stream of data to predict (it is created with the same server helper as before; see the full listing below). Let's stream it to the model by dstreamizing it and transforming every built RDD with model.predict:
//////////////////////////////////////////////////
// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
    // converts bytes to String (doesn't care about encoding, it shall be UTF8)
    .map(bytes => new String(bytes.toArray))
    // rechunk received strings based on a separator \n
    .pipe(NioUtils.rechunk { s: String => (s.split("\n").toVector, s.last == '\n') }),
  ssc
)

//////////////////////////////////////////////////
// pipe dstreamed RDD to prediction model
// and print result
predictDstream.map {
  _.split(',') match {
    // converts to integers required by the model (USER_ID, PRODUCT_ID)
    case Array(user, item) => (user.toInt, item.toInt)
  }
}.transform { rdd =>
  // prediction happens here
  model.predict(rdd)
}.print()
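Keep in mind that the function passed to transform is evaluated on the driver at each batch interval; model.predict(rdd) therefore builds a new prediction RDD per micro-batch (joining against the model's feature RDDs) rather than serializing the model into task closures.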
Running everything in the same StreamingContext
I've discovered a problem here: the recommendation model is built from RDDs created in a StreamingContext, so you must use the same StreamingContext for prediction. That means I must build my dstreamized training client/server and my dstreamized prediction client/server in the same context, and therefore schedule both before starting that context.
Yet the prediction model is built from training data received after the context starts, so it isn't known beforehand… This is quite painful, so I decided to be nasty and treat the model as a variable that will be set later. For this, I used a horrible SyncVar to set the prediction model when it's ready… Sorry about that, but I need to study this issue more to see whether I can find a better solution, because I'm not satisfied with it at all…
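In isolation, the pattern is simply a blocking cell: readers of model.get block until the trainer calls model.set. Here is a minimal sketch of just that mechanism (plain Scala, independent of Spark, with a String standing in for the model):

import scala.concurrent.SyncVar

object SyncVarSketch extends App {
  // the blocking cell: get blocks until set provides a value
  val model = new SyncVar[String]

  // "prediction" side: blocks on get until the model is available
  val reader = new Thread(new Runnable {
    def run(): Unit = println("predicting with: " + model.get)
  })
  reader.start()

  // "training" side: publishes the model once ready
  Thread.sleep(100)
  model.set("trained-model")
  reader.join()
}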
So here is the whole painful training/predicting code:
//////////////////////////////////////////////////
// TRAINING
val trainingAddr = NioUtils.localAddress(11100)

// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

// DStreamize server
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
    // converts bytes to String (doesn't care about encoding, it shall be UTF8)
    .map(bytes => new String(bytes.toArray))
    // rechunk received strings based on a separator \n
    .pipe(NioUtils.rechunk { s: String => (s.split("\n").toVector, s.last == '\n') }),
  ssc
)

// THE HORRIBLE SYNCVAR KLUDGE (could have used atomic but not better IMHO)
val model = new SyncVar[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]

//////////////////////////////////////////////////
// PREDICTING
val predictAddr = NioUtils.localAddress(11101)

// PREDICTION SERVER
val (predictServer, predictStop) = server(predictAddr)

// PREDICTION CLIENT
val pClient = predictionClient(predictAddr)

// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
    // converts bytes to String (doesn't care about encoding, it shall be UTF8)
    .map(bytes => new String(bytes.toArray))
    // rechunk received strings based on a separator \n
    .pipe(NioUtils.rechunk { s: String => (s.split("\n").toVector, s.last == '\n') }),
  ssc
)

// Piping received data to the model
predictDstream.map {
  _.split(',') match {
    case Array(user, item) => (user.toInt, item.toInt)
  }
}.transform { rdd =>
  // USE THE HORRIBLE SYNCVAR
  model.get.predict(rdd)
}.print()

//////////////////////////////////////////////////
// RUN ALL
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch training server
trainingServerSink.run.runAsync(_ => ())

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch training client
tclient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)

// Stop training server
trainingStop.set(true).run

val after = new Time(System.currentTimeMillis)

val rdds = trainingDstream.slice(
  before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000))
)
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

val ratings = union map {
  _.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// SET THE HORRIBLE SYNCVAR
model.set(ALS.train(ratings, 1, 20, 0.01))

println("**** Model Trained -> Prediction(1,3)=" + model.get.predict(1, 3))

// Launch prediction server
predictServerSink.run.runAsync(_ => ())

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch prediction client
pClient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)

// Stop server
predictStop.set(true).run
Three long articles just to end up training a poor recommendation system with two clients/servers… A bit bloated, isn't it? :)
Anyway, I hope I've printed a few ideas and concepts about Spark & Scalaz-Stream in your brain, and if I've reached that target, it's already enough!
Yet, I'm not satisfied with a few things:
Training a model and using it in the same StreamingContext is still clumsy, and I must say that calling model.predict from a map function on a DStream might not be so good in a cluster environment. I haven't dug into this code enough to have a clear opinion on it.
I tried using multiple prediction clients (e.g. 100 in parallel) and it works quite well, but I have encountered problems terminating both my clients/servers and the streaming context, and I often end up with zombie SBT processes that I can't kill until reboot (some threads remain RUNNING while others are WAITING, and sockets aren't released… resource issues…). Cleanly shutting down all these tools, which create threads and more, after intensive work isn't good yet.
But globally, I'm satisfied:
Piping a scalaz-stream Process into a Spark DStream works quite well and might be interesting after all.
The new scalaz-stream NIO API, treating clients & servers as pure streams of data, gave me so many ideas that my free time suddenly got frightened and ran away.