设为首页 加入收藏

TOP

[转] Scala 中的异步事件处理(一)
2019-08-15 00:11:22 】 浏览:190
Tags:Scala 异步 事件 处理

在任何并发性应用程序中,异步事件处理都至关重要。无论事件的来源是什么(不同的计算任务、I/O 操作或与外部系统的交互),您的代码都必须跟踪事件,协调为响应它们而执行的操作。应用程序可以采用两种基本方法之一来实现异步事件处理:

  • 阻塞:一个等待事件的协调线程。
  • 非阻塞:事件向应用程序生成某种形式的通知,而没有线程显式等待它。

合成事件

scala.concurrent.Promise 和 scala.concurrent.Future 类为 Scala 开发人员提供了一些与 Java 8 开发人员的 CompletableFuture 使用方式类似的选项。具体地讲,Future 同时提供了阻塞和非阻塞的事件完成方式。但是,尽管在此级别上很相似,但用于处理两种 future 的技术是不同的。

我们先来看一个并发任务设置:

任务和排序

在一个特定操作中,应用程序通常必须执行多个处理步骤。例如,在向用户返回结果之前,Web 应用程序可能需要:

  1. 在一个数据库中查找用户的信息
  2. 使用查找到的信息来执行 Web 服务调用,并执行另一次数据库查询。
  3. 根据从前两个操作中获得的结果来执行数据库更新。
    图 1 演示了这种结构类型。

图 1. 应用程序任务流

图 1 将处理过程分解为 4 个不同的任务,它们通过表示顺序依赖关系的箭头相连接。任务 1 可以直接执行,任务 2 和任务 3 都在任务 1 完成后执行,任务 4 在任务 2 和任务 3 都完成后执行。

建模异步事件

在真实的系统中,异步事件的来源一般是并行计算或一种形式的 I/O 操作。但是,使用简单的时间延迟来建模这种系统会更容易一些,这也是这里所采用的方法。清单 1 显示了我用于生成事件的基本的赋时事件 (timed-event) 代码,这些事件采用了已完成的 Future 格式。

清单 1. 赋时事件代码

import java.util.Timer
import java.util.TimerTask
 
import scala.concurrent._
 
object TimedEvent {
  val timer = new Timer
 
  /** Return a Future which completes successfully with the supplied value after secs seconds. */
  def delayedSuccess[T](secs: Int, value: T): Future[T] = {
    val result = Promise[T]
    timer.schedule(new TimerTask() {
      def run() = {
        result.success(value)
      }
    }, secs * 1000)
    result.future
  }
 
  /** Return a Future which completes failing with an IllegalArgumentException after secs
    * seconds. */
  def delayedFailure(secs: Int, msg: String): Future[Int] = {
    val result = Promise[Int]
    timer.schedule(new TimerTask() {
      def run() = {
        result.failure(new IllegalArgumentException(msg))
      }
    }, secs * 1000)
    result.future
  }

清单 1 中的 Scala 代码使用一个 java.util.Timer 来安排 java.util.TimerTask 在一个延迟之后执行。每个 TimerTask 在运行时完成一个有关联的 future。delayedSuccess 函数定制了一个任务,在运行时成功完成一个 Scala Future[T],然后将该 future 返回给调用方。delayedSuccess 函数返回相同类型的 future,但使用了一个在完成 future 时发生 IllegalArgumentException 异常的失败任务。

清单 2 展示了如何使用 清单 1 中的代码创建 Future[Int] 格式的事件,使之与 图 1 中的 4 个任务相匹配。(此代码来自示例代码中的 AsyncHappy 类。)

清单 2. 示例任务的事件

// task definitions
def task1(input: Int) = TimedEvent.delayedSuccess(1, input + 1)
def task2(input: Int) = TimedEvent.delayedSuccess(2, input + 2)
def task3(input: Int) = TimedEvent.delayedSuccess(3, input + 3)
def task4(input: Int) = TimedEvent.delayedSuccess(1, input + 4)

清单 2 中 4 个任务方法中的每一个都为该任务的完成时刻使用了特定的延迟值:task1 为 1 秒,task2 为 2 秒,task3 为 3 秒,task4 重新变为 1 秒。每个任务还接受一个输入值,是该输入加上任务编号作为 future 的(最终)结果值。这些方法都使用了 future 的成功形式;稍后您会看到一些使用失败形式的例子。

这些任务要求您按 图 1 中所示的顺序运行它们,向每个任务传递上一个任务返回的结果值(或者对于 task4,传递前两个任务结果的和)。如果中间两个任务同时执行,总的执行时间大约为 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。如果 task1 的输入为 1,那么结果为 2。如果该结果被传递给 task2 和 task3,那么结果将为 4 和 5。如果这两个结果的和 (9) 被作为输入传递给 task4,那么最终结果将为 13。

阻塞等待

在设定好操作环境之后,是时候来查看 Scala 如何处理事件的完成情况了。与上一期的 Java 代码中一样,协调 4 个任务的执行的最简单的方法是使用阻塞等待:主要线程等待每个任务依次完成。清单 3(同样来自示例代码中的 AsyncHappy 类)给出了此方法。

清单 3. 阻塞等待任务执行

def runBlocking() = {
  val v1 = Await.result(task1(1), Duration.Inf)
  val future2 = task2(v1)
  val future3 = task3(v1)
  val v2 = Await.result(future2, Duration.Inf)
  val v3 = Await.result(future3, Duration.Inf)
  val v4 = Await.result(task4(v2 + v3), Duration.Inf)
  val result = Promise[Int]
  result.success(v4)
  result.future
}

清单 3 使用 Scala scala.concurrent.Await 对象的 result() 方法来完成阻塞等待。该代码首先等待 task1 的结果,然后同时创建 task2 和 task3 future,并等待两个任务依次返回 future,最后等待 task4 的结果。最后 3 行(创建和设置 result)使得该方法能够返回一个 Future[Int]。返回该 future,让此方法与我接下来展示的非阻塞形式一致,但该 future 将在该方法返回之前完成。

组合 future

清单 4(同样来自示例代码中的 AsyncHappy 类)展示了一种将 f

首页 上一页 1 2 3 4 下一页 尾页 1/4/4
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Scala环境搭建及Intellij IDEA安装 下一篇Scala模式匹配常用

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目