名: ${qmr.state}") println(s"县名:${qmr.county}") println(s"年份:${qmr.year}") println(s"取值:${qmr.value}") println("-------------") })) */

  // Bridge `source` into an fs2 Stream through a bounded queue.
  //
  // The queue carries Option[RowType] and holds at most 16 pending elements.
  // `pipe.unNoneTerminate` ends the fs2 stream at the first `None` it dequeues,
  // so the producer side is expected to enqueue `None` once it is finished
  // (presumably done by FS2Gate on completion -- TODO confirm against FS2Gate).
  val fs2Stream: Stream[Task, RowType] =
    Stream.eval(async.boundedQueue[Task, Option[RowType]](16))
      .flatMap { q =>
        // Producer: run `source` into an FS2Gate wrapping the queue. The Task is
        // started asynchronously (original note: "enqueue Task (new thread)").
        // The Future returned by unsafeRunAsyncFuture is intentionally discarded.
        Task(source.to(new FS2Gate[RowType](q)).run).unsafeRunAsyncFuture
        // Consumer: dequeue in the current thread until a `None` arrives.
        pipe.unNoneTerminate(q.dequeue)
      }

  // Convert every raw row to its typed form and print its fields; `run.unsafeRun`
  // drives the stream to completion before continuing.
  fs2Stream
    .map { row => toTypedRow(row) }
    .map(qmr => {
      println(s"州名: ${qmr.state}")
      println(s"县名:${qmr.county}")
      println(s"年份:${qmr.year}")
      println(s"取值:${qmr.value}")
      println("-------------")
    })
    .run
    .unsafeRun

  // Wait for a line on stdin, then shut the actor system down.
  scala.io.StdIn.readLine()
  actorSys.terminate()
} // closes the enclosing definition opened earlier in the file
|