1 import fs2._ 2 object FDAPipe { 3 def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
4 _.eva lMap {row => Task.delay {println(s"$prompt> $row"); row}} 5 Stream.range(1,5).through(log("")).run.unsafeRun 6 def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => { 7 def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => { 8 h.receive1Option { 9 case Some((r,h)) => if ( 3 == r) Pull.done 10 else Pull.output1(r) >> go(h) 11 case None => Pull.done 12 } 13 } 14 in.pull(go) 15 } 16 Stream(4,2,9,3,8,1) 17 .through(log("before")) 18 .through(stopOn3) 19 .through(log("after")) 20 .run 21 .unsafeRun 22 //数据处理管道
23 type FDAPipeLine[ROW] = Stream[Task,ROW] 24 //数据作业节点
25 type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW] 26 //数据管道开关阀门,从此处获得管道内数据
27 type FDAValve[ROW] = Handle[Task,ROW] 28 //管道连接器
29 type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit] 30
31 //库提供:停止数据流动
32 def fda_haltFlow = Pull.done 33 //库提供:向下游发送一个ROW
34 def fda_sendRow[ROW](row: ROW) = Pull.output1(row) 35 //库提供:处理当前数据。运行用户提供的功能wf
36 def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = { 37 def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => { 38 h.receive1Option { 39 case Some((r,h)) => wf(r) >> go(h) 40 case None => fda_haltFlow 41 } 42 } 43 in => in.pull(go) 44 } 45 //用户提供数据处理功能函数
46 def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => { 47 if (3 == row ) fda_haltFlow 48 else fda_sendRow(row) 49 } 50 //测试运算
51 Stream(4,2,9,3,8,1) 52 .through(log("before")) 53 .through(fda_doWork(breakOn3)) 54 .through(log("after")) 55 .run 56 .unsafeRun 57 }