设为首页 加入收藏

TOP

FunDA(8)- Static Source:保证资源使用安全 - Resource Safety(一)
2017-10-09 14:30:08 】 浏览:4961
Tags:FunDA Static Source 保证 资源 使用 安全 Resource Safety

   我们在前面用了许多章节来讨论如何把数据从后台数据库中搬到内存,然后进行逐行操作运算。我们选定的解决方案是把后台数据转换成内存中的数据流。无论在打开数据库表或从数据库读取数据等环节都涉及到对数据库表这项资源的安全使用:我们最起码要保证在完成使用或者使用中途出现错误异常退出时能释放占用的资源。谈到资源使用安全,不得不想到函数式编程通用的那个bracket函数,fs2同样提供了这个函数:

def bracket[F[_],R,A](r: F[R])(use: R => Stream[F,A], release: R => F[Unit]): Stream[F,A] = Stream.mk { StreamCore.acquire(r, release andThen (Free.eva l)) flatMap { case (_, r) => use(r).get } }

这个函数的入参数r,use,release都涉及到了资源占用处理:r一般是打开文件或者库表操作,use是资源使用如读取数据过程,release 顾名思义就是正常完成资源使用后的资源释放清理过程。函数bracket能保证这些过程的正确引用。

我们用几个例子来分析一下这个函数的功能:

val s = Stream.bracket(Task.delay(throw new Exception("Oh no!")))( _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4), _ => Task.delay(println("normal end"))) s.runLog.unsafeRun //> java.lang.Exception: Oh no! //| at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal //| a:4) //| at demo.ws.streams$$anonfun$main$1$$anonfun$1.apply(demo.ws.streams.scal //| a:4)

在上面这个例子里我们人为在两个地方制造了异常。我们可以用onError来截获这些异常: 

val s1 = s.map(_.toString).onError {e => Stream.emit(e.getMessage)} s1.runLog.unsafeRun //> res0: Vector[String] = Vector(Oh no!)

必须用toString转换了Stream元素类型后才能把截获的异常信息放进Stream。注意release未调用,因为资源还没有被占用。但是如果除了释放资源外还有其它清理工作的话,我们可以用onFinalize来确保一定可以调用清理程序:

val s5 = s1.onFinalize(Task.delay{println("finally end!")}) s5.runLog.unsafeRun //> finally end! //| res1: Vector[String] = Vector(Oh no!)

如果在使用资源中间出现异常会怎样?

val s3 = Stream.bracket(Task.delay())( _ => Stream(1,2,3) ++ Stream.fail(new Exception("boom!")) ++ Stream(3,4), _ => Task.delay(println("normal end"))) val s4 = s3.map(_.toString).onError {e => Stream.emit(e.getMessage)} .onFinalize(Task.delay{println("finally end!")}) s4.runLog.unsafeRun //> normal end //| finally end! //| res2: Vector[String] = Vector(1, 2, 3, boom!)

返回结果res2正确记录了出错地点,而且所有清理过程都得到运行。当然,我们可以不用动Stream元素类型,用attempt:

val s6 = s3.attempt.onError {e => Stream.emit(e.getMessage)} .onFinalize(Task.delay{println("finally end!")}) s6.runLog.unsafeRun //> normal end //| finally end! //| res3: Vector[Object] = Vector(Right(1), Right(2), Right(3), Left(java.lang.Exception: boom!))

我们在前面FunDA(1)里讨论过运算slick Query Action run返回结果类型是Future[Iterable[ROW]]。Slick获取数据的方式是一次性读入内存,所以本期标题提到的Static-Source就是指这样的一个内存中的集合。那么我们就可以不必考虑开启并占用数据库表这项操作了。我们只需要用FunDA DataRowType.getTypedRow函数获取了Iterable[ROW]结果后直接传给bracket就行了。现在最重要的是如何把Seq[ROW]转换成Stream[F[_],ROW]。我们可以用Seq的fold函数来构建Stream: 

val data = Seq(1,2,3,4)                           //> data : Seq[Int] = List(1, 2, 3, 4)
val s8 = data.foldLeft(Stream[Task,Int]())((s,a) => s ++ Stream.emit(a)) def log[A](prompt: String): Pipe[Task,A,A] = _.eva lMap {row => Task.delay{ println(s"$prompt> $row"); row }} //> log: [A](prompt: String)fs2.Pipe[fs2.Task,A,A]
 s8.through(log("")).run.unsafeRun                 //> > 1 //| > 2 //| > 3 //| > 4

表面上看好像没什么问题,但仔细分析:Seq[ROW]可以是个超大的集合,而foldLeft是个递归函数,无论是否尾递归都有可能造成堆栈溢出错误(StackOverflowError)。看来还是用freemonad,它可以把每步运算都存放在内存结构里,可以在固定的堆栈空间运算。下面的函数用fs2.Pull类型结构可以把Seq[ROW]转换成Stream[F[_],ROW]:

 def pullSeq[ROW](h: Seq[ROW]): Pull[Task, ROW, Unit] = { val it = h.iterator def
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇FunDA(7)- Reactive Streams t.. 下一篇FunDA(9)- Stream Source:rea..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目