设为首页 加入收藏

TOP

Akka(26): Stream:异常处理-Exception handling(一)
2017-10-09 13:27:32 】 浏览:2955
Tags:Akka Stream 异常 处理 -Exception handling

   akka-stream是基于Actor模式的,所以也继承了Actor模式的“坚韧性(resilient)”特点,在任何异常情况下都有某种整体统一的异常处理策略和具体实施方式。在akka-stream的官方文件中都有详细的说明和示范例子。我们在这篇讨论里也没有什么更好的想法和范例,也只能略做一些字面翻译和分析理解的事了。下面列出了akka-stream处理异常的一些实用方法:

1、recover:这是一个函数,发出数据流最后一个元素然后根据上游发生的异常终止当前数据流

2、recoverWithRetries:也是个函数,在上游发生异常后改选用后备数据流作为上游继续运行

3、Backoff restart strategy:是RestartSource,RestartFlow,RestartSink的一个属性。为它们提供“逐步延迟重启策略”

4、Supervision strategy:是数据流构件的“异常监管策略”属性。为发生异常的功能阶段Stage提供异常情况处理方法

下面我们就用一些代码例子来示范它们的使用方法:

1、recover:Flow[T].recover函数的款式如下:

  /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * * Throwing an exception inside `recover` _will_ be logged on ERROR level automatically. * * '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element * * '''Backpressures when''' downstream backpressures * * '''Completes when''' upstream completes or upstream failed with exception pf can handle * * '''Cancels when''' downstream cancels * */ def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = via(Recover(pf))

下面是一个用例:

  Source(0 to 10).map { n =>
    if (n < 5) n.toString else throw new Exception("Boooommm!") }.recover{ case e: Exception => s"truncate stream: ${e.getMessage}" }.runWith(Sink.foreach(println))

运算结果:

0
1
2
3
4 truncate stream: Boooommm!

2、recoverWithRetries:看看它的函数款式:

 /** * RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered up to `attempts` number of times so that each time there is a failure * it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't * attempt to recover at all. * * A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`. * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * * Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically. * * '''Emits when''' element is available from the upstream or upstream is failed and element is available * from alternative Source * * '''Backpressures when''' downstream backpressures * * '''Completes when''' upstream completes or upstream failed with exception pf can handle * * '''Cancels when''' downstream cancels * * @param attempts Maximum number of retries or -1 to retry indefinitely * @param pf Receives the failure cause and returns the new Source to be materialized if any * @throws IllegalArgumentException if `attempts` is a negative number other than -1 * */ def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] = via(new RecoverWith(attempts, pf))

attempts代表发生异常过程中尝试恢复次数,0代表不尝试恢复,直接异常中断。<0代表无限尝试次数。下面是一个用例示范: 

 val backupSource = Source(List("five","six","seven","eight","nine")) Source(0 to 10).map { n =>
    if (n < 5) n.toString else throw new RuntimeException("
首页 上一页 1 2 3 4 5 下一页 尾页 1/5/5
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇scala基本语法和单词统计 下一篇Akka(27): Stream:Use case-C..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目