import scala.concurrent._
import scala.concurrent.duration._
import scalaz.stream.async.mutable._
import scala.concurrent.ExecutionContext.Implicits.global

// Demo: publish one value to a scalaz-stream Topic and collect it from a subscriber
// that runs on another thread.

// The topic shared by the publisher and the subscriber below.
val sharedData: Topic[BigStringResult] = async.topic()
//> sharedData : scalaz.stream.async.mutable.Topic[demo.ws.blogStream.BigStringResult] = scalaz.stream.async.package$$anon$1@797badd3

// This only describes the subscription as a Task; nothing is attached to the topic
// until the Task is run (see `otherThread`).
val subscriber = sharedData.subscribe.runLog //> subscriber : scalaz.concurrent.Task[Vector[demo.ws.blogStream.BigStringResult]] = scalaz.concurrent.Task@226a82c4

// Running the Task is what really attaches the subscriber to the topic.
// `Future { ... }` replaces the deprecated lower-case `future { ... }` helper, and
// `blocking` tells the global execution context that this thread will be tied up by
// the synchronous `run` call.
val otherThread = Future {
  blocking {
    subscriber.run
  }
} //> otherThread : scala.concurrent.Future[Vector[demo.ws.blogStream.BigStringResult]] = List()

// Topics seem more useful for hooking up things like sensors that produce a
// continual stream of data, and where individual values can be dropped on the floor.
//
// Need to give the subscriber some time to start up before publishing.
// I doubt you'd do this in actual code: the sleep is only a best-effort delay, so a
// subscriber that has not attached within it may presumably miss the value below.
Thread.sleep(100)

// Don't just call publishOne; need to run the resulting task.
sharedData.publishOne(longGet(1)).run
// Don't just call close; need to run the resulting task.
sharedData.close.run

// Need to wait for the output.
val result = Await.result(otherThread, Duration.Inf)
//> result : Vector[demo.ws.blogStream.BigStringResult] = Vector(Some large data sets from job#1)