ontext"
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
func Supplier1(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier1", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(1)
}
func Supplier2(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier2", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(2)
}
func Supplier3(ctx context.Context) rxgo.Item {
deadline, ok := ctx.Deadline()
fmt.Println("Supplier3", deadline, ok)
time.Sleep(time.Second)
return rxgo.Of(3)
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), time.Second*2)
observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
4、Observable 分类
根据数据在何处生成,Observable 被分为 Hot 和 Cold 两种类型。
- Hot Observable:热可观测量,数据由可观测量外部产生。
- Cold Observable:冷可观测量,数据由可观测量内部产生。
通常不想一次性的创建所有的数据,使用 热可观测量。
4.1 热可观测量示例
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
0
1
2
上面创建的是 Hot Observable。但是有个问题,第一次Observe()
消耗了所有的数据,第二个就没有数据输出了。(可以用可连接的观测量来修改这一行为,后面再说)。
4.2 冷可观测量示例
Cold Observable 就不会有这个问题,因为它创建的流是独立于每个观察者的。即每次调用Observe()
都创建一个新的 channel。我们使用Defer()
方法创建 Cold Observable,它的参数与Create()
方法一样。
func main() {
observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
for i := 0; i < 3; i++ {
ch <- rxgo.Of(i)
}
}})
for item := range observable.Observe() {
fmt.Println(item.V)
}
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
Defer源码介绍:
// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable {
return &ObservableImpl{
iterable: newDeferIterable(f, opts...),
}
}
执行结果:
$ go run main.go
0
1
2
0
1
2
4.3 可连接的 Observable
可连接的(Connectable)Observable 对普通的 Observable 进行了一层组装。调用它的Observe()
方法时并不会立刻产生数据。使用它,我们可以等所有的观察者都准备就绪了(即调用了Observe()
方法)之后,再调用其Connect()
方法开始生成数据。我们通过两个示例比较使用普通的 Observable 和可连接的 Observable 有何不同。
4.3.1 普通的Observable,并不是可连接的Observable
func main() {
ch := make(chan rxgo.Item)
go func() {
for i := 1; i <= 3; i++ {
ch <- rxgo.Of(i)
}
close(ch)
}()
observable := rxgo.FromChannel(ch)
observable.DoOnNext(func(i interface{}) {
fmt.Printf("First observer: %d\n", i)
})
time.Sleep(3 * time.Second)
fmt.Println("before subscribe second observer")
observable.DoOnNext(func(i interface{}) {
fmt.Printf("Second observer: %d\n", i)
})
time.Sleep(3 * time.Second)
}
上例中我们使用DoOnNext()
方法来注册观察者。由于DoOnNext()
方法是异步执行的,所以为了等待结果输出,在最后增加了一行time.Sleep
。运行结果:
First observer: 1
First observer: 2
First observer: 3
before subscribe second observer
由输出可以看出,注册第一个观察者之后就开始产生数据了。第二个观察者并不会得到数据。
4.3.2 可连接的Observable
通过在创建 Observable 的方法中指定rxgo.WithPublishStrategy()
选项就可以创建可连接的 Observable:
- 重点是传入
rxgo.WithPublishStrategy()
func