设为首页 加入收藏

TOP

.Net并行编程 - Reactive Extensions(Rx)并发浅析(一)
2019-09-03 03:19:06 】 浏览:38
Tags:.Net 并行 编程 Reactive Extensions 并发 浅析

关于Reactive Extensions(Rx)

关于Reactive Extensions(Rx),先来看一下来自微软的官方描述:

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observablesquery asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

“Reactive Extensions(Rx)是一个类库,它集成了异步、基于可观察(observable)序列的事件驱动编程和LINQ-style的查询操作。使用Rx,开发人员可以用observable对象描述异步数据流,使用LINQ操作符异步查询数据和使用Schedulers控制异步过程中的并发。简而言之,Rx = Observables + LINQ + Schedulers。”

 

Reactive Extensions(Rx)就一定是多线程?

在以上的描述中,反复出现了一个词“异步”。一般来讲,提到“异步”,首先反应到的就是多线程。那问题来了,使用Reactive Extensions就一定意味着多线程吗?先来看一个示例,代码来了:

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var sub = new Subject<Object>();
 5 
 6     sub.Subscribe(o => Console.WriteLine("Received {1} on threadId:{0}",    //为Observable订阅处理器(handler),输出handler thread id
 7         Thread.CurrentThread.ManagedThreadId,
 8         o));
 9     ParameterizedThreadStart notify = obj =>    //委托定义,其内输出被观察对象的thread id
10     {
11         Console.WriteLine("OnNext({1}) on threadId:{0}",
12         Thread.CurrentThread.ManagedThreadId,
13         obj);
14         sub.OnNext(obj);
15     };
16     notify(1);
17     new Thread(notify).Start(2);
18     new Thread(notify).Start(3);
19 
20     Console.Read();
21 }

代码中,分别输出了通知者的thread id和callback handler的thread id。这里使用的是Rx默认的线程并发方式。输出结果如下:

 

无论是在当前线程调用,还是新启线程执行,通知者和处理方法所在线程均为同一个。在该示例中,Rx的线程分配是在free-threaded模式下工作的,free-threaded就意味着我们不强行指其Rx中的subscription, notification执行线程。这是Rx的默认工作模式,而这种模式下subscribing/call OnNext并没有引发新的线程来处理observable序列,线处理方式是单线程(Single Threaded Apartment,STA)。所以,我们可以这样说:单线程是Reactive Extensions(Rx)的默认处理方式。

 

使用SubscribeOn控制订阅(subscribing)的上下文

IObservable<TSource>的扩展方法SubscribeOn<TSource>(IScheduler)允许我们传入一调度器(Scheduler),控制订阅执行的上下文。

 1 static void Main(string[] args)
 2 {
 3     Console.WriteLine("Starting on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 4     var source = Observable.Create<int>(
 5     o =>
 6     {
 7         Console.WriteLine("Invoked on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
 8         o.OnNext(1);
 9         o.OnNext(2);
10         o.OnNext(3);
11         o.OnCompleted();
12         Console.WriteLine("Finished on threadId:{0}",
13         Thread.CurrentThread.ManagedThreadId);
14         return Disposable.Empty;
15     });
16     source
17         //.SubscribeOn(Scheduler.ThreadPool)
18     .Subscribe(
19     o => Console.WriteLine("Received {1} on threadId:{0}",
20     Thread.CurrentThread.ManagedThreadId,
21     o),
22     () => Console.WriteLine("OnCompleted on threadId:{0}",
23     Thread.CurrentThread.ManagedThreadId));
24     Console.WriteLine("Subscribed on threadId:{0}", Thread.CurrentThread.ManagedThreadId);
25 
26     Console.Read();
27 }

代码中,使用Observable.Create创建一Observable序列,随后订阅该序列。输出结果为:

当序列被订阅source.Subscribe,代理Observable.Create被调用执行。首先是OnNext(1) handler,依次是OnNext(2) OnNext(3

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇浅谈WPF中的MVVM框架--MVVMFounda.. 下一篇桥接模式-挣钱的设计模式。

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目