Synopsis

The code & sample apps can be found on Github

The Zpark-Zstream I article was a PoC trying to use Scalaz-Stream instead of DStream with Spark-Streaming. I had deliberately decided not to deal with fault-tolerance & stream graph persistence to keep it simple, but without them it was quite useless for real applications…

Here is a triptych of articles trying to do something concrete with Scalaz-Stream and Spark.

So, what do I want? I wantttttttt a shrewburyyyyyy and to do the following:

  1. Plug Scalaz-Stream Process[Task, T] on Spark DStream[T] (Part 1)
  2. Build DStream using brand new Scalaz-Stream NIO API (client/server) (Part 2)
  3. Train Spark-ML recommendation-like model using NIO client/server (Part 3)
  4. Stream data from multiple NIO clients to the previous trained ML model mixing all together (Part 3)

[Part 3/3] Fancy Spark Machine Learning with NIO client/server & DStream…


Let me remind you that I’m not an expert in ML but more of a student. So if I say or do stupid ML things, be indulgent ;)

Here is what I propose:

  • Train a collaborative filtering rating model for a recommendation system (as explained in the Spark doc there) using a first NIO server and a client as presented in part 2.

  • When the model is trained, create a second server that will accept client connections to receive data.

  • Stream/merge all received data into one single stream, dstreamize it and perform streamed predictions using previous model.

Train collaborative filtering model

Training client

As explained in the Spark doc about collaborative filtering, we first need some data to train the model. I want to send this data using a NIO client.

Here is a function doing this:

//////////////////////////////////////////////////
// TRAINING CLIENT
def trainingClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // naturally you could provide much more data
  val basicTrainingData = Seq(
    // user ID, item ID, Rating
    "1,1,5.0",
    "1,2,1.0",
    "1,3,5.0",
    "1,4,1.0",
    "2,4,1.0",
    "2,2,5.0"
  )
  val trainingProcess =
    Process.emitAll(basicTrainingData map (s => Bytes.of((s+"\n").getBytes)))

  // sendAndCheckSize is a special client sending all data emitted 
  // by the process and verifying the server received all data 
  // by acknowledging all data size
  val client = NioClient.sendAndCheckSize(addr, trainingProcess)

  client
}

Training server

Now we need the training NIO server waiting for the training client to connect and piping the received data to the model.

Here is a useful function to help create a server as described in the previous article (part 2):

def server(addr: InetSocketAddress): (Process[Task, Bytes], Signal[Boolean]) = {

  val stop = async.signal[Boolean]
  stop.set(false).run

  // this is a server that is controlled by a stop signal
  // and that acknowledges all received data by their size
  val server =
    ( stop.discrete wye NioServer.ackSize(addr) )(wye.interrupt)

  // returns a stream of received data & a signal to stop server
  (server, stop)
}

We can create the training server with it:

val trainingAddr = NioUtils.localAddress(11100)
//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

trainingServer is a Process[Task, Bytes] streaming the training data received from the training client. We are going to train the rating model with it.


Training model

To train a model, we can use the following API:

// the rating with user ID, product ID & rating
case class Rating(val user: Int, val product: Int, val rating: Double)

// A RDD of ratings
val ratings: RDD[Rating] = ...

// train the model with it (rank = 1, iterations = 20, lambda = 0.01)
val model: MatrixFactorizationModel = ALS.train(ratings, 1, 20, 0.01)

Building RDD[Rating] from server stream

Imagine that we have a continuous flow of training data that can be very long.

We want to train the model with just a slice of this flow. To do this, we can:

  • dstreamize the server output stream
  • run the dstream for some time
  • retrieve the RDDs received during this time
  • union all of those RDDs
  • push them to the model

Here is the whole code with the previous client:

val trainingAddr = NioUtils.localAddress(11100)

//////////////////////////////////////////////////
// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

//////////////////////////////////////////////////
// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

//////////////////////////////////////////////////
// DStreamize server received data
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n 
      // to keep only triplets: "USER_ID,PROD_ID,RATING"
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

//////////////////////////////////////////////////
// Prepare dstream output 
// (here we print to know what has been received)
trainingDstream.print()

//////////////////////////////////////////////////
// RUN

// Note the time before
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch server
trainingServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launches client and awaits until it ends
tclient.run.run

// Stop server
trainingStop.set(true).run

// Note the time after
val after = new Time(System.currentTimeMillis)

// retrieves all dstreamized RDD during this period
val rdds = trainingDstream.slice(
  before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000))
)

// unions them (this operation can be expensive)
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

// converts "USER_ID,PROD_ID,RATING" triplets into Ratings
val ratings = union map { e =>
  e.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// finally train the model with it
val model = ALS.train(ratings, 1, 20, 0.01)

// Predict
println("Prediction(1,3)=" + model.predict(1, 3))

//////////////////////////////////////////////////
// Stop SSC
ssc.stop()

Run it

-------------------------------------------
Time: 1395079621000 ms
-------------------------------------------
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,4,1.0
2,2,5.0

-------------------------------------------
Time: 1395079622000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079623000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079624000 ms
-------------------------------------------

-------------------------------------------
Time: 1395079625000 ms
-------------------------------------------

Prediction(1,3)=4.94897842056338

Fantastic, we have trained our model in a very fancy way, haven’t we?

Personally, I find it interesting that we can take advantage of both APIs…



Predict Ratings

Now that we have a trained model, we can create a new server to receive data from clients for rating prediction.


Prediction client

Firstly, let’s generate some random data to send for prediction.

//////////////////////////////////////////////////
// PREDICTION CLIENT
def predictionClient(addr: InetSocketAddress): Process[Task, Bytes] = {

  // PREDICTION DATA
  def rndData =
    // userID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    "," +
    // productID
    (Math.abs(scala.util.Random.nextInt) % 4 + 1).toString +
    "\n"

  val rndDataProcess = Process.eval(Task.delay{ rndData }).repeat

  // a 1000-element process emitting every 10ms
  val predictDataProcess =
    (Process.awakeEvery(10 milliseconds) zipWith rndDataProcess){ (_, s) => Bytes.of(s.getBytes) }
      .take(1000)

  val client = NioClient.sendAndCheckSize(addr, predictDataProcess)

  client
}

Prediction server

val predictAddr = NioUtils.localAddress(11101)
//////////////////////////////////////////////////
// PREDICTION SERVER
val (predictServer, predictStop) = server(predictAddr)

Prediction Stream

predictServer is the stream of data to predict. Let’s stream it to the model by dstreamizing it and transforming all built RDDs by passing them through the model.

//////////////////////////////////////////////////
// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

//////////////////////////////////////////////////
// pipe dstreamed RDD to prediction model
// and print result
predictDstream map { _.split(',') match {
  // converts to integers required by the model (USER_ID, PRODUCT_ID)
  case Array(user, item) => (user.toInt, item.toInt)
}} transform { rdd =>
  // prediction happens here
  model.predict(rdd)
} print()

Running all in same StreamingContext

I’ve discovered a problem here: the recommendation model is built in a StreamingContext and uses RDDs built in it, so you must use the same StreamingContext for prediction. This means I must build my training dstreamized client/server & my prediction dstreamized client/server in the same context, and thus schedule both before starting that context.

Yet the prediction model is built from training data received after starting the context, so it’s not known beforehand… It’s very painful, and I decided to be nasty and consider the model as a variable that will be set later. For this, I used a horrible SyncVar to set the prediction model when it’s ready… Sorry about that, but I need to study this issue more to see if I can find better solutions because I’m not satisfied with it at all…

So here is the whole training/predicting painful code:

//////////////////////////////////////////////////
// TRAINING

val trainingAddr = NioUtils.localAddress(11100)

// TRAINING SERVER
val (trainingServer, trainingStop) = server(trainingAddr)

// TRAINING CLIENT
val tclient = trainingClient(trainingAddr)

// DStreamize server
val (trainingServerSink, trainingDstream) = dstreamize(
  trainingServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe (NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

// THE HORRIBLE SYNCVAR KLUDGE (could have used an atomic reference but it's not better IMHO)
val model = new SyncVar[org.apache.spark.mllib.recommendation.MatrixFactorizationModel]
// THE HORRIBLE SYNCVAR KLUDGE (could have used an atomic reference but it's not better IMHO)


//////////////////////////////////////////////////
// PREDICTING
val predictAddr = NioUtils.localAddress(11101)

// PREDICTION SERVER
val (predictServer, predictStop) = server(predictAddr)

// PREDICTION CLIENT
val pClient = predictionClient(predictAddr)

// DStreamize server
val (predictServerSink, predictDstream) = dstreamize(
  predictServer
      // converts bytes to String (doesn't care about encoding, it shall be UTF8)
      .map  ( bytes => new String(bytes.toArray) )
      // rechunk received strings based on a separator \n
      .pipe ( NioUtils.rechunk { s:String => (s.split("\n").toVector, s.last == '\n') } )
  , ssc
)

// Piping received data to the model
predictDstream.map {
  _.split(',') match {
    case Array(user, item) => (user.toInt, item.toInt)
  }
}.transform { rdd =>
  // USE THE HORRIBLE SYNCVAR
  model.get.predict(rdd)
}.print()

//////////////////////////////////////////////////
// RUN ALL
val before = new Time(System.currentTimeMillis)

// Start SSC
ssc.start()

// Launch training server
trainingServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch training client
tclient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)
// Stop training server
trainingStop.set(true).run
val after = new Time(System.currentTimeMillis)

val rdds = trainingDstream.slice(before.floor(Milliseconds(1000)), after.floor(Milliseconds(1000)))
val union: RDD[String] = new UnionRDD(ssc.sparkContext, rdds)

val ratings = union map {
  _.split(',') match {
    case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble)
  }
}

// SET THE HORRIBLE SYNCVAR
model.set(ALS.train(ratings, 1, 20, 0.01))

println("**** Model Trained -> Prediction(1,3)=" + model.get.predict(1, 3))

// Launch prediction server
predictServerSink.run.runAsync( _ => () )

// Sleeps a bit to let server listen
Thread.sleep(300)

// Launch prediction client
pClient.run.run

// Await SSC termination a bit
ssc.awaitTermination(1000)
// Stop server
predictStop.set(true).run

Run it…

-------------------------------------------
Time: 1395144379000 ms
-------------------------------------------
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,4,1.0
2,2,5.0

**** Model Trained -> Prediction(1,3)=4.919459410565401

...

-------------------------------------------
Time: 1395144384000 ms
-------------------------------------------
----------------

-------------------------------------------
Time: 1395144385000 ms
-------------------------------------------
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,2,1.631952450379809)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)

-------------------------------------------
Time: 1395144386000 ms
-------------------------------------------
Rating(1,1,4.919459410565401)
Rating(1,1,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,3,4.919459410565401)
Rating(1,4,0.40813133837755494)
Rating(1,4,0.40813133837755494)

...


Final conclusion

3 long articles ending in training a poor recommendation system with 2 clients/servers… A bit bloated, isn’t it? :)

Anyway, I hope I’ve printed a few ideas & concepts about Spark & scalaz-stream in your brain, and if I’ve reached that target, it’s already enough!

Yet, I’m not satisfied with a few things:

  • Training a model and using it in the same StreamingContext is still clumsy, and I must say that calling model.predict from a map function in a DStream might not be so good in a cluster environment. I haven’t dug into this code enough to have a clear opinion on it.
  • I tried using multiple clients for prediction (like 100 in parallel) and it works quite well, but I have encountered problems ending both my clients/servers and the streaming context, and I often end up with zombie SBT processes that I can’t kill without a reboot (some threads remain RUNNING while others AWAIT and sockets aren’t released… resource issues…). Cleanly closing all of these thread-creating tools after intensive work isn’t good yet.

But, I’m satisfied globally:

  • Piping a scalaz-stream Process into a Spark DStream works quite well and might be interesting after all.
  • The new scalaz-stream NIO API, considering clients & servers as pure streams of data, gave me so many ideas that my free time suddenly got frightened and ran away.


GO TO PART 2

Have a look at the code on Github.

Have distributed & resilient yet continuous fun!






[Part 2/3] From Scalaz-Stream NIO client & server to Spark DStream

Scalaz-Stream NIO Client

What is a client?

  • Something sending some data W (for Write) to a server
  • Something reading some data I (for Input) from a server

Client seen as Process

A client could be represented as:

  • a Process[Task, I] for input channel (receiving from server)
  • a Process[Task, W] for output channel (sending to server)

In scalaz-stream, a new structure has recently been added:

final case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W])

Precisely what we need!

Now, let’s consider that we work in NIO mode with everything non-blocking, asynchronous etc…

In this context, a client can be seen as something generating, sooner or later, one (or more) Exchange[I, W], i.e.:

Client[I, W] === Process[Task, Exchange[I, W]]

In the case of a pure TCP client, I and W are often Bytes.

Creating a client

Scalaz-Stream now provides a helper to create a TCP binary NIO client:

// the address of the server to connect to
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a client
val client: Process[Task, Exchange[Bytes, Bytes]] = nio.connect(address)

client map { ex: Exchange[Bytes, Bytes] =>
  // read data sent by server in ex.read
  ???
  // write data to the server with ex.write
  ???
}

Plug your own data source on Exchange

To plug your own data source to write to the server, Scalaz-Stream provides one more API:

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
  /**
   * Runs supplied Process of `W` values by sending them to remote system.
   * Any replies from remote system are received as `I` values of the resulting process.
   */
  def run(p:Process[Task,W]): Process[Task,I]

  // the W are sent to the server and we retrieve only the received data
}

With this API, we can feed data to the client (which sends it to the server) and output the received data.

// some data to be sent by client
val data: Process[Task, W] = ...

// send data and retrieve only responses received by client
val output: Process[Task, I] = client flatMap { ex =>
  ex.run(data)
}

val receivedData: Seq[Bytes] = output.runLog.run

Yet, in general, we need to:

  • send custom data to the server
  • expect its response
  • do some business logic
  • send more data
  • etc…

So we need to be able to gather in the same piece of code received & emitted data.


Managing client/server business logic with Wye

Scalaz-stream can help us with the following API:

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
...
  /**
   * Transform this Exchange to another Exchange where queueing, and transformation of this `I` and `W`
   * is controlled by supplied WyeW.
   */
  def wye(w: Wye[Task, I, W2, W \/ I2]): Exchange[I2, W2]
...

// It transforms the original Exchange[I, W] into a Exchange[I2, W2]

}

Whoaaaaa complex isn’t it? Actually not so much…

Wye is a fantastic tool that can:

  • read from a left and/or right branch (in a non-deterministic way: left or right or left+right),
  • perform computation on left/right received data,
  • emit data in output (see the tiny sketch right after this list).
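
For instance, here is a minimal sketch (assuming a 0.3.x-era scalaz-stream with the usual imports, nothing specific to this article) using one of the predefined Wyes, wye.merge, to combine two sources non-deterministically:

import scalaz.concurrent.Task
import scalaz.stream._

// two simple sources
val left : Process[Task, String] = Process.range(0, 5).map(i => s"L$i").toSource
val right: Process[Task, String] = Process.range(0, 5).map(i => s"R$i").toSource

// wye.merge reads from whichever side is ready first and emits its element
val merged: Process[Task, String] = (left wye right)(wye.merge)

// run and gather everything (ordering is non-deterministic)
val out = merged.runLog.run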

I love ASCII art schema:

> Wye[Task, I, I2, W]

    I(left)       I2(right)
          v       v
          |       |
          ---------
              |
 ---------------------------
|    Wye[Task, I, I2, W]    |
 ---------------------------
              |
              v
              W

\/ is the Scalaz disjunction, also called `Either` in the Scala world.

So Wye[Task, I, W2, W \/ I2] can be seen as:

> Wye[Task, I, W2, W \/ I2]

          I       W2
          v       v
          |       |
          ---------
              |
 ---------------------------
| Wye[Task, I, W2, W \/ I2] |
 ---------------------------
              |
          ---------
          |       |
          v       v
          W       I2

So what does this Exchange.wye API do?

  • It plugs the original Exchange.write: Sink[Task, W] to the W output of the Wye[Task, I, W2, W \/ I2] for sending data to the server.
  • It plugs the Exchange.read: Process[Task, I] receiving data from the server to the left input of the Wye[Task, I, W2, W \/ I2].
  • The right input W2 branch provides a plug for an external source of data in the shape of Process[Task, W2].
  • The right output I2 can be used to pipe data from the client to an external local process (like streaming out data received from the server).
  • Finally it returns an Exchange[I2, W2].

In summary:

> (ex:Exchange[I, W]).wye( w:Wye[Task, I, W2, W \/ I2] )

        ex.read
          |
          v
          I       W2
          v       v
          |       |
          ---------
              |
 -----------------------------
| w:Wye[Task, I, W2, W \/ I2] |
 -----------------------------
              |
          ---------
          |       |
          v       v
          W       I2
          |
          v
      ex.write

======> Returns Exchange[I2, W2]

As a conclusion, Exchange.wye combines the original Exchange[I, W] with your custom Wye[Task, I, W2, W \/ I2], which represents the business logic of data exchange between client & server, and finally returns an Exchange[I2, W2] on which you can plug your own data source and retrieve output data.


Implement the client with wye/run

// The source of data to send to server
val data2Send: Process[Task, Bytes] = ...

// The logic of your exchange between client & server
val clientWye: Wye[Task, Bytes, Bytes, Bytes \/ Bytes] = ...
// Scary, there are so many Bytes

val clientReceived: Process[Task, Bytes] = for {
  // client connects to the server & returns Exchange
  ex   <- nio.connect(address)

  // Exchange is customized with clientWye
  // Data are injected in it with run
  output <- ex.wye(clientWye).run(data2Send)
} yield (output)

Implement simple client/server business logic?

Please note, I simply reuse the basic echo example provided in scalaz-stream ;)

def clientEcho(address: InetSocketAddress, data: Bytes): Process[Task, Bytes] = {

  // the custom Wye managing business logic
  def echoLogic: WyeW[Bytes, Bytes, Bytes, Bytes] = {

    def go(collected: Int): WyeW[Bytes, Bytes, Bytes, Bytes] = {
      // Create a Wye that can receive on both sides
      receiveBoth {
        // Receive on left == receive from server
        case ReceiveL(rcvd) =>
          // `emitO` outputs on `I2` branch and then...
          emitO(rcvd) fby
            // if we have received everything sent, halt
            (if (collected + rcvd.size >= data.size) halt
            // else go on collecting
            else go(collected + rcvd.size))

        // Receive on right == receive on `W2` branch == your external data source
        case ReceiveR(data) =>
          // `emitW` outputs on `W` branch == sending to server
          // and loops
          emitW(data) fby go(collected)

        // When server closes
        case HaltL(rsn)     => Halt(rsn)
        // When client closes, we go on collecting echoes
        case HaltR(_)       => go(collected)
      }
    }

    // Init
    go(0)
  }

  // Finally wiring all...
  for {
    ex   <- nio.connect(address)
    rslt <- ex.wye(echoLogic).run(emit(data))
  } yield {
    rslt
  }
}

This might seem hard to grasp for some people because of scalaz-stream notations and wye Left/Right/Both or wye.emitO/emitW. But actually, you’ll get used to it quite quickly as soon as you understand wye. Keep in mind that this code uses the low-level scalaz-stream API without anything else, and it remains pretty simple and straightforward.


Run the client for its output

// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// consumes all received data... (it should contain dataArray)
val result = clientOutput.runLog.run

println("Client received:"+result)

It would give something like:

Client received:Vector(Bytes1: pos=0, length=1024, src: (-12,28,55,-124,3,-54,-53,66,-115,17...)

Now, you know about scalaz-stream clients, what about servers???



Scalaz-stream NIO Server

Let’s start again :D

What is a server?

  • Something listening for client(s) connection
  • When a client is connected, the server can:
    • Receive data I (for Input) from the client
    • Send data W (for Write) to the client
  • A server can manage multiple clients in parallel

Server seen as Process

Remember that a client was defined above as:

Client === Process[Task, Exchange[I, W]]

In our NIO, non-blocking, streaming world, a server can be considered as a stream of clients, right?

So finally, we can model a server as:

Server === Process[Task, Client[I, W]]
       === Process[Task, Process[Task, Exchange[I, W]]]

Whoooohoooo, a server is just a stream of streams!!!!


Writing a server

Scalaz-Stream now provides a helper to create a TCP binary NIO server:

// the address of the server
val address: InetSocketAddress = new InetSocketAddress("xxx.yyy.zzz.ttt", port)

// create a server
val server: Process[Task, Process[Task, Exchange[Bytes, Bytes]]] =
  nio.server(address)

server map { client =>
  // for each client
  client flatMap { ex: Exchange[Bytes, Bytes] =>
    // read data sent by client in ex.read
    ???
    // write data to the client with ex.write
    ???
  }
}

Don’t you find that quite elegant? ;)


Managing client/server interaction business logic

Here we simply re-use the Exchange described above, so you can use exactly the same APIs as for the client. Here is another API that can be useful:

type Writer1[W, I, I2] = Process1[I, W \/ I2]

case class Exchange[I, W](read: Process[Task, I], write: Sink[Task, W]) {
...
  /**
   * Transforms this exchange to another exchange, that for every received `I` will consult supplied Writer1
   * and eventually transforms `I` to `I2` or to `W` that is sent to remote system.
   */
  def readThrough[I2](w: Writer1[W, I, I2])(implicit S: Strategy = Strategy.DefaultStrategy) : Exchange[I2,W]
...
}

// A small schema?
            ex.read
              |
              v
              I
              |
 ---------------------------
|    Writer1[W, I, I2]      |
 ---------------------------
              |
          ---------
          |       |
          v       v
          W       I2
          |
          v
       ex.write

======> Returns Exchange[I2, W]

With this API, you can compute some business logic on the data received from the client.

Let’s write the echo server corresponding to the previous client (you can find this sample in scalaz-stream too):

def serverEcho(address: InetSocketAddress): Process[Task, Process[Task, Bytes]] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loops (fby = followed by)
      emitSeq( Seq(\/-(i), -\/(i)) ) fby echoAll
    }

  // The server that echoes everything back to each client
  val receivedData: Process[Task, Process[Task, Bytes]] =
    for {
      client <- nio.server(address)
    } yield {
      // for each connected client, run the echo logic through its Exchange
      client flatMap { ex => ex.readThrough(echoAll).run() }
    }

  receivedData
}

receivedData is Process[Task, Process[Task, Bytes]] which is not so practical: we would prefer to gather all data received by clients in 1 single Process[Task, Bytes] to stream it to another module.

Scalaz-Stream has the solution again:

package object merge {
  /**
   * Merges non-deterministically processes that are output of the `source` process.
   */
  def mergeN[A](source: Process[Task, Process[Task, A]])
    (implicit S: Strategy = Strategy.DefaultStrategy): Process[Task, A]
}

Please note the Strategy, which determines how the Tasks will be executed and can be compared to a Scala ExecutionContext.
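
For instance, here is a minimal sketch (assuming scalaz 7.0.x on the classpath, and reusing the receivedData value and imports from the server example above) of providing your own Strategy backed by a fixed thread pool:

import java.util.concurrent.Executors
import scalaz.concurrent.Strategy

// an explicit Strategy backed by a fixed thread pool instead of the default one
implicit val S: Strategy = Strategy.Executor(Executors.newFixedThreadPool(4))

// with this implicit in scope, mergeN runs the merged client streams on this pool
val allClients: Process[Task, Bytes] = merge.mergeN(receivedData)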

Fantastic, let’s plug it on our server:

// The server that echoes everything
def serverEcho(address: InetSocketAddress): Process[Task, Bytes] = {

  // This is a Writer1 that echoes everything it receives to the client and emits it locally
  def echoAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      // echoes on left, emits on right and then loops (fby = followed by)
      emitSeq( Seq(\/-(i), -\/(i)) ) fby echoAll
    }

  // The stream of per-client streams
  val receivedData: Process[Task, Process[Task, Bytes]] =
    for {
      client <- nio.server(address)
    } yield {
      // for each connected client, run the echo logic through its Exchange
      client flatMap { ex => ex.readThrough(echoAll).run() }
    }

  // Merges all client streams into one
  merge.mergeN(receivedData)
}

Finally, we have a server and a client!!!!!

Let’s plug them all together

Run a server

First of all, we need to create a server that can be stopped when required.

Let’s do it the scalaz-stream way using:

  • wye.interrupt :
/**
   * Let through the right branch as long as the left branch is `false`,
   * listening asynchronously for the left branch to become `true`.
   * This halts as soon as the right branch halts.
   */
  def interrupt[I]: Wye[Boolean, I, I]
  • async.signal which is a value that can be changed asynchronously based on 2 APIs:
/**
   * Sets the value of this `Signal`. 
   */
  def set(a: A): Task[Unit]

  /**
   * Returns the discrete version of this signal, updated only when `value`
   * is changed ...
   */
  def discrete: Process[Task, A]

Without much imagination, we can use Signal[Boolean].discrete to obtain a Process[Task, Boolean] and wye it with the previous server process using wye.interrupt. Then, to stop the server, you just have to call:

signal.set(true).run

Here is the full code:

// local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync( _ => ())

// DO OTHER THINGS

// stop server
stop.set(true).run

Run server & client in the same code

// local bind address
val addr = localAddress(12345)

// the stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye serverEcho(addr))(wye.interrupt)

// Run server in async without taking care of output data
stoppableServer.runLog.runAsync( _ => ())

// create a client that sends 1024 random bytes
val dataArray = Array.fill[Byte](1024)(1)
scala.util.Random.nextBytes(dataArray)
val clientOutput = clientEcho(addr, Bytes.of(dataArray))

// Consume all received data in a blocking way...
val result = clientOutput.runLog.run

// stop server
stop.set(true).run

Naturally you rarely run the client & server in the same code, but it is funny to see how easily you can do that with scalaz-stream as you just manipulate Processes run on a provided Strategy.


Finally, we can go back to our subject: feeding a DStream using a scalaz-stream NIO client/server

Pipe server output to DStream

clientEcho/serverEcho are simple samples but not very useful.

Now we are going to use a custom client/server I’ve written for this article:

  • NioClient.sendAndCheckSize is a client streaming all emitted data of a Process[Task, Bytes] to the server and checking that the global size has been ack’ed by the server.
  • NioServer.ackSize is a server acknowledging all received packets by their size (as a 4-byte Int); a hedged sketch of what it might look like follows this list.
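
To make the server side concrete, here is a hedged sketch of what NioServer.ackSize could look like, built on the serverEcho pattern above (the real implementation lives in the article’s repo and may well differ; only the “4-byte size acknowledgement” behaviour is taken from the description):

import java.net.InetSocketAddress
import java.nio.ByteBuffer

import scalaz.{\/, \/-, -\/}
import scalaz.concurrent.Task
import scalaz.stream._
import scalaz.stream.Process._

def ackSize(address: InetSocketAddress): Process[Task, Bytes] = {

  // acknowledge each received chunk by sending its size back as a 4-byte Int,
  // and emit the received chunk locally
  def ackAll: Writer1[Bytes, Bytes, Bytes] =
    receive1[Bytes, Bytes \/ Bytes] { i =>
      val ack = Bytes.of(ByteBuffer.allocate(4).putInt(i.size).array)
      emitSeq( Seq(\/-(i), -\/(ack)) ) fby ackAll
    }

  // one stream per client, all merged into a single stream of received Bytes
  merge.mergeN(for {
    client <- nio.server(address)
  } yield client flatMap { ex => ex.readThrough(ackAll).run() })
}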

Now let’s write a client/server dstreamizing data to Spark:

// First create a streaming context
val ssc = new StreamingContext(clusterUrl, "SparkStreamStuff", Seconds(1))

// Local bind address
val addr = localAddress(12345)

// The stop signal initialized to false
val stop = async.signal[Boolean]
stop.set(false).run

// Create the server controlled by the previous signal
val stoppableServer = (stop.discrete wye NioServer.ackSize(addr))(wye.interrupt)

// Create a client that sends a natural integer every 50ms as a string (until reaching 100)
val clientData: Process[Task, Bytes] = naturalsEvery(50 milliseconds).take(100).map(i => Bytes.of(i.toString.getBytes))
val clientOutput = NioClient.sendAndCheckSize(addr, clientData)

// Dstreamize the server into the streaming context
val (consumer, dstream) = dstreamize(stoppableServer, ssc)

// Prepare dstream output
dstream.map( bytes => new String(bytes.toArray) ).print()

// Start the streaming context
ssc.start()

// Run the server just for its effects
consumer.run.runAsync( _ => () )

// Run the client in a blocking way
clientOutput.runLog.run

// Await SSC termination a bit
ssc.awaitTermination(1000)

// stop server
stop.set(true).run

// stop the streaming context
ssc.stop()

When run, it prints:

-------------------------------------------
Time: 1395049304000 ms
-------------------------------------------

-------------------------------------------
Time: 1395049305000 ms
-------------------------------------------
0
1
2
3
4
5
6
7
8
9
...

-------------------------------------------
Time: 1395049306000 ms
-------------------------------------------
20
21
22
23
24
25
26
27
28
29
...

Until 100…



Part2’s conclusion

I spent this second part of my triptych mainly explaining a few concepts of the brand new scalaz-stream NIO API. With it, a client becomes just a stream of exchanges Process[Task, Exchange[I, W]] and a server becomes a stream of streams of exchanges Process[Task, Process[Task, Exchange[I, W]]].

As soon as you manipulate Process, you can then use the dstreamize API exposed in Part 1 to pipe streamed data into Spark.

Let’s go to Part 3 now in which we’re going to do some fancy Machine Learning training with these new tools.



GO TO PART1 <—> GO TO PART3







[Part 1/3] From Scalaz-Stream Process to Spark DStream



Reminders on Process[Task, T]

Scalaz-stream Process[Task, T] is a stream of T elements that can interleave Tasks (each representing some external effect doing something). Process[Task, T] is built as a state machine that you need to run to process all Task effects and emit a stream of T. It can manage continuous or discrete, finite or infinite streams.

I restricted it to Task for the purpose of this article but it can be any F[_].
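
As a quick illustration, here is a minimal sketch (plain scalaz-stream usage, nothing specific to this article) of a Process interleaving Task effects; nothing happens until it is run:

import scalaz.concurrent.Task
import scalaz.stream.Process

// a Process interleaving a Task effect (reading the clock) with emitted values
val p: Process[Task, Long] =
  Process.eval(Task.delay(System.nanoTime)).repeat.take(3)

// p is just a description (a state machine)...
// ...until we run it and gather the emitted values (blocking here)
val times: IndexedSeq[Long] = p.runLog.run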


Reminders on DStream[T]

Spark DStream[T] is a stream of RDD[T] built by discretizing a continuous stream of T. RDD[T] is a resilient distributed dataset which is the ground data-structure behind Spark for distributing in-memory batch/map/reduce operations to a cluster of nodes with fault-tolerance & persistence.

In summary, DStream slices a continuous stream of T by windows of time and gathers all Ts in the same window into one RDD[T]. So it discretizes the continuous stream into a stream of RDD[T]. Once built, those RDD[T]s are distributed to the Spark cluster. Spark allows you to perform transform/union/map/reduce/… operations on RDD[T]s. Therefore DStream[T] takes advantage of the same operations.

Spark-Streaming also persists all operations & relations between DStreams in a graph. Thus, in case of fault in a remote node while performing operations on DStreams, the whole transformation can be replayed (it also means streamed data are also persisted).

Finally, the resulting DStream obtained after map/reduce operations can be output to a file, a console, a DB etc…

Please note that DStream[T] is built with respect to a StreamingContext which manages its distribution in the Spark cluster and all operations performed on it. Moreover, DStream map/reduce operations & output must be scheduled before starting the StreamingContext. It could be somewhat compared to a state machine that you build statically and run later.
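
As a quick reminder of plain Spark Streaming usage (no scalaz-stream involved; the socket source and parameters below are only illustrative):

import org.apache.spark.streaming.{Seconds, StreamingContext}

// a streaming context slicing time into 1-second batches
val ssc = new StreamingContext("local[2]", "DStreamReminder", Seconds(1))

// a DStream[String] fed by lines read from a TCP socket
val lines = ssc.socketTextStream("localhost", 9999)

// operations & outputs must be scheduled before starting the context
lines.count().print()

ssc.start()
ssc.awaitTermination()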


From Process[Task, T] to RDD[T]

You may ask why not simply build a RDD[T] from a Process[Task, T] ?

Yes sure we can do it:

// Initialize Spark Context
implicit val sc = new SparkContext(...)

// Build a process
val p: Process[Task, T] = ...

// Run the process using `runLog` to aggregate all results
// and build a RDD using spark context parallelization
val rdd = sc.parallelize(p.runLog.run)

This works, but what if this Process[Task, T] emits a huge quantity of data or is infinite? You’ll end up with an OutOfMemoryException.

So yes, you can do it, but it’s not so interesting. DStream seems more natural since it can manage a stream of data as long as it can discretize it over time.



From Process[Task, T] to DStream[T]


Pull from Process[Task, T], Push to DStream[T] with LocalInputDStream

To build a DStream[T] from a Process[Task, T], the idea is to:

  • Consume/pull the T emitted by Process[Task, T],
  • Gather emitted T during a window of time & generate a RDD[T] with them,
  • Inject RDD[T] into the DStream[T],
  • Go to next window of time…

The Spark-Streaming library provides different helpers to create a DStream from different sources of data like files (local/HDFS), sockets…

The helper that seemed the most appropriate is the NetworkInputDStream:

  • It provides a NetworkReceiver based on an Akka actor to which we can push streamed data.
  • NetworkReceiver gathers streamed data over windows of time and builds a BlockRDD[T] for each window.
  • Each BlockRDD[T] is registered to the global Spark BlockManager (responsible for data persistence).
  • BlockRDD[T] is injected into the DStream[T].

So basically, NetworkInputDStream builds a stream of BlockRDD[T]. It’s important to note that NetworkReceiver is also meant to be sent to remote workers so that data can be gathered on several nodes at the same time.

But in my case, the data source Process[Task, T] runs on the Spark driver node (at least for now), so instead of NetworkInputDStream, a LocalInputDStream would be better. It would provide a LocalReceiver based on an actor to which we can push the data emitted by the process in an async way.

LocalInputDStream doesn’t exist in the Spark-Streaming library (or I haven’t looked hard enough) so I’ve implemented it as I needed it. It does exactly the same as NetworkInputDStream without the remoting aspect. The current code is there


Process vs DStream ?

There is a common point between DStream and Process: both are built as state machines that are passive until run.

  • In the case of Process, it is run by playing all the Task effects, either gathering the emitted values or ignoring them, in blocking or non-blocking mode, etc… (a tiny sketch of these run modes follows this list).

  • In the case of DStream, it is built and registered in the context of a SparkStreamingContext. Then you must also declare some outputs for the DStream like a simple print, an HDFS file output, etc… Finally you start the SparkStreamingContext which manages everything for you until you stop it.
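
For example, a minimal sketch of the usual scalaz-stream runners (the same runLog.run / run.run / run.runAsync calls used throughout this article):

import scalaz.concurrent.Task
import scalaz.stream.Process

val p: Process[Task, Int] = Process.range(0, 10).toSource

// run and gather all emitted values (blocking)
val all: IndexedSeq[Int] = p.runLog.run

// run only for the Task effects, discarding emitted values (blocking)
p.run.run

// run only for the effects, non-blocking
p.run.runAsync { _ => () }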

So if we want to adapt a Process[Task, T] to a DStream[T], we must perform 4 steps (on the Spark driver node):

  • build a DStream[T] using LocalInputDStream[T] providing a Receiver in which we’ll be able to push asynchronously T.
  • build a custom scalaz-stream Sink[Task, T] in charge of consuming all emitted data from the Process[Task, T] and pushing it using the previous Receiver.
  • pipe the Process[Task, T] to this Sink[Task, T] & when the Process[Task, T] has halted, stop the previous DStream[T]: the result of this pipe operation is a Process[Task, Unit], a pure effectful process responsible for pushing T into the dstream without emitting anything.
  • return the previous DStream[T] and the effectful consumer Process[Task, Unit].

dstreamize implementation

def dstreamize[T : ClassTag](
  p: Process[Task, T],
  ssc: StreamingContext,
  storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): (Process[Task, Unit], ZparkInputDStream[T]) = {

  // Build a custom LocalInputDStream
  val dstream = new ZparkInputDStream[T](ssc, storageLevel)

  // Build a Sink pushing into dstream receiver
  val sink = receiver2Sink[T](dstream.receiver.asInstanceOf[ZparkReceiver[T]])

  // Finally pipe the process to the sink and when finished, closes the dstream
  val consumer: Process[Task, Unit] =
    (p to sink)
    // when finished, it closes the dstream
    .append ( eval(Task.delay{ dstream.stop() }) )
    // when error, it closes the dstream
    .handle { case e: Exception =>
      println("Stopping on error "+e.getMessage)
      e.printStackTrace()
      eval(Task.delay{ dstream.stop() })
    }

  // Return the effectful consumer sink and the DStream
  (consumer, dstream)
}

Please remark that this builds a Process[Task, Unit] and a DStream[T] but nothing has happened yet in terms of data consumption & streaming. Both need to be run now.
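
By the way, receiver2Sink isn’t shown above. Here is a hedged sketch of what it could look like (ZparkReceiver comes from the article’s companion code, and the pushOne method name below is purely hypothetical, standing for “hand one element to the Spark receiver”):

import scalaz.concurrent.Task
import scalaz.stream.{Process, Sink}

// a hedged sketch only: the real receiver2Sink lives in the project's repo
def receiver2Sink[T](receiver: ZparkReceiver[T]): Sink[Task, T] =
  Process.constant { (t: T) =>
    // push each consumed element into the Spark receiver (hypothetical method name)
    Task.delay { receiver.pushOne(t) }
  }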

Use it…

// First create a streaming context
val ssc = new StreamingContext(clusterUrl, "SparkStreamStuff", Seconds(1))

// Create a data source sample as a process generating a natural every 50ms 
// (take 1000 elements)
val p: Process[Task, Int] = naturalsEvery(50 milliseconds).take(1000)

// Dstreamize the process in the streaming context
val (consumer, dstream) = dstreamize(p, ssc)

// Prepare the dstream operations (count) & output (print)
dstream.count().print()

// Start the streaming context
ssc.start()

// Run the consumer for its effects (consuming p and pushing into dstream)
// Note this is blocking but it could be runAsync too
consumer.run.run

// await termination of stream with a timeout
ssc.awaitTermination(1000)

// stops the streaming context
ssc.stop()

Please note that you have to:

  • schedule your dstream operations/output before starting the streaming context.
  • start the streaming context before running the consumer.

Run it…

14/03/11 11:32:09 WARN util.Utils: Your hostname, localhost.paris.zenexity.com resolves to a loopback address: 127.0.0.1; using 10.0.24.228 instead (on interface en0)
14/03/11 11:32:09 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
-------------------------------------------
Time: 1394533933000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1394533934000 ms
-------------------------------------------
14

-------------------------------------------
Time: 1394533935000 ms
-------------------------------------------
20

-------------------------------------------
Time: 1394533936000 ms
-------------------------------------------
20

...

Ok cool, we can see a warmup phase at the beginning and then windows of 1 sec counting 20 elements, which is great since one element every 50ms gives 20 elements in 1 sec.



Part 1’s conclusion

Now we can pipe a Process[Task, T] into a DStream[T].

Please note that as we run the Process[Task, T] on the Spark driver node, if this node fails, there is no real way to restore lost data. Yet, LocalInputDStream relies on DStreamGraph & BlockRDDs which persist all DStream relations & all received blocks. Moreover, DStream has exactly the same problem with respect to the driver node for now.

That was fun but what can we do with that?

In part2, I propose to have more fun and stream data to DStream using the brand new Scalaz-Stream NIO API to create cool NIO client/server streams…



GO TO PART2





The code & sample apps can be found on Github

Today I’m going to write about a Proof of Concept I’ve been working on these last weeks: I wanted to use scalaz-stream as a driver of Spark distributed data processing. This is simply an idea and I don’t even know whether it is viable or stupid. But the idea is interesting!

Introduction

Two of my preferred topics these last months are:

  • Realtime streaming
  • Realtime clustered data processing (in-memory & fault-tolerant)

Two tools have kept running through my head these last months:

  • Scalaz-Stream for realtime/continuous streaming using pure functional concepts: I find it very interesting conceptually speaking & very powerful, especially the deterministic & non-deterministic demultiplexers provided out-of-the-box (Tee & Wye).

  • Spark for fast/fault-tolerant in-memory, resilient & clustered data processing.

I won’t speak much about Scalaz-Stream because I wrote a few articles about it.


Let’s focus on Spark.

Spark provides tooling for cluster processing of huge datasets in the same batch-mode way as Hadoop, the very well known map/reduce infrastructure. But unlike Hadoop, which relies exclusively on the HDFS cluster file system when distributing data through the cluster, Spark tries to cache data in memory as much as possible so that access latency is reduced as much as possible. Hadoop can scale a lot but is known to be slow in the context of a single node.

Spark is aimed at scaling as much as Hadoop but running faster on each node using in-memory caching. Fault-tolerance & data resilience are managed by Spark too, using persistence & redundancy based on any nice storage like HDFS or files or whatever you can plug into Spark. So Spark is meant to be a super fast in-memory, fault-tolerant batch processing engine.


RDD Resilient Distributed Dataset

The basic concept of Spark is the Resilient Distributed Dataset aka RDD, which is a read-only, immutable data structure representing a collection of objects or a dataset that can be distributed across a set of nodes in a cluster to perform map/reduce-style algorithms.

The dataset represented by this RDD is partitioned i.e. cut into slices called partitions that can be distributed across the cluster of nodes.

Resilient means this data can be rebuilt in case of a fault on a node or data loss. To achieve this, the dataset is replicated/persisted across nodes in memory or in a distributed file system such as HDFS.

So the idea of RDD is to provide a seamless structure to manage clustered datasets with a very simple, “monadic”-style API:

val sc = new SparkContext(
  "local[4]",
  "Simple App",
  "YOUR_SPARK_HOME",
  List("target/scala-2.10/simple-project_2.10-1.0.jar")
)

val logData = sc.textFile(logFile, 2).cache().filter(line => line.contains("a")).map( _ + "foo" ).count()

Depending on your SparkContext configuration, Spark takes charge of distributing your data behind the curtain to the cluster nodes to perform the required processing in a fully distributed way.

One thing to keep in mind is that Spark distributes data to remote nodes but it also distributes the code/closures remotely. So your code has to be serializable, which is not the case for scalaz-stream in its current implementation.


Just a word on Spark code

As usual, before using Spark in any big project, I’ve been diving into its code to know whether I can trust this project. I must say I know Spark’s code better than its API ;)

I find the Spark Scala implementation quite clean, with explicit design choices made clearly for the purpose of performance. The need to provide a compatible Java/Python API and to distribute code across clustered nodes involves a few restrictions in terms of implementation choices. Anyway, I won’t criticize much because I wouldn’t have written it better and those people clearly know what they’re doing!


Spark Streaming

So Spark is very good at performing fast clustered batch data processing. Yet, what if your dataset is built progressively, continuously, in realtime?

On top of the core module, Spark provides an extension called Spark Streaming aiming at manipulating live streams of data using the power of Spark.

Spark Streaming can ingest different continuous data feeds like Kafka, Flume, Twitter, ZeroMQ or TCP sockets and perform high-level operations on them such as map/reduce/groupby/window/…


DStream

The core data structure behind Spark Streams is DStream for Discretized Stream (and not distributed).

Discretized means it takes a continuous stream of data and makes it discrete by slicing it across time and wrapping the sliced data into the famous RDDs described above.

A DStream is just a temporal data partitioner that can distribute data slices across the cluster of nodes to perform some data processing using Spark capabilities.

Here is the illustration in official Spark Stream documentation:

[Figure: DStream discretization, from the official Spark Streaming documentation]

DStream also tries to bring Spark’s automated persistence/caching/fault-tolerance to the domain of live streaming.

DStream is cool but it’s completely based on temporal aspects. Imagine you want to slice the stream depending on other criteria: with DStream it would be quite hard because the whole API is based on time. Moreover, using DStream, you can discretize a dataflow but you can’t go the other way and make it continuous again (to my knowledge). That’s something that would be cool, isn’t it?

If you want to know more about DStream discretization mechanism, have a look at the official doc.


As usual, I’m trying to investigate the edge-cases of concepts I like. In general, this is where I can test the core design of the project and determine whether it’s worth investigating in my every-day life.


Driving Spark Streams with Scalaz-Stream

I’ve been thinking about scalaz-stream concepts quite a lot, and scalaz-stream is very good at manipulating continuous streams of data. Moreover, it can very easily partition a continuous stream, regrouping data into chunks based on any criteria you can imagine.

Scalaz-stream represents a data processing algorithm as a static state machine that you can run when you want. This is the same idea behind the Spark map/reduce API: you build your chain of map/filter/window and finally reduce it. Reducing a Spark data processing job is like running a scalaz-stream machine.
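
For instance, here is a minimal sketch (plain scalaz-stream, nothing Spark-specific) of regrouping a stream into fixed-size chunks with process1.chunk:

import scalaz.concurrent.Task
import scalaz.stream.{Process, process1}

// regroup a stream of naturals into chunks of 3 elements
val chunks: Process[Task, Vector[Int]] =
  Process.range(0, 10).toSource |> process1.chunk(3)

chunks.runLog.run
// expected: Vector(Vector(0, 1, 2), Vector(3, 4, 5), Vector(6, 7, 8), Vector(9))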

So my idea was the following:

  • build a continuous stream of data based on scalaz-stream Process[F, O]
  • discretize the stream Process[F, O] => Process[F, RDD[O]]
  • implement count/reduce/reduceBy/groupBy for Process[F, RDD[O]]
  • provide a continuize method to do Process[F, RDD[O]] => Process[F, O]

So I’ve been hacking between Scalaz-stream Process[F, O] & Spark RDD[O] and here is the resulting API that I’ve called ZPark-ZStream (ZzzzzzPark-Zzzzztream).

Let’s play a bit with my little alpha API.


Discretization by simple slicing

Let’s start with a very simple example.

Take a simple finite process containing integers:

val p: Process[Task, Long] = Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L, 5L, 5L, 6L, 6L)

Now I want to slice this stream of integers into slices of 4 elements.

First we have to create the classic Spark Streaming context and make it implicit (needed by my API).

Please remark that I could plug an existing StreamingContext into my code without any problem.

val clusterUrl = "local[4]"
implicit val ssc = new StreamingContext(clusterUrl, "SparkSerial", Seconds(1))

Then let’s parallelize the previous process:

val prdd: Process[Task, RDD[Long]] = p.parallelize(4)
// type is just there to show what scalac will infer
// Just to remind that Task is the Future equivalent in Scalaz

Ok folks, now, we have a discretized stream of Long that can be distributed across a Spark cluster.

DStream provides a count API which counts elements in each RDD of the stream.

Let’s do the same with my API:

val pcount: Process[Task, RDD[Int]] = prdd.countRDD()

What happens here? The `count` operation on each RDD in the stream is distributed across the cluster in a map/reduce style and the results are gathered.

Ok that’s cool but you still have a discretized stream Process[Task, RDD[Int]] and that’s not practical to use to see what’s inside it. So now we are going to re-continuize it and make it a Process[Task, Int] again.

val pfinal: Process[Task, Int] = pcount.continuize()

Easy isn’t it?

All together:

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .countRDD()
  .continuize()

Let’s print the result in the console:

def stdOutLines[I]: Sink[Task, I] =
  Process.constant{ (s: I) => Task.delay { println(s" ----> [${System.nanoTime}] *** $s") }}

(p through stdOutLines).run.run
// 1 run for the process & 1 run for the Task

 ----> [1392418478569989000] *** 4
 ----> [1392418478593226000] *** 4

Oh yes that works: in each slice of 4 elements, we actually have 4 elements! Reassuring ;)

Let’s do the same with countByValue:

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .countRDDByValue()
  .continuize()

(p through stdOutLines).run.run
// 1 run for the process & 1 run for the Task

 ----> [1392418552751011000] *** (1,2)
 ----> [1392418552751176000] *** (2,2)
 ----> [1392418552770527000] *** (4,2)
 ----> [1392418552770640000] *** (3,2)

You can see that 4 comes before 3. This is due to the fact that the 2nd slice of 4 elements (3,3,4,4) is converted into an RDD which is then partitioned and distributed across the cluster to perform the map/reduce count operation. So the order of the results might be different at the end.

An example of map/reduce?

val p =
  Process(1L, 1L, 2L, 2L, 3L, 3L, 4L, 4L)
  .parallelize(4)
  .mapRDD(_ + 1L)
  .reduceRDD(_ + _)
  .continuize()

(p through stdOutLines).run.run
 ----> [1392418619885745000] *** 10 (2+2+3+3)
 ----> [1392418619905817000] *** 18 (4+4+5+5)

Please note that:

p mapRDD f === p.map{ rdd => rdd map f }

Discretization by time slicing

Now we could try to slice according to time, in the same way as DStream does.

First of all, let’s define a continuous stream of positive integers:

def naturals: Process[Task, Int] = {
  def go(i: Int): Process[Task, Int] =
    Process.await(Task.delay(i)){ i => Process.emit(i) ++ go(i+1) }

  go(0)
}

Now, I want integers to be emitted at a given tick for example:

def naturalsEvery(duration: Duration): Process[Task, Int] =
  (naturals zipWith Process.awakeEvery(duration)){ (i, b) => i }

Then, let’s discretize the continuous stream with ZPark-Ztream API:

1
2
val p: Process[Task, RDD[Int]] =
  naturalsEvery(10 milliseconds).discretize(500 milliseconds)

The stream is sliced into slices of 500ms and all elements emitted during each 500ms window are gathered in a Spark RDD.

On this stream of RDDs, we can apply countRDD as before and finally re-continuize it. All together we obtain:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
val p =
  naturalsEvery(10 milliseconds)
  .take(5000)  // takes only 5000 because an infinite stream is hard to log in an article
  .discretize(500 milliseconds)
  .countRDD()
  .continuize()

(p through stdOutLines).run.run

 ----> [1392395213389954000] *** 47
 ----> [1392395213705505000] *** 28
 ----> [1392395214191637000] *** 47
 ----> [1392395214688724000] *** 48
 ----> [1392395215189453000] *** 45
 ----> [1392395215697655000] *** 48
 ----> [1392395240677357000] *** 50
 ----> [1392395241175632000] *** 49
 ----> [1392395241674446000] *** 50
 ----> [1392395242175416000] *** 50
 ----> [1392395242675183000] *** 50
 ----> [1392395243177056000] *** 50
 ----> [1392395243676848000] *** 49
 ----> [1392395244175938000] *** 49
 ----> [1392395244676315000] *** 50
 ----> [1392395245175042000] *** 50
 ----> [1392395245677394000] *** 50
 ...

Approximately, we have 50 elements per slice, which is what we expected.

Please note that there is a short warm-up period during which the values are less homogeneous.


Discretization by time slicing keeping track of time

DStream keeps track of all created RDD slices of data (following the Spark philosophy of caching as much as possible) and allows windowing operations to redistribute RDDs.
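
For comparison, the DStream windowing counterpart would look roughly like this (again a sketch with a hypothetical socket source):

import org.apache.spark.streaming.{Seconds, StreamingContext}

val sscWin = new StreamingContext("local[4]", "DStreamWindow", Seconds(1))
val input  = sscWin.socketTextStream("localhost", 9999)
// 2s windows sliding every 1s, counting elements per window
input.window(Seconds(2), Seconds(1)).count().print()
sscWin.start()
sscWin.awaitTermination()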

With ZPark API, you can write the same as following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val p =
  naturalsEvery(10 milliseconds)
  .take(500)
  .discretizeKeepTime(500 milliseconds)
  .windowRDD(1000 milliseconds)
  .map { case (time, rdd) =>
    (time, rdd.count())
  }

(p through stdOutLines).run.run

 ----> [1392397573066484000] *** (1392397571981061000,68)
 ----> [1392397574069315000] *** (1392397572981063000,85)
 ----> [1392397575058895000] *** (1392397573981072000,87)
 ----> [1392397576059640000] *** (1392397574981078000,89)
 ----> [1392397577069518000] *** (1392397575981086000,89)
 ----> [1392397577538941000] *** (1392397576981095000,82)

We can see here that the final intervals don't contain 100 elements each as we could expect. This is still a mystery to me and I must investigate a bit more to find out where this difference comes from. I have a few ideas but I need to validate them.

Anyway, globally we get 500 elements meaning we haven’t lost anything.


Mixing scalaz-stream IO & Spark streaming

Playing with naturals is funny but let’s work with a real source of data like a file.

It could be anything pluggable into scalaz-stream (Kafka, Flume, whatever), just like the sources DStream provides…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
val p =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => line.toDouble)
    .discretize(100 milliseconds)
    .mapRDD { x => (x, 1L) }
    .groupByKey()
    .mapRDD { case (k, v) => (k, v.size) }
    .continuize()

(p through stdOutLines).run.run

 ----> [1392398529009755000] *** (18.0,23)
 ----> [1392398529010064000] *** (19.0,22)
 ----> [1392398529010301000] *** (78.0,22)
 ----> [1392398529010501000] *** (55.3,22)
 ----> [1392398529010700000] *** (66.0,22)
 ----> [1392398529010892000] *** (64.0,22)
...

Infusing tee with RDD Processes

Is it possible to combine RDD Processes using scalaz-stream ?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
val p0 = naturalsEvery(100 milliseconds).take(50).discretize(250 milliseconds)
val p1 = naturalsEvery(100 milliseconds).take(50).discretize(250 milliseconds)
val p =
 (p0 zipWith p1){ (a,b) =>
   new org.apache.spark.rdd.UnionRDD(ssc.sparkContext, Seq(a,b))
 }.countRDDByValue()
  .continuize()

(p through stdOutLines).run.run

 ----> [1392412464151650000] *** (0,2)
 ----> [1392412464151819000] *** (1,2)
 ----> [1392412464230343000] *** (2,2)
 ----> [1392412464230528000] *** (3,1)
 ----> [1392412464477775000] *** (4,2)
 ----> [1392412464477921000] *** (5,2)
 ----> [1392412464478034000] *** (6,2)
 ----> [1392412464478143000] *** (3,1)
 ----> [1392412464726860000] *** (8,2)
 ----> [1392412464727039000] *** (7,2)
 ----> [1392412464975370000] *** (9,2)
 ----> [1392412464975511000] *** (10,2)
 ----> [1392412464975620000] *** (11,2)
 ----> [1392412465224087000] *** (12,2)
 ----> [1392412465224227000] *** (13,2)
 etc...

Please note that the Scalaz-Stream driving the Spark RDD stream always remains on the driver node and is never shipped to remote nodes, unlike map/reduce closures in Spark. So Scalaz-Stream is used as a stream driver in this case. Moreover, a Scalaz Process isn't serializable in its current implementation, so shipping it across the cluster wouldn't be possible as is.



What about persistence & fault tolerance?

After discretizing a process, you can persist each RDD :

1
p.discretize(250 milliseconds).mapRDD { _.persist() }

Ok, but DStream does much more: it tries to keep every generated RDD in memory and potentially persists it across the cluster. This makes things stateful & mutable, which is not the approach of a pure functional API like scalaz-stream. So I need to think a bit more about this persistence topic, which is huge.

Anyway, I believe I'm currently investigating another way of manipulating distributed streams than the one DStream proposes.



Conclusion

Spark is quite amazing and easy to use with respect to the complexity of the subject.

I was also surprised to be able to use it with scalaz-stream so easily.

I hope you liked the idea and I encourage you to think about it. If you find it cool, please say so! And if you find it stupid, please say so too: this is still a pure experiment ;)

Have a look at the code on Github.

Have distributed & resilient yet continuous fun!





The code & sample apps can be found on Github

After 5 months spent studying theories deeper & deeper in my free time and preparing 3 talks for scala.io & ping-conf with my friend Julien Tournay aka @skaalf, I'm back to blogging and I've got a few more article ideas to come…

If you’re interested in those talks, you can find pingconf videos here:

Let’s go back to today’s subject: the incoming Play2.3/Scala generic validation API & more.

Julien Tournay aka @skaalf has been working a lot for a few months developing this new API and has just published an article previewing Play 2.3 generic validation API.

This new API is just the logical extension of play2/Scala Json API (that I’ve been working & promoting those 2 last years) pushing its principles far further by allowing validation on any data types.

This new API is a real step further as it will progressively propose a common API for all validations in Play2/Scala (Form/Json/XML/…). It proposes an even more robust design relying on very strong theoretical ground making it very reliable & typesafe.

Julien has written his article presenting the new API basics and he also found time to write great documentation for this new validation API. I must confess Json API doc was quite messy but I’ve never found freetime (and courage) to do better. So I’m not going to spend time on basic features of this new API and I’m going to target advanced features to open your minds about the power of this new API.

Let’s have fun with this new API & Shapeless, this fantastic tool for higher-rank polymorphism & type-safety!


Warm-up with Higher-kind Zipping of Rules

A really cool & new feature of Play2.3 generic validation API is its ability to compose validation Rules in chains like:

1
2
3
4
val rule1: Rule[A, B] = ...
val rule2: Rule[B, C] = ...

val rule3: Rule[A, C] = rule1 compose rule2

In Play2.1 Json API, you couldn’t do that (you could only map on Reads).
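
For instance, chaining a Json extraction with a String constraint (a small sketch assuming the Rules imports used in the samples below):

import play.api.libs.json.JsValue
import play.api.data.mapping._
import play.api.data.mapping.json.Rules._

// Rule[JsValue, String] composed with Rule[String, String]
val foo: Rule[JsValue, String] = From[JsValue] { __ => (__ \ "foo").read[String] }
val fooNotEmpty: Rule[JsValue, String] = foo compose notEmpty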

Moreover, with new validation API, as in Json API, you can use macros to create basic validators from case-classes.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
case class FooBar(foo: String, bar: Int, foo2: Long)

val rule = Rule.gen[JsValue, FooBar]

/** Action to validate Json:
  * { foo: "toto", bar: 5, foo2: 2 }
  */
def action = Action(parse.json) { request =>
  rule.validate(request.body) map { foobar =>
    Ok(foobar.toString)
  } recoverTotal { errors =>
    BadRequest(errors.toString)
  }
}

Great, but sometimes not enough, as you would like to add custom validations on your class. For example, you want to verify:

  • foo isn’t empty
  • bar is >5
  • foo2 is <10

For that you can’t use the macro and must write your case-class Rule yourself.

1
2
3
4
5
6
7
8
9
10
11
12
case class FooBar(foo: String, bar: Int, foo2: Long)

import play.api.data.mapping.json.Rules
import Rules._

val rule = From[JsValue] { __ =>
  (
    (__ \ "foo").read[String](notEmpty) ~
    (__ \ "bar").read[Int](min(5)) ~
    (__ \ "foo2").read[Long](max(10))
  )(FooBar.apply _)
}

Please note the new From[JsValue]: if it were XML, it would be From[Xml]; genericity requires a bit more information.

Ok that’s not too hard but sometimes you would like to use first the macro and after those primary type validations, you want to refine with custom validations. Something like:

1
2
Rule.gen[JsValue, FooBar] +?+?+ ( (notEmpty:Rule[String, String]) +: (min(5):Rule[Int, Int]) +: (min(10L):Rule[Long,Long]) )
// +?+?+ is a non-existing operator meaning "compose"

As you may know, you can’t use +: from Scala’s Seq[T] here, as this list of Rules is heterogeneously typed and Rule[I, O] is invariant.

So we are going to use a Shapeless heterogeneous HList for that:

1
2
3
4
5
6
val customRules =
  (notEmpty:Rule[String, String]) ::
  (min(5):Rule[Int, Int]) ::
  (min(10L):Rule[Long,Long]) ::
  HNil
// customRules is inferred as Rule[String, String] :: Rule[Int, Int] :: Rule[Long, Long] :: HNil


How to compose Rule[JsValue, FooBar] with Rule[String, String] :: Rule[Int, Int] :: Rule[Long, Long]?


We need to convert Rule[JsValue, FooBar] to something like Rule[JsValue, T <: HList].

Based on Shapeless Generic[T], we can provide a nice little new conversion API .hlisted:

1
val rule: Rule[JsValue, String :: Int :: Long :: HNil] = Rule.gen[JsValue, FooBar].hlisted

Generic[T] is able to convert any Scala case class to/from a Shapeless HList (& Coproduct).

So we can validate a case class with the macro and get a Rule[JsValue, T <: HList] from it.
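
To give an intuition, here is a minimal sketch of how such an .hlisted conversion could be expressed with Generic (an assumption about the implementation; the real code is on Github):

import play.api.data.mapping.Rule
import shapeless._

object HListedSyntax {
  implicit class RuleHListed[I, O](val rule: Rule[I, O]) extends AnyVal {
    // maps the validated case class into its HList representation
    def hlisted[L <: HList](implicit gen: Generic.Aux[O, L]): Rule[I, L] =
      rule.map(gen.to)
  }
}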



How to compose Rule[JsValue, String :: Int :: Long :: HNil] with Rule[String, String] :: Rule[Int, Int] :: Rule[Long, Long]?


Again, using a Shapeless polymorphic function and an HList RightFolder, we can implement a function:

1
2
Rule[String, String] :: Rule[Int, Int] :: Rule[Long, Long] :: HNil =>
    Rule[String :: Int :: Long :: HNil, String :: Int :: Long :: HNil]

This looks like some higher-kind zip function, let’s call it HZIP.



Now, we can compose them…


1
2
3
4
5
6
val ruleZip = Rule.gen[JsValue, FooBar].hlisted compose hzip(
  (notEmpty:Rule[String, String]) ::
  (min(5):Rule[Int, Int]) ::
  (min(10L):Rule[Long,Long]) ::
  HNil
)

Finally, let’s wire all together in a Play action:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def hzipper = Action(parse.json) { request =>
  ruleZip.validate(request.body) map { foobar =>
    Ok(foobar.toString)
  } recoverTotal { errors =>
    BadRequest(errors.toString)
  }
}

// OK case
{
  "foo" : "toto",
  "bar" : 5,
  "foo2" : 5
} => toto :: 5 :: 8 :: HNil

// KO case
{
  "foo" : "",
  "bar" : 2,
  "foo2" : 12
} => Failure(List(
  ([0],List(ValidationError(error.max,WrappedArray(10)))),
  ([1],List(ValidationError(error.min,WrappedArray(5)))),
  ([2],List(ValidationError(error.required,WrappedArray())))
))

As you can see, the problem with this approach is that we lose the Json path information in the errors. Anyway, this can give you a few ideas! Now let’s do something really useful…



Higher-kind Fold of Rules to break the 22 limits

As in Play2.1 Json API, the new validation API provides an applicative builder which allows the following:

1
2
3
(Rule[I, A] ~ Rule[I, B] ~ Rule[I, C]).tupled => Rule[I, (A, B, C)]

(Rule[I, A] ~ Rule[I, B] ~ Rule[I, C])(MyClass.apply _) => Rule[I, MyClass]

But, in the Play2.1 Json API and also in the new validation API, all functional combinators are limited by the famous Scala 22 limit.

In Scala, you CAN’T write :

  • a case-class with >22 fields
  • a Tuple23

So you can’t do Rule[JsValue, A] ~ Rule[JsValue, B] ~ ... more than 22 times.

Nevertheless, sometimes you receive huge JSON with much more than 22 fields in it. Then you have to build more complex models like case-classes embedding case-classes… Shameful, isn’t it…

Let’s be shameless with Shapeless HList, which enables unlimited heterogeneously typed lists!

So, with HList, we can write :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
val bigRule =
  (__  \ "foo1").read[String] ::
  (__  \ "foo2").read[String] ::
  (__  \ "foo3").read[String] ::
  (__  \ "foo4").read[String] ::
  (__  \ "foo5").read[String] ::
  (__  \ "foo6").read[String] ::
  (__  \ "foo7").read[String] ::
  (__  \ "foo8").read[String] ::
  (__  \ "foo9").read[String] ::
  (__  \ "foo10").read[Int] ::
  (__  \ "foo11").read[Int] ::
  (__  \ "foo12").read[Int] ::
  (__  \ "foo13").read[Int] ::
  (__  \ "foo14").read[Int] ::
  (__  \ "foo15").read[Int] ::
  (__  \ "foo16").read[Int] ::
  (__  \ "foo17").read[Int] ::
  (__  \ "foo18").read[Int] ::
  (__  \ "foo19").read[Int] ::
  (__  \ "foo20").read[Boolean] ::
  (__  \ "foo21").read[Boolean] ::
  (__  \ "foo22").read[Boolean] ::
  (__  \ "foo23").read[Boolean] ::
  (__  \ "foo25").read[Boolean] ::
  (__  \ "foo26").read[Boolean] ::
  (__  \ "foo27").read[Boolean] ::
  (__  \ "foo28").read[Boolean] ::
  (__  \ "foo29").read[Boolean] ::
  (__  \ "foo30").read[Float] ::
  (__  \ "foo31").read[Float] ::
  (__  \ "foo32").read[Float] ::
  (__  \ "foo33").read[Float] ::
  (__  \ "foo34").read[Float] ::
  (__  \ "foo35").read[Float] ::
  (__  \ "foo36").read[Float] ::
  (__  \ "foo37").read[Float] ::
  (__  \ "foo38").read[Float] ::
  (__  \ "foo39").read[Float] ::
  (__  \ "foo40").read[List[Long]] ::
  (__  \ "foo41").read[List[Long]] ::
  (__  \ "foo42").read[List[Long]] ::
  (__  \ "foo43").read[List[Long]] ::
  (__  \ "foo44").read[List[Long]] ::
  (__  \ "foo45").read[List[Long]] ::
  (__  \ "foo46").read[List[Long]] ::
  (__  \ "foo47").read[List[Long]] ::
  (__  \ "foo48").read[List[Long]] ::
  (__  \ "foo49").read[List[Long]] ::
  (__  \ "foo50").read[JsNull.type] ::
  HNil

// inferred as Rule[JsValue, String] :: Rule[JsValue, String] :: ... :: Rule[JsValue, List[Long]] :: HNil

That’s cool, but we want the :: operator to have the same applicative builder behavior as the ~/and operator:

1
2
Rule[JsValue, String] :: Rule[JsValue, Long] :: Rule[JsValue, Float] :: HNil =>
  Rule[JsValue, String :: Long :: Float :: HNil]

This looks like a higher-kind fold so let’s call that HFOLD.

We can build this hfold using Shapeless polymorphic functions & RightFolder.

In a next article, I may write about coding such shapeless feature. Meanwhile, you’ll have to discover the code on Github as it’s a bit hairy but very interesting ;)

Gathering everything, we obtain the following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
/* Rules Folding */
val ruleFold = From[JsValue]{ __ =>
  hfold[JsValue](
    (__  \ "foo1").read[String] ::
    (__  \ "foo2").read[String] ::
    (__  \ "foo3").read[String] ::
    (__  \ "foo4").read[String] ::
    (__  \ "foo5").read[String] ::
    (__  \ "foo6").read[String] ::
    (__  \ "foo7").read[String] ::
    (__  \ "foo8").read[String] ::
    (__  \ "foo9").read[String] ::
    (__  \ "foo10").read[Int] ::
    (__  \ "foo11").read[Int] ::
    (__  \ "foo12").read[Int] ::
    (__  \ "foo13").read[Int] ::
    (__  \ "foo14").read[Int] ::
    (__  \ "foo15").read[Int] ::
    (__  \ "foo16").read[Int] ::
    (__  \ "foo17").read[Int] ::
    (__  \ "foo18").read[Int] ::
    (__  \ "foo19").read[Int] ::
    (__  \ "foo20").read[Boolean] ::
    (__  \ "foo21").read[Boolean] ::
    (__  \ "foo22").read[Boolean] ::
    (__  \ "foo23").read[Boolean] ::
    (__  \ "foo25").read[Boolean] ::
    (__  \ "foo26").read[Boolean] ::
    (__  \ "foo27").read[Boolean] ::
    (__  \ "foo28").read[Boolean] ::
    (__  \ "foo29").read[Boolean] ::
    (__  \ "foo30").read[Float] ::
    (__  \ "foo31").read[Float] ::
    (__  \ "foo32").read[Float] ::
    (__  \ "foo33").read[Float] ::
    (__  \ "foo34").read[Float] ::
    (__  \ "foo35").read[Float] ::
    (__  \ "foo36").read[Float] ::
    (__  \ "foo37").read[Float] ::
    (__  \ "foo38").read[Float] ::
    (__  \ "foo39").read[Float] ::
    (__  \ "foo40").read[List[Long]] ::
    (__  \ "foo41").read[List[Long]] ::
    (__  \ "foo42").read[List[Long]] ::
    (__  \ "foo43").read[List[Long]] ::
    (__  \ "foo44").read[List[Long]] ::
    (__  \ "foo45").read[List[Long]] ::
    (__  \ "foo46").read[List[Long]] ::
    (__  \ "foo47").read[List[Long]] ::
    (__  \ "foo48").read[List[Long]] ::
    (__  \ "foo49").read[List[Long]] ::
    (__  \ "foo50").read[JsNull.type] ::
    HNil
  )
}

Let’s write a play action using this rule:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def hfolder = Action(parse.json) { request =>
  ruleFold.validate(request.body) map { hl =>
    Ok(hl.toString)
  } recoverTotal { errors =>
    BadRequest(errors.toString)
  }
}

// OK
{
  "foo1" : "toto1",
  "foo2" : "toto2",
  "foo3" : "toto3",
  "foo4" : "toto4",
  "foo5" : "toto5",
  "foo6" : "toto6",
  "foo7" : "toto7",
  "foo8" : "toto8",
  "foo9" : "toto9",
  "foo10" : 10,
  "foo11" : 11,
  "foo12" : 12,
  "foo13" : 13,
  "foo14" : 14,
  "foo15" : 15,
  "foo16" : 16,
  "foo17" : 17,
  "foo18" : 18,
  "foo19" : 19,
  "foo20" : true,
  "foo21" : false,
  "foo22" : true,
  "foo23" : false,
  "foo24" : true,
  "foo25" : false,
  "foo26" : true,
  "foo27" : false,
  "foo28" : true,
  "foo29" : false,
  "foo30" : 3.0,
  "foo31" : 3.1,
  "foo32" : 3.2,
  "foo33" : 3.3,
  "foo34" : 3.4,
  "foo35" : 3.5,
  "foo36" : 3.6,
  "foo37" : 3.7,
  "foo38" : 3.8,
  "foo39" : 3.9,
  "foo40" : [1,2,3],
  "foo41" : [11,21,31],
  "foo42" : [12,22,32],
  "foo43" : [13,23,33],
  "foo44" : [14,24,34],
  "foo45" : [15,25,35],
  "foo46" : [16,26,36],
  "foo47" : [17,27,37],
  "foo48" : [18,28,38],
  "foo49" : [19,29,39],
  "foo50" : null
} => toto1 :: toto2 :: toto3 :: toto4 :: toto5 :: toto6 :: toto7 :: toto8 :: toto9 ::
  10 :: 11 :: 12 :: 13 :: 14 :: 15 :: 16 :: 17 :: 18 :: 19 ::
  true :: false :: true :: false :: false :: true :: false :: true :: false ::
  3.0 :: 3.1 :: 3.2 :: 3.3 :: 3.4 :: 3.5 :: 3.6 :: 3.7 :: 3.8 :: 3.9 ::
  List(1, 2, 3) :: List(11, 21, 31) :: List(12, 22, 32) :: List(13, 23, 33) ::
  List(14, 24, 34) :: List(15, 25, 35) :: List(16, 26, 36) :: List(17, 27, 37) ::
  List(18, 28, 38) :: List(19, 29, 39) :: null ::
  HNil


// KO
{
  "foo1" : "toto1",
  "foo2" : "toto2",
  "foo3" : "toto3",
  "foo4" : "toto4",
  "foo5" : "toto5",
  "foo6" : "toto6",
  "foo7" : 50,
  "foo8" : "toto8",
  "foo9" : "toto9",
  "foo10" : 10,
  "foo11" : 11,
  "foo12" : 12,
  "foo13" : 13,
  "foo14" : 14,
  "foo15" : true,
  "foo16" : 16,
  "foo17" : 17,
  "foo18" : 18,
  "foo19" : 19,
  "foo20" : true,
  "foo21" : false,
  "foo22" : true,
  "foo23" : false,
  "foo24" : true,
  "foo25" : false,
  "foo26" : true,
  "foo27" : "chboing",
  "foo28" : true,
  "foo29" : false,
  "foo30" : 3.0,
  "foo31" : 3.1,
  "foo32" : 3.2,
  "foo33" : 3.3,
  "foo34" : 3.4,
  "foo35" : 3.5,
  "foo36" : 3.6,
  "foo37" : 3.7,
  "foo38" : 3.8,
  "foo39" : 3.9,
  "foo40" : [1,2,3],
  "foo41" : [11,21,31],
  "foo42" : [12,22,32],
  "foo43" : [13,23,33],
  "foo44" : [14,24,34],
  "foo45" : [15,25,35],
  "foo46" : [16,26,"blabla"],
  "foo47" : [17,27,37],
  "foo48" : [18,28,38],
  "foo49" : [19,29,39],
  "foo50" : "toto"
} => Failure(List(
  (/foo50,List(ValidationError(error.invalid,WrappedArray(null)))),
  (/foo46[2],List(ValidationError(error.number,WrappedArray(Long)))),
  (/foo27,List(ValidationError(error.invalid,WrappedArray(Boolean)))),
  (/foo15,List(ValidationError(error.number,WrappedArray(Int)))),
  (/foo7,List(ValidationError(error.invalid,WrappedArray(String))
))))

Awesome… now, nobody can say the 22 limit is still a problem ;)

Have a look at the code on Github.

Have fun x 50!





The code & sample apps can be found on Github here


Actor-Room makes it easy to:
  • create any group of connected entities (people or not) (chatroom, forum, broadcast pivot…).
  • manage connections, disconnections, broadcasts and targeted messages through actors and nothing else.
For now, members can be:
  • websocket endpoints through actors without taking care of Iteratees/Enumerators…
  • Bots to simulate members

Reminders on websockets in Play

Here is the function Play provides to create a websocket:

1
2
3
def async[A](
  f: RequestHeader => Future[(Iteratee[A, _], Enumerator[A])]
)(implicit frameFormatter: FrameFormatter[A]): WebSocket[A]

A websocket is a persistent bi-directional channel of communication (in/out) and is created with:

  • an Iteratee[A, _] to manage all frames received by the websocket endpoint
  • an Enumerator[A] to send messages through the websocket
  • an implicit FrameFormatter[A] to parse frame content to type A (Play provides default FrameFormatter for String or JsValue)

Here is how you traditionally create a websocket endpoint in Play:

1
2
3
4
5
6
7
8
9
10
object MyController extends Controller {
    def connect = Websocket.async[JsValue]{ rh =>
        // the iteratee to manage received messages
        val iteratee = Iteratee.foreach[JsValue]( js => ...)

        // the enumerator to be able to send messages
        val enumerator = // generally a PushEnumerator
        (iteratee, enumerator)
    }
}

Generally, the Enumerator[A] is created using Concurrent.broadcast[A] or Concurrent.unicast[A], which are very powerful tools but not so easy to master (the edge cases of connection close and errors are always tricky).
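
As a reminder, the usual broadcast pattern looks like this (a minimal sketch of the Play Iteratee API):

import play.api.libs.iteratee.Concurrent
import play.api.libs.json.{JsValue, Json}

// one shared broadcast Enumerator and the Channel used to push into it
val (out, channel) = Concurrent.broadcast[JsValue]

// every connected websocket uses `out` as its Enumerator;
// pushing into the channel broadcasts to all of them
channel.push(Json.obj("msg" -> "hello everybody"))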

You often want to:

  • manage multiple client connections at the same time
  • parse messages received from websockets,
  • do something with the message payload
  • send messages to a given client
  • broadcast messages to all connected members
  • create bots to be able to simulate fake connected members
  • etc…

To do that in Play’s non-blocking/async architecture, you often end up developing an Actor topology managing all events/messages on top of the previous Iteratee/Enumerator.

The Iteratee/Enumerator part is quite generic but not always so easy to write.

The actor topology is quite generic because there are administration messages that are almost always the same:

  • Connection/Forbidden/Disconnection
  • Broadcast/Send

Actor Room is a helper managing all of this for you, so you can just focus on message management using actors and nothing else. It provides default behaviors and all of them can be overridden if needed. It exposes only actors and nothing else.


The code is based on the chatroom sample (and a cool sample by Julien Tournay) from Play Framework pushed far further and in a more generic way.



What is Actor Room?

An actor room manages a group of connected members which are supervised by a supervisor.

Member = 2 actors (receiver/sender)

Each member is represented by 2 actors (1 receiver & 1 sender):

  • You MUST create at least a Receiver Actor because it’s your job to manage your own message format

  • The Sender Actor has a default implementation but you can override it.


Supervisor = 1 actor

All actors are managed by 1 supervisor which has two roles:

  • Creates/supervises all receiver/sender actors

  • Manages administration messages (routing, forwarding, broadcasting etc…)



Code sample step by step

Create the Actor Room

1
2
3
4
5
6
// default constructor
val room = Room()

// constructor with a custom supervisor
// (custom supervisors are described later)
val room = Room(Props(classOf[CustomSupervisor]))

The room creates the Supervisor actor for you and delegates the creation of receiver/sender actors to it.

If you want to broadcast a message or target a precise member, you should use the supervisor.

1
2
room.supervisor ! Broadcast("fromId", Json.obj("foo" -> "bar"))
room.supervisor ! Send("fromId", "toId", Json.obj("foo" -> "bar"))

You can manage several rooms in the same project.
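
For example (hypothetical room names, reusing the Receiver actor defined below):

// two independent rooms, each with its own supervisor and members
val lobbyRoom   = Room()
val supportRoom = Room()

def lobby(id: String)   = lobbyRoom.websocket[Receiver, JsValue](id)
def support(id: String) = supportRoom.websocket[Receiver, JsValue](id)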


Create the mandatory Receiver Actor

There is only one message to manage:

1
2
3
4
5
/** Message received and parsed to type A
  * @param from the ID of the sender
  * @param payload the content of the message
  */
case class Received[A](from: String, payload: A) extends Message

If your websocket frames contain Json, then it should be Received[JsValue].

You just have to create a simple actor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Create an actor to receive messages from websocket
class Receiver extends Actor {
  def receive = {
    // Received(fromId, js) is the only Message to manage in receiver
    case Received(from, js: JsValue) =>
      (js \ "msg").asOpt[String] match {
        case None =>
          play.Logger.error("couldn't msg in websocket event")

        case Some(s) =>
          play.Logger.info(s"received $s")
          // broadcast message to all connected members
          context.parent ! Broadcast(from, Json.obj("msg" -> s))
      }
  }
}

Please note that the Receiver Actor is supervised by the Supervisor actor. So, within the Receiver Actor, context.parent is the Supervisor and you can use it to send/broadcast messages as follows:

1
2
3
4
5
6
7
8
9
context.parent ! Send(fromId, toId, mymessage)
context.parent ! Broadcast(fromId, mymessage)

// The 2 messages
/** Sends a message from a member to another member */
case class   Send[A](from: String, to: String, payload: A) extends Message

/** Broadcasts a message from a member */
case class   Broadcast[A](from: String, payload: A) extends Message

Create your Json websocket endpoint

Please note that each member is identified by a string that you define yourself.

import org.mandubian.actorroom._

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Receiver extends Actor {
  def receive = {
    ...
  }
}

object Application extends Controller {
  val room = Room()

  /** websocket requires :
    * - the type of the Receiver actor
    * - the type of the payload
    */
  def connect(id: String) = room.websocket[Receiver, JsValue](id)

  // or
  def connect(id: String) = room.websocket[JsValue](id, Props[Receiver])

}

All together

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import akka.actor._

import play.api._
import play.api.mvc._
import play.api.libs.json._

// Implicits
import play.api.Play.current
import play.api.libs.concurrent.Execution.Implicits._

import org.mandubian.actorroom._

class Receiver extends Actor {
  def receive = {
    case Received(from, js: JsValue) =>
      (js \ "msg").asOpt[String] match {
        case None => play.Logger.error("couldn't msg in websocket event")
        case Some(s) =>
          play.Logger.info(s"received $s")
          context.parent ! Broadcast(from, Json.obj("msg" -> s))
      }
  }
}

object Application extends Controller {

  val room = Room()

  def websocket(id: String) = room.websocket[Receiver, JsValue](id)

}


Extend default behaviors

Override the administration message format

AdminMsgFormatter typeclass is used by ActorRoom to format administration messages (Connected, Disconnected and Error) by default.

AdminMsgFormatter[JsValue] and AdminMsgFormatter[String] are provided by default.

You can override the format as following:

1
2
3
4
5
6
7
8
9
// put this implicit in the same scope where you create your websocket endpoint
implicit val msgFormatter = new AdminMsgFormatter[JsValue]{
    def connected(id: String) = Json.obj("kind" -> "connected", "id" -> id)
    def disconnected(id: String) = Json.obj("kind" -> "disconnected", "id" -> id)
    def error(id: String, msg: String) = Json.obj("kind" -> "error", "id" -> id, "msg" -> msg)
}

// then this msgFormatter will be used for all administration messages  
def websocket(id: String) = room.websocket[Receiver, JsValue](id)

Override the Sender Actor

You just have to create a new actor as following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class MyCustomSender extends Actor {

  def receive = {
    case s: Send[JsValue]        => // message send from a member to another one

    case b: Broadcast[JsValue]   => // message broadcast by a member

    case Connected(id)           => // member "id" has connected

    case Disconnected(id)        => // member "id" has disconnected

    case Init(id, receiverActor) => // Message sent when sender actor is initialized by ActorRoom

  }

}

Then you must initialize your websocket with it

1
def connect(id: String) = room.websocket[JsValue](_ => id, Props[Receiver], Props[MyCustomSender])

You can override the following messages:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// public sender messages
/** Sender actor is initialized by Supervisor */
case class   Init(id: String, receiverActor: ActorRef)

/** Sends a message from a member to another member */
case class   Send[A](from: String, to: String, payload: A) extends Message

/** Broadcasts a message from a member */
case class   Broadcast[A](from: String, payload: A) extends Message

/** member with ID has connected */
case class   Connected(id: String) extends Message

/** member with ID has disconnected */
case class   Disconnected(id: String) extends Message

Override the Supervisor Actor

Please note that Supervisor is an actor which manages an internal state containing all members:

1
var members = Map.empty[String, Member]

You can override the default Supervisor as following:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class CustomSupervisor extends Supervisor {

    def customBroadcast: Receive = {
      case Broadcast(from, js: JsObject) =>
        // adds members to all messages
        val ids = Json.obj("members" -> members.map(_._1))

        members.foreach {
          case (id, member) =>
            member.sender ! Broadcast(from, js ++ ids)

          case _ => ()
        }
    }

    override def receive = customBroadcast orElse super.receive
  }

Create a bot to simulate member

A bot is a fake member that you can use to communicate with other members. It’s identified by an ID like any member.

You create a bot with these APIs:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
case class Member(id: String, val receiver: ActorRef, val sender: ActorRef) extends Message

def bot[Payload](id: String)
    (implicit msgFormatter: AdminMsgFormatter[Payload]): Future[Member]

def bot[Payload](
    id: String,
    senderProps: Props
  )(implicit msgFormatter: AdminMsgFormatter[Payload]): Future[Member]


def bot[Payload](
    id: String,
    receiverProps: Props,
    senderProps: Props): Future[Member]

Then, with the returned Member (wrapped in a Future), you can simulate messages:

1
2
3
4
5
6
val room = Room()

// room.bot returns a Future[Member] (cf. the API above)
val bot = room.bot[JsValue]("robot")

// simulate a received message once the bot member is created
bot map { b => b.receiver ! Received(b.id, Json.obj("foo" -> "bar")) }

Naturally, you can override the Bot Sender Actor:

1
2
3
4
5
6
7
8
9
10
11
12
/** The default actor sender for Bots */
class BotSender extends Actor {

  def receive = {
    case s =>
      play.Logger.info(s"Bot should have sent ${s}")

  }

}

val bot = room.bot[JsValue]("robot", Props[BotSender])

So what else??? Everything you can override and everything that I didn’t implement yet…

On github project, you will find 2 samples:

  • simplest which is a very simple working sample.
  • websocket-chat which is just the Play Framework ChatRoom sample rewritten with ActorRoom.

Have fun!





The code for all autosources & sample apps can be found on Github here

The aim of this article is to show how scalaz-stream could be plugged on existing Play Iteratee/Enumerator and used in your web projects. I also wanted to evaluate in depth the power of scalaz-stream Processes by trying to write a recursive streaming action: I mean a web endpoint streaming data and re-injecting its own streamed data in itself.


If you want to see now how scalaz-stream is used with Play, go to this paragraph directly.


Why Scalaz-Stream when you have Play Iteratees?


Play Iteratees are powerful & cool but…

I’m a fan of everything dealing with data streaming and realtime management in backends. I’ve worked a lot on Play Framework and naturally I’ve been using the cornerstone behind Play’s reactive nature: Play Iteratees.

Iteratees (with its counterparts, Enumerators and Enumeratees) are great to manipulate/transform linear streams of data chunks in a very reactive (non-blocking & asynchronous) and purely functional way:

  • Enumerator identifies the data producer that can generate finite/infinite/procedural data streams.
  • Iteratee is simply a data folder built as a state machine based on 3 states (Continue, Done, Error) which consumes data from Enumerator to compute a final result.
  • Enumeratee is a kind of transducer able to adapt an Enumerator producing some type of data to an Iteratee that expects other type of data. Enumeratee can be used as both a pipe transformer and adapter.
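
To fix ideas, here is the canonical pipeline combining the three (a minimal sketch of the Play Iteratee API):

import play.api.libs.iteratee._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val producer           = Enumerator(1, 2, 3, 4)            // Enumerator
val doubler            = Enumeratee.map[Int](_ * 2)        // Enumeratee
val summer             = Iteratee.fold[Int, Int](0)(_ + _) // Iteratee
val total: Future[Int] = producer &> doubler |>>> summer   // = 20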

Iteratee is really powerful but I must say I’ve always found them quite picky to use, practically speaking. In Play, they are used in their best use-case and they were created for that exactly. I’ve been using Iteratees for more than one year now but I still don’t feel fluent with them. Each time I use them, I must spend some time to know how I could write what I need. It’s not because they are purely functional (piping an Enumerator into an Enumeratee into an Iteratee is quite trivial) but there is something that my brain doesn’t want to catch.

If you want more details about my experience with Iteratees, go to this paragraph

That’s why I wanted to work with other functional streaming tools to see if they suffer from the same kind of usability toughness or can bring something more natural to me. There are lots of other competitors in the field, such as pipes, conduits and machines. As I don’t have the time to study all of them in depth, I’ve chosen the one that appealed to me the most, i.e. Machines.

I’m not yet a Haskell coder, even if I can mumble it, so I preferred to evaluate the concept with scalaz-stream, a Scala implementation trying to bring machines to normal coders by focusing on IO streaming.



Scratching the concepts of Machine / Process ?

I’m not going to judge if Machines are better or not than Iteratees, this is not my aim. I’m just experimenting the concept in an objective way.

I won’t explain the concept of Machines in depth because it’s huge and I don’t think I have the theoretical background to do it right now. So, let’s focus on very basic ideas at first:

  • Machine is a very generic concept that represents a data processing mechanism with potential multiple inputs, an output and monadic effects (typically Future input chunks, side-effects while transforming, delayed output…)
  • To simplify, let’s say a machine is a bit like a Meccano construction that you build by plugging together other more generic machines (such as source, transducer, sink, tee, wye) as simply as pipes.
  • Building a machine also means planning all the steps you will go through when managing streamed data but it doesn’t do anything until you run it (no side-effect, no resource consumption). You can re-run a machine as many times as you want.
  • A machine is a state machine (Emit/Await/Halt) like Iteratee, but it manages errors in a more explicit way IMHO (fallback/error)

In scalaz-stream, you don’t manipulate machines which are too abstract for real-life use-cases but you manipulate simpler concepts:

  • Process[M, O] is a restricted machine outputting a stream of O. It can be a source if the monadic effect gets input from I/O or generates procedural data, or a sink if you don’t care about the output. Please note that it doesn’t infer the type of potential input at all.
  • Wye[L, R, O] is a machine that takes 2 inputs (left L / right R) and outputs chunks of type O (you can read from left or right or wait for both before outputting)
  • Tee[L, R, O] is a Wye that can only read alternatively from left or from right but not from both at the same time.
  • Process1[I, O] can be seen as a transducer which accepts inputs of type I and outputs chunks of type O (a bit like Enumeratee)
  • Channel[M, I, O] is an effectful channel that accepts inputs of type I and uses them in a monadic effect M to produce potential O

What I find attractive in Machines?

  • A Machine is producer/consumer/transducer in one place: it can consume/fold like an Iteratee, transform like an Enumeratee and emit like an Enumerator at the same time, which opens lots of possibilities (even if 3 concepts in one could also make it more complicated).
  • I feel like playing with legos as you plug machines on machines and this is quite funny actually.
  • Machines manages monadic effects in its design and doesn’t infer the type of effect so you can use it with I/O, Future and whatever you can imagine that is monadic…
  • Machines provide out-of-the-box Tee/Wye to compose streams, interleave, zip them as you want without writing crazy code.
  • The early code samples I’ve seen were quite easy to read (even the implementation is not so complex). Have a look at the StartHere sample provided by scalaz-stream:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
property("simple file I/O") = secure {

    val converter: Task[Unit] =
      io.linesR("testdata/fahrenheit.txt")
        .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
        .map(line => fahrenheitToCelsius(line.toDouble).toString)
        .intersperse("\n")
        .pipe(process1.utf8Encode)
        .to(io.fileChunkW("testdata/celsius.txt"))
        .run

    converter.run
    true
  }

But don’t think everything is so simple: machines is a complex concept with lots of quite abstract theory behind it. What I find very interesting is that it’s possible to popularize this very abstract concept with simpler notions such as Process, Source, Sink, Tee, Wye… which you can catch quite easily, as these are concepts you already manipulated when you were playing in your bathtub as a child (or even now).



Scalaz-stream Plug’n’Play Iteratee/Enumerator

After these considerations, I wanted to experiment scalaz-stream with Play streaming capabilities in order to see how it behaves in a context I know.

Here is what I decided to study:

  • Stream data out of a controller action using a scalaz-stream Process
  • Call an AsyncWebService & consume the response as a stream of Array[Byte] using a scalaz-stream Process

Here is existing Play API :

  • Action provides Ok.stream(Enumerator)
  • a WS call consuming the response as a stream of data: WS.get(r: ResponseHeader => Iteratee)

As you can see, these APIs depend on Iteratee/Enumerator. As I didn’t want to hack Play too much as a beginning, I decided to try & plug scalaz-stream onto Play Iteratee (if possible).

Building Enumerator[O] from Process[Task, O]

The idea is to take a scalaz-stream Source[O] (Process[M,O]) and wrap it into an Enumerator[O] so that it can be used in Play controller actions.

An Enumerator is a data producer which can generate those data using monadic Future effects (Play Iteratee is tightly linked to Future).

Process[Task, O] is a machine outputting a stream of O, so it’s logically the right candidate to be adapted into an Enumerator[O]. Let’s recall that Task is just a scalaz Future[Either[Throwable, A]] with a few helpers, and it’s the effect used throughout scalaz-stream.

So I’ve implemented (at least tried) an Enumerator[O] that accepts a Process[Task, O]:

1
2
3
4
5
6
def enumerator[O](p: Process[Task, O])(implicit ctx: ExecutionContext) =
    new Enumerator[O] {
      ...
      // look the code in github project
      ...
  }

The implementation just synchronizes the states of the Iteratee[O, A] consuming the Enumerator with the states of Process[Task, O] emitting data chunks of O. It’s quite simple actually.



Building Process1[I, O] from Iteratee[I, O]

The idea is to drive an Iteratee from a scalaz-stream Process so that it can consume an Enumerator and be used in Play WS.

An Iteratee[I, O] accepts inputs of type I (and nothing else) and will fold the input stream into a single result of type O.

A Process1[I, O] accepts inputs of type I and emits chunks of type O, but not necessarily one single output chunk. So it’s a good candidate for our use-case, but we need to choose which emitted chunk will be the result of the Iteratee[I, O]. Here, totally arbitrarily, I’ve chosen to take the first emitted chunk as the result (but the last one would be as good, if not better).

So I implemented the following:

1
2
3
4
5
def iterateeFirstEmit[I, O](p: Process.Process1[I, O])(implicit ctx: ExecutionContext): Iteratee[I, O] = {
  ...
  // look the code in github project
  ...
}

The implementation is really raw for experimentation as it goes through the states of the Process1[I,O] and generates the corresponding states of Iteratee[I,O] until first emitted value. Nothing more nothing less…



A few basic action samples

Everything done in those samples could be done with Iteratee/Enumeratee more or less simply. The subject is not there!


Sample 1 : Generates a stream from a Simple Emitter Process

1
2
3
4
5
def sample1 = Action {
  val process = Process.emitAll(Seq(1, 2, 3, 4)).map(_.toString)

  Ok.stream(enumerator(process))
}
1
2
> curl "localhost:10000/sample1" --no-buffer
1234

Sample 2 : Generates a stream from a continuous emitter

1
2
3
4
5
6
7
/** A process generating an infinite stream of natural numbers */
val numerals = Process.unfold(0){ s => val x = s+1; Some(x, x) }.repeat

// we limit the number of outputs but you don't have to: it can stream forever...
def sample2 = Action {
  Ok.stream(enumerator(numerals.map(_.toString).intersperse(",").take(40)))
}
1
2
> curl "localhost:10000/sample2" --no-buffer
1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,

Sample 3 : Generates a stream whose output frequency is controlled by a tee with numeral generator on left and ticker on right

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/** ticks constant every delay milliseconds */
def ticker(constant: Int, delay: Long): Process[Task, Int] = Process.await(
  scalaFuture2scalazTask(delayedNumber(constant, delay))
)(Process.emit).repeat

def sample3 = Action {
  Ok.stream(enumerator(
    // creates a Tee outputting only numerals but consuming ticker to have the delayed effect
    (numerals tee ticker(0, 100))(processes.zipWith((a,b) => a))
      .take(100)
      .map(_.toString)
      .intersperse(",")
  ))
}

Please note :

  • scalaFuture2scalazTask is just a helper to convert a Future into Task
  • ticker is quite simple to understand: it awaits a Task[Int], emits this Int and repeats again…
  • processes.zipWith((a,b) => a) is a tee (2 inputs left/right) that outputs only left data but consumes right also to have the delay effect.
  • .map(_.toString) simply converts into something writeable by Ok.stream
  • .intersperse(",") simply adds "," between each element
1
2
3
4
5
6
> curl "localhost:10000/sample3" --no-buffer
1... // to simulate the progressive apparition of numbers on screen
1,...
1,2...
...
1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100

Sample 4 : Generates a stream using side-effect to control output frequency

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/** Async generates this Int after delay*/
def delayedNumber(i: Int, delay: Long): Future[Int] =
  play.api.libs.concurrent.Promise.timeout(i, delay)

/** Creates a process generating an infinite stream natural numbers
  * every `delay milliseconds
  */
def delayedNumerals(delay: Long) = {
  def step(i: Int): Process[Task, Int] = {
    Process.emit(i).then(
      Process.await(scalaFuture2scalazTask(delayedNumber(i+1, delay)))(step)
    )
  }
  Process.await(scalaFuture2scalazTask(delayedNumber(0, delay)))(step)
}

def sample4 = Action {
  Ok.stream(enumerator(delayedNumerals(100).take(100).map(_.toString).intersperse(",")))
}

Please note:

  • delayedNumber uses an Akka scheduler to trigger our value after timeout
  • delayedNumerals shows a simple recursive Process[Task, Int] construction which shouldn’t be too hard to understand
1
2
3
4
5
6
7
8
> curl "localhost:10000/sample4" --no-buffer
0... // to simulate the progressive apparition of numbers every 100ms
0,...
0,1...
0,1,...
0,1,2...
...
0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99

Sample 5 : Generates a stream by consuming completely another stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// a process folding all Array[Byte] into a big String
val reader: Process.Process1[Array[Byte], String] = processes.fold1[Array[Byte]]((a, b) => a ++ b )
  .map{ arr => new String(arr) } |> processes.last

def sample5 = Action {
  // the WS call with response consumer by previous Process1[Array[Byte], String] driving the Iteratee[Array[Byte], String]
  val maybeValues: Future[String] =
    WS.url(routes.Application.sample2().absoluteURL())
      .get(rh => iterateeFirstEmit(reader))
      .flatMap(_.run)

  Ok.stream(enumerator(
    // wraps the received String in a Process
    // re-splits it to remove ","
    // emits all chunks
    Process.wrap(scalaFuture2scalazTask(maybeValues))
      .flatMap{ values => Process.emitAll(values.split(",")) }
  ))
}

Please note:

  • reader is a Process1[Array[Byte], String] that folds all received Array[Byte] into a String
  • iterateeFirstEmit(reader) simulates an Iteratee[Array[Byte], String] driven by the reader process that will fold all chunks of data received from WS call to routes.Application.sample2()
  • .get(rh => iterateeFirstEmit(reader)) will return a Future[Iteratee[Array[Byte], String]] that is run in .flatMap(_.run) to return a Future[String]
  • Process.wrap(scalaFuture2scalazTask(maybeValues)) is a trick to wrap the folded Future[String] into a Process[Task, String]
  • Process.emitAll(values.split(",")) splits the resulting string again and emits all chunks outside (stupid, just for demo)
1
2
> curl "localhost:10000/sample5" --no-buffer
1234567891011121314151617181920

Still there? Let’s dive deeper and be sharper!


Building recursive streaming action consuming itself


Hacking WS to consume & re-emit WS in realtime

WS.executeStream(r: ResponseHeader => Iteratee[Array[Byte], A]) is a cool API because you can build an iteratee from the ResponseHeader, and then the iteratee will consume the received Array[Byte] chunks in a reactive way and fold them. The problem is that until the iteratee has finished, you won’t have any result.

But I’d like to be able to receive chunks of data in realtime and re-emit them immediately so that I can inject them in realtime data flow processing. WS API doesn’t allow this so I decided to hack it a bit. I’ve written WSZ which provides the API:

1
2
3
def getRealTime(): Process[Future, Array[Byte]]
// based on
private[libs] def realtimeStream: Process[Future, Array[Byte]]

This API outputs a realtime stream of Array[Byte] whose flow is controlled by promises (Future) being redeemed in the AsyncHttpClient AsyncHandler. I didn’t care about ResponseHeaders for this experimentation, but they should be taken into account in a more serious implementation.

I obtain a Process[Future, Array[Byte]] streaming received chunks in realtime and I can then take advantage of the power of machines to manipulate the data chunks as I want.


Sample 6 : Generates a stream by forwarding/refolding another stream in realtime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/** A Process1 splitting input strings using splitter and re-grouping chunks */
def splitFold(splitter: String): Process.Process1[String, String] = {
  // the recursive splitter / refolder
  def go(rest: String)(str: String): Process.Process1[String, String] = {
    val splitted = str.split(splitter)
    println(s"""$str - ${splitted.mkString(",")} --""")
    (splitted.length match {
      case 0 =>
        // string == splitter
        // emit rest
        // loop
        Process.emit(rest).then( Process.await1[String].flatMap(go("")) )
      case 1 =>
        // splitter not found in string 
        // so waiting for next string
        // loop by adding current str to rest
        // but if we reach end of input, then we emit (rest+str) for last element
        Process.await1[String].flatMap(go(rest + str)).orElse(Process.emit(rest+str))
      case _ =>
        // splitter found
        // emit rest + splitted.head
        // emit all splitted elements but last
        // loops with rest = splitted last element
        Process.emit(rest + splitted.head)
               .then( Process.emitAll(splitted.tail.init) )
               .then( Process.await1[String].flatMap(go(splitted.last)) )
    })
  }
  // await1 simply means "await an input string and emits it"
  Process.await1[String].flatMap(go(""))
}

def sample6 = Action { implicit request =>
  val p = WSZ.url(routes.Application.sample4().absoluteURL()).getRealTime.translate(Task2FutureNT)

  Ok.stream(enumerator(p.map(new String(_)) |> splitFold(",")))
}

Please note:

  • def splitFold(splitter: String): Process.Process1[String, String] is just a demo that coding a Process transducer isn’t so crazy… Look at comments in code
  • .translate(Task2FutureNT) converts the Process[Future, Array[Byte]] into a Process[Task, Array[Byte]] using a Scalaz natural transformation (a sketch of such a transformation is given after the sample output below).
  • p |> splitFold(",") means “pipe output of process p to input of splitFold”.
1
2
3
4
5
6
> curl "localhost:10000/sample6" --no-buffer
0...
01...
012...
...
01234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
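
Since Task2FutureNT is only referenced and not shown, here is a plausible sketch of such a natural transformation, converting the effect of the process from Future to Task (an assumption; the real one lives in the Github project):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-, ~>}
import scalaz.concurrent.Task

def futureToTask(implicit ec: ExecutionContext): Future ~> Task =
  new (Future ~> Task) {
    def apply[A](f: Future[A]): Task[A] =
      Task.async { cb =>
        f.onComplete {
          case Success(a) => cb(\/-(a)) // success goes to the Task callback
          case Failure(t) => cb(-\/(t)) // failure is propagated as a Throwable
        }
      }
  }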

Let’s finish our trip with a bit of puzzle and mystery.


THE FINAL MYSTERY: recursive stream generating Fibonacci series

As soon as my first experimentations of scalaz-stream with Play were operational, I’ve imagined an interesting case:

Is it possible to build an action generating a stream of data fed by itself: a kind of recursive stream.

With Iteratee, it’s not really possible since it can’t emit data before finishing iteration. It would certainly be possible with an Enumeratee but the API doesn’t exist and I find it much more obvious with scalaz-stream API!

The mystery isn’t in the answer to my question: YES it is possible!

The idea is simple:

  • Create a simple action
  • Create a first process emitting a few initialization data
  • Create a second process which consumes the WS calling my own action and re-emits the received chunks in realtime
  • Append first process output and second process output
  • Stream the global output as the result of the action, which will be back-propagated over time to the action itself…

Naturally, if it consumes its own data, it will call itself again and again until you reach the connection or open file descriptor limit. As a consequence, you must limit the depth of recursion.

I performed different experiments to illustrate this use-case, zipping the stream with itself, adding elements to themselves etc… And after a few tries, I implemented the following code quite fortuitously:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/** @param curDepth the current recursion depth
  * @param maxDepth the max recursion depth
  */
def sample7(curDepth: Int, maxDepth: Int) = Action { implicit request =>

  // initializes serie with 2 first numerals output with a delay of 100ms
  val init: Process[Task, String] = delayedNumerals(100).take(2).map(_.toString)

  // Creates output Process
  // If didn't reach maxDepth, creates a process consuming my own action
  // If reach maxDepth, just emit 0
  val outputProcess =
    if(curDepth < maxDepth) {
      // calling my own action and streaming chunks using getRealTime

      val myself = WSZ.url(
        routes.Application.sample7(curDepth+1, maxDepth).absoluteURL()
      ).getRealTime.translate(Task2FutureNT).map(new String(_))
        .|>(splitFold(",")) // splitFold isn't useful, just for demo

      // THE IMPORTANT PART BEGIN
      // appends `init` output with `myself` output
      // pipe it through a helper provided scalaz-stream `processes.sum[Long]`
      // which sums elements and emits partial sums
      ((init append myself).map(_.toLong) |> processes.sum[Long])
      // THE IMPORTANT PART END
      // just for output format
      .map(_.toString).intersperse(",")
    }
    else Process.emit(0).map(_.toString)

  Ok.stream(enumerator(outputProcess))
}

Launch it:

1
2
curl "localhost:10000/sample7?curDepth=0&maxDepth=10" --no-buffer
0,1,1,2,3,5,8,13,21,34,55,89,144,233,377,610,987,1597,2584,4181,6765

WTF??? This is Fibonacci series?

Just to remind you about it:

1
2
3
e(0) = 0
e(1) = 1
e(n) = e(n-1) + e(n-2)

Here is the mystery!!!

How does it work???

I won’t tell the answer to this puzzling side-effect and let you think about it and discover why it works XD

But this sample shows exactly what I wanted: Yes, it’s possible to feed an action with its own feed! Victory!



Conclusion

Ok all of that was really funky but is it useful in real projects? I don’t really know yet but it provides a great proof of the very reactive character of scalaz-stream and Play too!

I tend to like scalaz-stream and I feel more comfortable, more natural using Process than Iteratee right now… Maybe this is just an impression so I’ll keep cautious about my conclusions for now…

All of this code is just experimental, so keep that in mind. If you like it and see that it could be useful, tell me so that we create a real library from it!

Have Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun, Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun, Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,Fun,!



PostScriptum

A few more details about Iteratees

Here are a few things that bother me when I use Play Iteratee (you don’t have to agree, this is very subjective):

  • Enumeratees are really powerful (maybe the most powerful part of the API) but they can be tricky: for ex, defining a new Enumeratee from scratch isn’t easy at first sight due to the signature of the Enumeratee itself, Enumeratee composes differently on left (with Enumerators) and on right (with Iteratees) and it can be strange at beginning…
  • Enumerators are not defined (when not using helpers) in terms of the data they produce but with respect to the way an Iteratee will consume the data they will produce. You must somewhat reverse your way of thinking which is not so natural.
  • Iteratees are great to produce one result by folding a stream of data but if you want to consume/cut/aggregate/re-emit the chunks, the code you write based on Iteratee/Enumeratee quickly becomes complex, hard to re-read and edge cases (error, end of stream) are hard to treat.
  • When you want to manipulate multiple streams together, zip/interleave them, you must write very complex code too.
  • End of iteration and Error management with Iteratees isn’t really clear IMHO and when you begin to compose Iteratees together, it becomes hard to know what will happen…
  • If you want to manipulate a stream with side-effecting, you can do it with Enumeratees but it’s not so obvious…




Now you should use play-autosource 2.0, which corrects a few issues & introduces ActionBuilder from Play 2.2.


The code for all autosources & sample apps can be found on Github here

Brand New Autosources

Play AutoSource now has 2 more implementations:

One month ago, I demoed the concept of Autosource for Play2/Scala with ReactiveMongo in this article. ReactiveMongo was the perfect target for this idea because it accepts Json structures almost natively for both document manipulation and queries.

But how does the concept behave when applied on a DB for which data are constrained by a schema and for which queries aren’t Json.


Using Datomisca-Autosource in your Play project

Add the following lines to your project/Build.scala:

val mandubianRepo = Seq(
  "Mandubian repository snapshots" at "https://github.com/mandubian/mandubian-mvn/raw/master/snapshots/",
  "Mandubian repository releases" at "https://github.com/mandubian/mandubian-mvn/raw/master/releases/"
)

val appDependencies = Seq()

val main = play.Project(appName, appVersion, appDependencies).settings(
  resolvers ++= mandubianRepo,
  libraryDependencies ++= Seq(
    "play-autosource"   %% "datomisca"       % "1.0",
    ...
  )
)

Create your Model + Schema

With ReactiveMongo Autosource, you could create a pure blob Autosource using JsObject without any supplementary information. But with Datomic, it’s not possible because Datomic forces you to use a schema for your data.

We could create a schema and manipulate JsObject directly with Datomic and some Json validators. But I’m going to focus on static models because this is the way people traditionally interact with a schema-constrained DB.

Let’s create our model and schema.

// The Model (with characters pointing on Datomic named entities)
case class Person(name: String, age: Long, characters: Set[DRef])

// The Schema written with Datomisca
object Person {
  // Namespaces
  val person = new Namespace("person") {
    val characters = Namespace("person.characters")
  }

  // Attributes
  val name       = Attribute(person / "name",       SchemaType.string, Cardinality.one) .withDoc("Person's name")
  val age        = Attribute(person / "age",        SchemaType.long,   Cardinality.one) .withDoc("Person's age")
  val characters = Attribute(person / "characters", SchemaType.ref,    Cardinality.many).withDoc("Person's characterS")

  // Characters named entities
  val violent = AddIdent(person.characters / "violent")
  val weak    = AddIdent(person.characters / "weak")
  val clever  = AddIdent(person.characters / "clever")
  val dumb    = AddIdent(person.characters / "dumb")
  val stupid  = AddIdent(person.characters / "stupid")

  // Schema
  val schema = Seq(
    name, age, characters,
    violent, weak, clever, dumb, stupid
  )
}

Create Datomisca Autosource

Now that we have our schema, let’s write the autosource.

import datomisca._
import Datomic._

import play.autosource.datomisca._

import play.modules.datomisca._
import Implicits._

import scala.concurrent.ExecutionContext.Implicits.global
// Await/Duration are needed for the blocking schema provisioning below
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import play.api.Play.current

import models._
import Person._

object Persons extends DatomiscaAutoSourceController[Person] {
  // gets the Datomic URI from application.conf
  val uri = DatomicPlugin.uri("mem")

  // ugly DB initialization ONLY for test purpose
  Datomic.createDatabase(uri)

  // Datomic connection is required
  override implicit val conn = Datomic.connect(uri)
  // Datomic partition in which you store your entities
  override val partition = Partition.USER

  // more than ugly schema provisioning, ONLY for test purpose
  Await.result(
    Datomic.transact(Person.schema),
    Duration("10 seconds")
  )

}

Implementing Json <-> Person <-> Datomic transformers

If you compile the previous code, you should get the following error:

could not find implicit value for parameter datomicReader: datomisca.EntityReader[models.Person]

Actually, Datomisca Autosource requires 4 elements to work:

  • Json.Format[Person] to convert Person instances from/to Json (network interface)
  • EntityReader[Person] to convert Person instances from Datomic entities (Datomic interface)
  • PartialAddEntityWriter[Person] to convert Person instances to Datomic entities (Datomic interface)
  • Reads[PartialAddEntity] to convert Json to PartialAddEntity, which is actually a simple map of fields/values used to partially update an existing entity (a single field, for example).

It might seem more complicated than in ReactiveMongo but there is nothing fundamentally different. The autosource converts Person from/to Json and then converts Person from/to the Datomic structure, i.e. PartialAddEntity. The only difference with ReactiveMongo is that it understands Json so well that a static model sometimes becomes unnecessary ;)…

Let’s define those elements in Person companion object.

object Person {
...
  // Classic Play2 Json Reads/Writes
  implicit val personFormat = Json.format[Person]

  // Partial entity update : Json to PartialAddEntity Reads
  implicit val partialUpdate: Reads[PartialAddEntity] = (
    ((__ \ 'name).read(readAttr[String](Person.name)) orElse Reads.pure(PartialAddEntity(Map.empty))) and
    ((__ \ 'age) .read(readAttr[Long](Person.age)) orElse Reads.pure(PartialAddEntity(Map.empty)))  and
    // need to specify type because a ref/many can be a list of dref or entities so need to tell it explicitly
    (__ \ 'characters).read( readAttr[Set[DRef]](Person.characters) )
    reduce
  )

  // Entity Reads (looks like Json combinators but it's Datomisca combinators)
  implicit val entity2Person: EntityReader[Person] = (
    name      .read[String]   and
    age       .read[Long]     and
    characters.read[Set[DRef]]
  )(Person.apply _)

  // Entity Writes (looks like Json combinators but it's Datomisca combinators)
  implicit val person2Entity: PartialAddEntityWriter[Person] = (
    name      .write[String]   and
    age       .write[Long]     and
    characters.write[Set[DRef]]
  )(DatomicMapping.unlift(Person.unapply))

...
}

Now we have everything we need except a few configuration steps.

Add AutoSource routes at the beginning of conf/routes

->      /persons                    controllers.Persons

Create conf/play.plugins to initialize Datomisca Plugin

400:play.modules.datomisca.DatomicPlugin

Append to conf/application.conf to initialize the Datomic connection

datomisca.uri.mem="datomic:mem://mem"

Insert your first 2 persons with Curl

>curl -X POST -d '{ "name":"bob", "age":25, "characters": ["person.characters/stupid", "person.characters/violent"] }' --header "Content-Type:application/json" http://localhost:9000/persons --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 21

{"id":17592186045423} -> oh a Datomic ID

>curl -X POST -d '{ "name":"john", "age":43, "characters": ["person.characters/clever", "person.characters/weak"] }' --header "Content-Type:application/json" http://localhost:9000/persons --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 21

{"id":17592186045425}

Querying is the biggest difference in Datomic

In Datomic, you can’t do a getAll without providing a Datomic Query.

But what is a Datomic query? It’s inspired by Datalog which uses predicates to express the constraints on the searched entities. You can combine predicates together.

With Datomisca Autosource, you can directly send Datalog queries in the query parameter q for GET or in the body for POST, with one restriction: your query can’t accept input parameters and must return only the entity ID. For example:

[ :find ?e :where [ ?e :person/name "john"] ] --> OK

[ :find ?e ?name :where [ ?e :person/name ?name] ] --> KO
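The curl call below uses the POST variant. For the GET variant, the query simply goes into the q query-string parameter; here is a minimal sketch using Play WS (Play 2.1 era API) against the same local endpoint as the curl examples, shown for illustration only:

import play.api.libs.ws.WS
import scala.concurrent.ExecutionContext.Implicits.global

// GET variant: the Datalog query is passed in the `q` query-string parameter
// (WS URL-encodes the brackets that curl dislikes)
val response = WS.url("http://localhost:9000/persons")
  .withQueryString("q" -> """[ :find ?e :where [ ?e :person/name "john" ] ]""")
  .get()

// print the Json body once the response arrives
response.map(resp => println(resp.json))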

Let’s use it by finding all persons.

>curl -X POST --header "Content-Type:text/plain" -d '[:find ?e :where [?e :person/name]]' 'http://localhost:9000/persons/find' --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 231

[
    {
        "name": "bob",
        "age": 25,
        "characters": [
            ":person.characters/violent",
            ":person.characters/stupid"
        ],
        "id": 17592186045423
    },
    {
        "name": "john",
        "age": 43,
        "characters": [
            ":person.characters/clever",
            ":person.characters/weak"
        ],
        "id": 17592186045425
    }
]

Please note the use of POST here instead of GET because Curl doesn’t like [] in the URL, even with the -g option.

Now you can use all other routes provided by Autosource

Autosource Standard Routes

Get / Find / Stream

  • GET /persons?… -> Find by query
  • GET /persons/ID -> Find by ID
  • GET /persons/stream -> Find by query & stream result by page

Insert / Batch / Find

  • POST /persons + BODY -> Insert
  • POST /persons/find + BODY -> find by query (when query is too complex to be in a GET)
  • POST /persons/batch + BODY -> batch insert (multiple)

Update / batch

  • PUT /persons/ID + BODY -> Update by ID
  • PUT /persons/ID/partial + BODY -> Update partially by ID
  • PUT /persons/batch -> batch update (multiple)

Delete / Batch

  • DELETE /persons/ID -> delete by ID
  • DELETE /persons/batch + BODY -> batch delete (multiple)


Conclusion

Play-Autosource’s ambition was to be DB-agnostic (as much as possible), and showing that the concept can be applied to schemaless DBs (ReactiveMongo & CouchDB) and schema-constrained DBs (Datomic) is a good sign it can work. Naturally, there are a few more elements to provide for Datomic than for ReactiveMongo but it’s useful anyway.

Thanks to @TrevorReznik for his contribution of the CouchBase Autosource.

I hope to see one for Slick soon, and a few more ;)

Have Autofun!





EXPERIMENTAL / DRAFT


Do you remember the JsPath pattern matching presented in this article?

Let’s now go further with something that you should enjoy even more: Json Interpolation & Pattern Matching.

I’ve had the idea of these features in my mind for some time but let’s render unto Caesar what is Caesar’s: Rapture.io proved that it could be done quite easily and I must say I stole… er, got inspired by a few implementation details from them! (especially the @inline implicit conversion for the string interpolation class, which is required due to a value-class limitation that should be removed in future Scala versions)

First of all, code samples as usual…

Create JsValue using String interpolation

scala> val js = json"""{ "foo" : "bar", "foo2" : 123 }"""
js: play.api.libs.json.JsValue = {"foo":"bar","foo2":123}

scala> js == Json.obj("foo" -> "bar", "foo2" -> 123)
res1: Boolean = true

scala> val js = json"""[ 1, true, "foo", 345.234]"""
js: play.api.libs.json.JsValue = [1,true,"foo",345.234]

scala> js == Json.arr(1, true, "foo", 345.234)
res2: Boolean = true

Yes, pure Json in a string…

How does it work? Using String interpolation introduced in Scala 2.10.0 and Jackson for the parsing…
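To give an idea of the mechanics, here is a simplified sketch (not the actual implementation, which also has to work around the value-class limitation mentioned above): the json interpolator is just a method added to StringContext that rebuilds the raw string from the literal parts and the interpolated arguments, and parses it.

import play.api.libs.json._

// simplified sketch of a `json` interpolator: rebuild the raw string and
// parse it at runtime (Play's Json.parse uses Jackson under the hood)
implicit class JsonInterpolation(sc: StringContext) {
  def json(args: Any*): JsValue = {
    val parts  = sc.parts.iterator
    val values = args.iterator
    val buf    = new StringBuilder(parts.next())
    while (values.hasNext) {
      // JsValue arguments print as Json; plain String arguments are inserted
      // as-is, which is why they must be surrounded by quotes (see the note below)
      buf.append(values.next().toString).append(parts.next())
    }
    Json.parse(buf.toString)
  }
}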

In String interpolation, you can also put Scala variables directly in the interpolated string. You can do the same in Json interpolation.

scala> val alpha = "foo"
alpha: String = foo

scala> val beta = 123L
beta: Long = 123

scala> val js = json"""{ "alpha" : "$alpha", "beta" : $beta}"""
js: play.api.libs.json.JsValue = {"alpha":"foo","beta":123}

scala> val gamma = Json.arr(1, 2, 3)
gamma: play.api.libs.json.JsArray = [1,2,3]

scala> val delta = Json.obj("key1" -> "value1", "key2" -> "value2")
delta: play.api.libs.json.JsObject = {"key1":"value1","key2":"value2"}

scala> val js = json"""
     |         {
     |           "alpha" : "$alpha",
     |           "beta" : $beta,
     |           "gamma" : $gamma,
     |           "delta" : $delta,
     |           "eta" : {
     |             "foo" : "bar",
     |             "foo2" : [ "bar21", 123, true, null ]
     |           }
     |         }
     |       """
js: play.api.libs.json.JsValue = {"alpha":"foo","beta":123,"gamma":[1,2,3],"delta":{"key1":"value1","key2":"value2"},"eta":{"foo":"bar","foo2":["bar21",123,true,null]}}

Please note that string variables must be put between quotes ("...") because otherwise the parser will complain.

Ok, so now it’s really trivial to write Json, isn’t it?

String interpolation just replaces the string you write in your code with some Scala code concatenating the pieces of string with the variables, as you would write it yourself. Kind of: s"toto ${v1} tata" => "toto " + v1 + " tata"

But it doesn’t compile your String into Json at compile-time: with string interpolation, the Json parsing is done at runtime. So Json interpolation doesn’t provide compile-time type safety or parsing for now.

In the future, I may replace string interpolation with a real macro which will also parse the string at compile-time. Meanwhile, if you want to rely on type safety, keep using the Json.obj / Json.arr API.


Json pattern matching

What is one of the first features you discover when learning Scala that makes you say immediately: “Whoaa, cool feature”? Pattern Matching.

You can write:

scala> val opt = Option("toto")
opt: Option[String] = Some(toto)

scala> opt match {
  case Some(s) => s"not empty option:$s"
  case None    => "empty option"
}
res2: String = not empty option:toto

// or direct variable assignment using pattern matching

scala> val Some(s) = opt
s: String = toto

Why not do this with Json?

And…. Here it is with Json pattern matching!!!

scala> val js = Json.obj("foo" -> "bar", "foo2" -> 123L)
js: play.api.libs.json.JsObject = {"foo":"bar","foo2":123}

scala> js match {
  case json"""{ "foo" : $a, "foo2" : $b }""" => Some(a -> b)
  case _ => None
}
res5: Option[(play.api.libs.json.JsValue, play.api.libs.json.JsValue)] =
Some(("bar",123))

scala> val json"""{ "foo" : $a, "foo2" : $b}""" = json""" { "foo" : "bar", "foo2" : 123 }"""
a: play.api.libs.json.JsValue = "bar"
b: play.api.libs.json.JsValue = 123

scala> val json"[ $v1, 2, $v2, 4 ]" = Json.arr(1, 2, 3, 4)
v1: play.api.libs.json.JsValue = 1
v2: play.api.libs.json.JsValue = 3

Magical?

Not at all… Just unapplySeq using the tool that enables this kind of Json manipulation as trees: JsZipper

The more I use JsZippers, the more I find fields where I can use them ;)
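For the curious, here is a rough and naive sketch of the shape such an extractor can take. It is NOT the real implementation (which relies on JsZipper); it only shows the idea, assuming the Play 2.1-era JsObject/JsArray case classes: re-parse the pattern with a unique string marker in each hole, then walk the candidate JsValue and collect whatever sits where the markers are.

import play.api.libs.json._

// naive illustration of a pattern-matching `json` interpolator: NOT the real
// implementation (which is built on JsZipper), just the general shape
implicit class JsonMatcher(sc: StringContext) {
  object json {
    def unapplySeq(js: JsValue): Option[Seq[JsValue]] = {
      val nbHoles = sc.parts.size - 1

      // rebuild the pattern, putting a unique string marker in each hole
      val pattern = sc.parts.zipWithIndex.map { case (part, i) =>
        if (i < nbHoles) part + "\"__hole_" + i + "__\"" else part
      }.mkString
      val patternJs = Json.parse(pattern)
      val holes = scala.collection.mutable.Map.empty[Int, JsValue]

      // structural comparison: markers capture values, the rest must be equal
      def matches(pat: JsValue, v: JsValue): Boolean = (pat, v) match {
        case (JsString(s), _) if s.startsWith("__hole_") =>
          holes(s.drop("__hole_".length).dropRight(2).toInt) = v
          true
        case (JsObject(pfields), JsObject(vfields)) =>
          val vmap = vfields.toMap
          pfields.forall { case (k, pv) => vmap.get(k).exists(matches(pv, _)) }
        case (JsArray(pelems), JsArray(velems)) =>
          pelems.size == velems.size &&
            pelems.zip(velems).forall { case (p, v2) => matches(p, v2) }
        case _ => pat == v
      }

      if (matches(patternJs, js)) Some((0 until nbHoles).map(holes)) else None
    }
  }
}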


More complex Json pattern matching

scala> val js = json"""{
    "key1" : "value1",
    "key2" : [
      "alpha",
      { "foo" : "bar",
        "foo2" : {
          "key21" : "value21",
          "key22" : [ "value221", 123, false ]
        }
      },
      true,
      123.45
    ]
  }"""
js: play.api.libs.json.JsValue = {"key1":"value1","key2":["alpha",{"foo":"bar","foo2":{"key21":"value21","key22":["value221",123,false]}},true,123.45]}

scala> val json"""{ "key1" : $v1, "key2" : ["alpha", $v2, true, $v3] }""" = js
v1: play.api.libs.json.JsValue = "value1"
v2: play.api.libs.json.JsValue = {"foo":"bar","foo2":{"key21":"value21","key22":["value221",123,false]}}
v3: play.api.libs.json.JsValue = 123.45

scala> js match {
    case json"""{
      "key1" : "value1",
      "key2" : ["alpha", $v1, true, $v2]
    }"""   => Some(v1, v2)
    case _ => None
  }
res9: Option[(play.api.libs.json.JsValue, play.api.libs.json.JsValue)] =
Some(({"foo":"bar","foo2":{"key21":"value21","key22":["value221",123,false]}},123.45))

// A non matching example maybe ? ;)
scala>  js match {
    case json"""{
      "key1" : "value1",
      "key2" : ["alpha", $v1, false, $v2]
    }"""   => Some(v1, v2)
    case _ => None
  }
res10: Option[(play.api.libs.json.JsValue, play.api.libs.json.JsValue)] = None

If you like it, please say so, so that I know whether it’s worth pushing it to Play Framework!


Using these features right now in a Scala/SBT project

These features are part of my experimental project JsZipper presented in this article.

To use it, add the following lines to your SBT Build.scala:

object ApplicationBuild extends Build {
  ...
  val mandubianRepo = Seq(
    "Mandubian repository snapshots" at "https://github.com/mandubian/mandubian-mvn/raw/master/snapshots/",
    "Mandubian repository releases" at "https://github.com/mandubian/mandubian-mvn/raw/master/releases/"
  )
  ...

  val main = play.Project(appName, appVersion, appDependencies).settings(
    resolvers ++= mandubianRepo,
    libraryDependencies ++= Seq(
      ...
      "play-json-zipper"  %% "play-json-zipper"    % "0.1-SNAPSHOT",
      ...
    )
  )
  ...
}

In your Scala code, import the following packages:

import play.api.libs.json._
import syntax._
import play.api.libs.functional.syntax._
import play.api.libs.json.extensions._

PatternMatch your fun!





Now you should use play-autosource 2.0, which corrects a few issues & introduces ActionBuilder from Play 2.2


The module code and sample app can be found on Github here


Here we go:

0’ : Create App

> play2 new auto-persons
       _            _
 _ __ | | __ _ _  _| |
| '_ \| |/ _' | || |_|
|  __/|_|\____|\__ (_)
|_|            |__/

play! 2.1.1 (using Java 1.7.0_21 and Scala 2.10.0), http://www.playframework.org

The new application will be created in /Users/pvo/zenexity/workspaces/workspace_mandubian/auto-persons

What is the application name? [auto-persons]
>

Which template do you want to use for this new application?

  1             - Create a simple Scala application
  2             - Create a simple Java application

> 1
OK, application auto-persons is created.

Have fun!

10’ : Edit project/Build.scala, add the play-autosource:reactivemongo dependency

val mandubianRepo = Seq(
  "Mandubian repository snapshots" at "https://github.com/mandubian/mandubian-mvn/raw/master/snapshots/",
  "Mandubian repository releases" at "https://github.com/mandubian/mandubian-mvn/raw/master/releases/"
)

val appDependencies = Seq()

val main = play.Project(appName, appVersion, appDependencies).settings(
  resolvers ++= mandubianRepo,
  libraryDependencies ++= Seq(
    "play-autosource"   %% "reactivemongo"       % "1.0-SNAPSHOT",
    "org.specs2"        %% "specs2"              % "1.13"        % "test",
    "junit"              % "junit"               % "4.8"         % "test"
  )
)

30’ : Create new ReactiveMongo AutoSource Controller in app/Person.scala

package controllers

import play.api._
import play.api.mvc._

// BORING IMPORTS
// Json
import play.api.libs.json._
import play.api.libs.functional.syntax._
// Reactive JSONCollection
import play.modules.reactivemongo.json.collection.JSONCollection
// Autosource
import play.autosource.reactivemongo._
// AutoSource is Async so imports Scala Future implicits
import scala.concurrent.ExecutionContext.Implicits.global
import play.api.Play.current

// >>> THE IMPORTANT PART <<<
object Persons extends ReactiveMongoAutoSourceController[JsObject] {
  val coll = db.collection[JSONCollection]("persons")
}

50’ : Add AutoSource routes at the beginning of conf/routes

->      /person                     controllers.Persons

60’ : Create conf/play.plugins to initialize ReactiveMongo Plugin

400:play.modules.reactivemongo.ReactiveMongoPlugin

70’ : Append to conf/application.conf to initialize MongoDB connection

mongodb.uri ="mongodb://localhost:27017/persons"

80’ : Launch application

> play2 run

[info] Loading project definition from /.../auto-persons/project
[info] Set current project to auto-persons (in build file:/.../auto-persons/)

[info] Updating {file:/.../auto-persons/}auto-persons...
[info] Done updating.
--- (Running the application from SBT, auto-reloading is enabled) ---

[info] play - Listening for HTTP on /0:0:0:0:0:0:0:0:9000

(Server started, use Ctrl+D to stop and go back to the console...)
[info] Compiling 5 Scala sources and 1 Java source to /.../auto-persons/target/scala-2.10/classes...
[warn] there were 2 feature warnings; re-run with -feature for details
[warn] one warning found
[success] Compiled in 6s

100’ : Insert your first 2 persons with Curl

>curl -X POST -d '{ "name":"bob", "age":25 }' --header "Content-Type:application/json" http://localhost:9000/person --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 33

{"id":"51b868ef31d4002c0bac8ba4"} -> oh a BSONObjectId

>curl -X POST -d '{ "name":"john", "age":43 }' --header "Content-Type:application/json" http://localhost:9000/person --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 33

{"id":"51b868fa31d4002c0bac8ba5"}

110’ : Get all persons

>curl http://localhost:9000/person --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 118

[
  {"name":"bob","age":25.0,"id":"51b868ef31d4002c0bac8ba4"},
  {"name":"john","age":43.0,"id":"51b868fa31d4002c0bac8ba5"}
]

115’ : Delete one person

>curl -X DELETE http://localhost:9000/person/51b868ef31d4002c0bac8ba4 --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 33

{"id":"51b868ef31d4002c0bac8ba4"}

120’ : Verify person was deleted

>curl -X GET http://localhost:9000/person/51b868ef31d4002c0bac8ba4 --include

HTTP/1.1 404 Not Found
Content-Type: text/plain; charset=utf-8
Content-Length: 37

ID 51b868ef31d4002c0bac8ba4 not found

125’ : Update person

>curl -X PUT -d '{ "name":"john", "age":35 }' --header "Content-Type:application/json" http://localhost:9000/person/51b868fa31d4002c0bac8ba5 --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 33

{"id":"51b868fa31d4002c0bac8ba5"}

130’ : Batch insert 2 persons (johnny & tom) with more properties

>curl -X POST -d '[{ "name":"johnny", "age":15, "address":{"city":"Paris", "street":"rue quincampoix"} },{ "name":"tom", "age":3, "address":{"city":"Trifouilly", "street":"rue des accidents de poucettes"} }]' --header "Content-Type:application/json" http://localhost:9000/person/batch --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 8

{"nb":2}

135’ : Get all persons whose name begins with “john”

>curl -X POST -d '{"name":{"$regex":"^john"}}' --header "Content-Type:application/json" http://localhost:9000/person/find --include

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Content-Length: 175

[
  {"name":"john","age":35.0,"id":"51b868fa31d4002c0bac8ba5"},
  {"id":"51b86a1931d400bc01ac8ba8","name":"johnny","age":15.0,"address":{"city":"Paris","street":"rue quincampoix"}}
]

140’ : Delete all persons

>curl -X DELETE -d '{}' --header "Content-Type:application/json" http://localhost:9000/person/batch --include

HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 7

deleted

145’ : Take 5’ rest


150’ : Done



So what was demonstrated here?

With Play-Autosource, in a few lines, you obtain:

  • A backed abstract datasource (here implemented for ReactiveMongo but it could be implemented for other DBs)
  • All CRUD operations are exposed as pure REST services
  • The datasource is typesafe (here JsObject but we’ll show later that we can use any type)

It can be useful to kickstart any application in which you’re going to work iteratively on your data models in direct interaction with the front-end.

It could also be useful to frontend developers who need to bootstrap frontend code against a Play Framework application backend. With Autosource, they don’t have to care about strictly modeling a datasource on the server side and can dig into their client-side code quite quickly.



Adding constraints & validation

Now you tell me: “Hey, that’s stupid, you store JsObject directly but my data are structured and must be validated before inserting them”

Yes you’re right so let’s add some type constraints on our data:

object Persons extends ReactiveMongoAutoSourceController[JsObject] {
  val coll = db.collection[JSONCollection]("persons")

  // we validate the received Json as JsObject because the autosource type is JsObject
  // and we add classic validations on types
  override val reader = __.read[JsObject] keepAnd (
    (__ \ "name").read[String] and
    (__ \ "age").read[Int](Reads.min(0) keepAnd Reads.max(117))
  ).tupled
}

Try it now:

curl -X POST -d '{ "nameXXX":"bob", "age":25 }' --header "Content-Type:application/json" http://localhost:9000/person --include

HTTP/1.1 400 Bad Request
Content-Type: application/json; charset=utf-8
Content-Length: 62

{"obj.name":[{"msg":"validate.error.missing-path","args":[]}]}

You can progressively add constraints on your data in a few lines. With AutoSource, you don’t need to determine the exact shape of your models immediately and you can work with JsObject directly as long as you need. Sometimes, you’ll even discover that you don’t need a structured model at all and JsObject will be enough. (but I also advise designing things a bit before implementing ;))

Keep in mind that our sample is based on an implementation for ReactiveMongo, so using Json is natural. For other DBs, other data structures might be more idiomatic…



Use typesafe models

Now you tell me: “Funny, but but but JsObject is evil because it’s not strict enough. I’m an OO developer (maybe abused by ORM gurus when I was young) and my models are case classes…”

Yes you’re right, sometimes, you need more business logic or you want to separate concerns very strictly and your model will be shaped as case-classes.

So let’s replace our nice little JsObject by a more serious case class.

// the model
case class Person(name: String, age: Int)
object Person{
  // the famous Json macro which generates a Format[Person] (Reads + Writes) at compile-time in a one-liner
  implicit val fmt = Json.format[Person]
}

// The autosource... shorter than before
object Persons extends ReactiveMongoAutoSourceController[Person] {
  val coll = db.collection[JSONCollection]("persons")
}

Please note that I removed the validations I had introduced before because they are not useful anymore: using Json macros, I created an implicit Format[Person] which is used implicitly by AutoSource.
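For reference, the macro generates roughly what you would write by hand with the Json combinators; a hand-written equivalent would look like this sketch:

import play.api.libs.json._
import play.api.libs.functional.syntax._

// roughly the hand-written equivalent of Json.format[Person]:
// a Reads and a Writes combined into a single Format
implicit val personFormat: Format[Person] = (
  (__ \ "name").format[String] and
  (__ \ "age").format[Int]
)(Person.apply, unlift(Person.unapply))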

So, now you can see why I consider AutoSource as a typesafe datasource.



Let’s be front-sexy with AngularJS

You all know that AngularJS is the new kid on the block and that you must use it if you want to be sexy nowadays.

I’m already sexy so I must be able to use it without understanding anything about it, and that’s exactly what I’ve done: in 30mn, without knowing anything about Angular (but a few concepts), I wrote a dumb CRUD front page plugged onto my wonderful AutoSource.


Client DS in app/assets/javascripts/persons.js

This is the most important part of this sample: we need to call our CRUD autosource endpoints from angularJS.

We are going to use Angular resources for it even if it’s not really the best feature of AngularJS. Anyway, in a few lines, it works pretty well in my raw case.

(thanks to Paul Dijou for reviewing this code because I repeat I don’t know angularJS at all and I wrote this in 20mn without trying to understand anything :D)

var app =
  // injects ngResource
  angular.module("app", ["ngResource"])
  // creates the Person factory backed by our autosource
  // Please remark the url person/:id which will use transparently our CRUD AutoSource endpoints
  .factory('Person', ["$resource", function($resource){
    return $resource('person/:id', { "id" : "@id" });
  }])
  // creates a controller
  .controller("PersonCtrl", ["$scope", "Person", function($scope, Person) {

    $scope.createForm = {};

    // retrieves all persons
    $scope.persons = Person.query();

    // creates a person using createForm and refreshes list
    $scope.create = function() {
      var person = new Person({name: $scope.createForm.name, age: $scope.createForm.age});
      person.$save(function(){
        $scope.createForm = {};
        $scope.persons = Person.query();
      })
    }

    // removes a person and refreshes list
    $scope.remove = function(person) {
      person.$remove(function() {
        $scope.persons = Person.query();
      })
    }

    // updates a person and refreshes list
    $scope.update = function(person) {
      person.$save(function() {
        $scope.persons = Person.query();
      })
    }
}]);

CRUD UI in index.scala.html

Now let’s create our CRUD UI page using angular directives. We need to be able to:

  • list persons
  • update/delete each person
  • create new persons
@(message: String)

@main("Welcome to Play 2.1") {

    <div ng-controller="PersonCtrl">
      <!-- create form -->
      <label for="name">name:</label><input ng-model="createForm.name"/>
      <label for="age">age:</label><input ng-model="createForm.age" type="number"/>
      <button ng-click="create()">Create new person</button>
      <hr/>
      <!-- List of persons with update/delete buttons -->
      <table>
      <thead><tr><th>name</th><th>age</th><th>actions</th></tr></thead>
      <tbody ng-repeat="person in persons">
        <tr>
          <td><input ng-model="person.name"/></td>
          <td><input type="number" ng-model="person.age"/></td>
          <td><button ng-click="update(person)">Update</button><button ng-click="remove(person)">Delete</button></td>
        </tr>
      </tbody>
      </table>
    </div>

}

Import Angular in main.scala.html

We need to import AngularJS into our application and create the angular application using ng-app

@(title: String)(content: Html)

<!DOCTYPE html>

<!-- please note the directive ng-app to initialize angular app-->
<html ng-app="app">
    <head>
        <title>@title</title>
        <link rel="stylesheet" media="screen" href="@routes.Assets.at("stylesheets/main.css")">
        <link rel="shortcut icon" type="image/png" href="@routes.Assets.at("images/favicon.png")">
        <script src="@routes.Assets.at("javascripts/jquery-1.9.0.min.js")" type="text/javascript"></script>
        <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.1.5/angular.min.js"></script>
        <script src="https://ajax.googleapis.com/ajax/libs/angularjs/1.1.5/angular-resource.min.js"></script>

        <script src="@routes.Assets.at("javascripts/person.js")" type="text/javascript"></script>
    </head>
    <body>
        @content
    </body>
</html>

What else??? Oh yes Security…

I know what you think: “Uhuh, the poor guy who exposes his DB directly on the network and who is able to delete everything without any security”

Once again, you’re right. (yes I know I love flattery)

Autosource is by default not secured in any way, and actually I don’t really care about security here because it is your job to secure your exposed APIs, and there are so many ways to secure services that I prefer to let you choose the one you want.

Anyway, I’m a nice boy and I’m going to show you how you could secure the DELETE endpoint using the authentication action composition sample given in the Play Framework documentation.

// FAKE USER class to simulate a user extracted from DB.
case class User(name: String)
object User {
  def find(name: String) = Some(User(name))
}

object Persons extends ReactiveMongoAutoSourceController[Person] {
  // The action composition helper, directly copied from the Play Framework doc
  def Authenticated(action: User => EssentialAction): EssentialAction = {
    // Let's define a helper function to retrieve a User
    def getUser(request: RequestHeader): Option[User] = {
      request.session.get("user").flatMap(u => User.find(u))
    }

    // Now let's define the new Action
    EssentialAction { request =>
      getUser(request).map(u => action(u)(request)).getOrElse {
        Done(Unauthorized)
      }
    }
  }

  val coll = db.collection[JSONCollection]("persons")

  // >>> IMPORTANT PART <<<
  // We simply override the delete action
  // If authenticated, we call the original action
  override def delete(id: BSONObjectID) = Authenticated { _ =>
    super.delete(id)
  }

  def index = Action {
    Ok(views.html.index("ok"))
  }

  // the login action which logs in any user
  def login(name: String) = Action {
    Ok("logged in").withSession("user" -> name)
  }

  // the logout action which logs out any user
  def logout = Action {
    Ok("logged out").withNewSession
  }
}

Nothing too complicated here. If you need to add headers to your responses or params to the querystring, it’s easy to wrap autosource actions. Please refer to the Play Framework doc for more info…

I won’t try it here since the article is already too long, but it should work…



Play-Autosource is DB agnostic

Play-Autosource Core is independent of the DB and provides Reactive (Async/Nonblocking) APIs to fulfill PlayFramework requirements.
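To make that statement a bit more concrete, here is a hypothetical sketch of what such a DB-agnostic contract can look like. The names and signatures are illustrative only (they are not the actual play-autosource API, and the JsObject selector is an assumption borrowed from the ReactiveMongo implementation): every operation returns a Future so that any reactive driver can implement it.

import scala.concurrent.Future
import play.api.libs.json.JsObject

// hypothetical sketch of a DB-agnostic autosource contract (illustrative only)
trait AutoSourceSketch[T, Id] {
  def insert(t: T): Future[Id]
  def get(id: Id): Future[Option[(T, Id)]]
  def update(id: Id, t: T): Future[Unit]
  def delete(id: Id): Future[Unit]
  def find(sel: JsObject, limit: Int = 0, skip: Int = 0): Future[Seq[(T, Id)]]
  def batchInsert(elems: Seq[T]): Future[Int]
  def batchDelete(sel: JsObject): Future[Unit]
}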

Naturally this 1st implementation uses ReactiveMongo, which is one of the best examples of a reactive DB driver. MongoDB fits this concept very well too because its document records are naturally compliant with Json datasources.

But implementations for other DBs can be done and I count on you people to contribute them.

DB implementation contributions are welcome (Play-Autosource is Apache2-licensed) and the AutoSource APIs are subject to evolution if they appear to be erroneous.



Conclusion

Play-Autosource provides a very fast & lightweight way to create a REST CRUD typesafe datasource in your Play/Scala application. You can begin with blob data such as JsObject and then elaborate the model of your data progressively by adding constraints or types to it.

There would be many more things to say about Play/Autosource:

  • you can also override writers to change output format
  • you have some alpha streaming API also
  • etc…

There are also lots of features to improve/add because it’s still a very early draft module.

If you like it and have ideas, don’t hesitate to discuss, to contribute, to improve etc…

curl -X POST -d "{ "coding" : "Have fun"} http://localhost:9000/developer

PS: Thanks to James Roper for his article about advanced routing in Play Framework which I copied shamefully XD