设为首页 加入收藏

TOP

FunDA(3)- 流动数据行操作:FDAPipeLine operations using scalaz-stream-fs2(二)
2017-10-09 14:30:15 】 浏览:1871
Tags:FunDA 流动 数据 操作 FDAPipeLine operations using scalaz-stream-fs2
现功能。下面是这篇讨论中的示范代码:

 1 import fs2._  2 object FDAPipe {  3   def log[ROW](prompt: String): Pipe[Task,ROW,ROW] =
 4     _.eva lMap {row => Task.delay {println(s"$prompt> $row"); row}}  5   Stream.range(1,5).through(log("")).run.unsafeRun  6   def stopOn3[ROW]: Pipe[Task,ROW,ROW] = in => {  7     def go: Handle[Task,ROW] => Pull[Task,ROW,Unit] = h => {  8  h.receive1Option {  9         case Some((r,h)) => if ( 3 == r) Pull.done 10                             else Pull.output1(r) >> go(h) 11         case None => Pull.done 12  } 13  } 14     in.pull(go) 15  } 16   Stream(4,2,9,3,8,1) 17    .through(log("before")) 18  .through(stopOn3) 19    .through(log("after")) 20  .run 21  .unsafeRun 22   //数据处理管道
23   type FDAPipeLine[ROW] = Stream[Task,ROW] 24   //数据作业节点
25   type FDAWorkNode[ROW] = Pipe[Task,ROW,ROW] 26   //数据管道开关阀门,从此处获得管道内数据
27   type FDAValve[ROW] = Handle[Task,ROW] 28   //管道连接器
29   type FDAPipeJoint[ROW] = Pull[Task,ROW,Unit] 30   
31   //库提供:停止数据流动
32   def fda_haltFlow = Pull.done 33   //库提供:向下游发送一个ROW
34   def fda_sendRow[ROW](row: ROW) = Pull.output1(row) 35   //库提供:处理当前数据。运行用户提供的功能wf
36   def fda_doWork[ROW](wf: ROW => FDAPipeJoint[ROW]): FDAWorkNode[ROW] = { 37     def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => { 38  h.receive1Option { 39         case Some((r,h)) => wf(r) >> go(h) 40         case None => fda_haltFlow 41  } 42  } 43     in => in.pull(go) 44  } 45   //用户提供数据处理功能函数
46   def breakOn3[ROW]: ROW => FDAPipeJoint[ROW] = row => { 47      if (3 == row ) fda_haltFlow 48      else fda_sendRow(row) 49  } 50   //测试运算
51   Stream(4,2,9,3,8,1) 52    .through(log("before")) 53  .through(fda_doWork(breakOn3)) 54    .through(log("after")) 55  .run 56  .unsafeRun 57 }

 

 

 

 

 

 

首页 上一页 1 2 下一页 尾页 2/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(4)- 数据流内容控制:St.. 下一篇FunDA(5)- Reactive Streams:..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目