设为首页 加入收藏

TOP

Go中响应式编程库RxGo详细介绍(三)
2023-07-23 13:29:43 】 浏览:123
Tags:应式编 程库 RxGo
ontext" "fmt" "github.com/reactivex/rxgo/v2" "time" ) func Supplier1(ctx context.Context) rxgo.Item { deadline, ok := ctx.Deadline() fmt.Println("Supplier1", deadline, ok) time.Sleep(time.Second) return rxgo.Of(1) } func Supplier2(ctx context.Context) rxgo.Item { deadline, ok := ctx.Deadline() fmt.Println("Supplier2", deadline, ok) time.Sleep(time.Second) return rxgo.Of(2) } func Supplier3(ctx context.Context) rxgo.Item { deadline, ok := ctx.Deadline() fmt.Println("Supplier3", deadline, ok) time.Sleep(time.Second) return rxgo.Of(3) } func main() { ctx, _ := context.WithTimeout(context.Background(), time.Second*2) observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx)) for item := range observable.Observe() { fmt.Println(item.V) } }

4、Observable 分类

根据数据在何处生成,Observable 被分为 HotCold 两种类型。

  • Hot Observable:热可观测量,数据由可观测量外部产生。
  • Cold Observable:冷可观测量,数据由可观测量内部产生。

通常不想一次性的创建所有的数据,使用 热可观测量。

4.1 热可观测量示例

func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

结果:

0
1
2

上面创建的是 Hot Observable。但是有个问题,第一次Observe()消耗了所有的数据,第二个就没有数据输出了。(可以用可连接的观测量来修改这一行为,后面再说)。

4.2 冷可观测量示例

Cold Observable 就不会有这个问题,因为它创建的流是独立于每个观察者的。即每次调用Observe()都创建一个新的 channel。我们使用Defer()方法创建 Cold Observable,它的参数与Create()方法一样。

func main() {
  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    for i := 0; i < 3; i++ {
      ch <- rxgo.Of(i)
    }
  }})

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }

  for item := range observable.Observe() {
    fmt.Println(item.V)
  }
}

Defer源码介绍:

// Defer does not create the Observable until the observer subscribes,
// and creates a fresh Observable for each observer.
func Defer(f []Producer, opts ...Option) Observable {
	return &ObservableImpl{
		iterable: newDeferIterable(f, opts...),
	}
}

执行结果:

$ go run main.go
0
1
2
0
1
2

4.3 可连接的 Observable

可连接的(Connectable)Observable 对普通的 Observable 进行了一层组装。调用它的Observe()方法时并不会立刻产生数据。使用它,我们可以等所有的观察者都准备就绪了(即调用了Observe()方法)之后,再调用其Connect()方法开始生成数据。我们通过两个示例比较使用普通的 Observable 和可连接的 Observable 有何不同。

4.3.1 普通的Observable,并不是可连接的Observable
func main() {
  ch := make(chan rxgo.Item)
  go func() {
    for i := 1; i <= 3; i++ {
      ch <- rxgo.Of(i)
    }
    close(ch)
  }()

  observable := rxgo.FromChannel(ch)

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("First observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
  fmt.Println("before subscribe second observer")

  observable.DoOnNext(func(i interface{}) {
    fmt.Printf("Second observer: %d\n", i)
  })

  time.Sleep(3 * time.Second)
}

上例中我们使用DoOnNext()方法来注册观察者。由于DoOnNext()方法是异步执行的,所以为了等待结果输出,在最后增加了一行time.Sleep。运行结果:

First observer: 1
First observer: 2
First observer: 3
before subscribe second observer

由输出可以看出,注册第一个观察者之后就开始产生数据了。第二个观察者并不会得到数据。

4.3.2 可连接的Observable

通过在创建 Observable 的方法中指定rxgo.WithPublishStrategy()选项就可以创建可连接的 Observable

  • 重点是传入rxgo.WithPublishStrategy()
func
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 3/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇gRPC入门 下一篇LAL v0.35.4发布,OBS支持RTMP H2..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目