设为首页 加入收藏

TOP

Go中响应式编程库RxGo详细介绍(七)
2023-07-23 13:29:43 】 浏览:124
Tags:应式编 程库 RxGo
以改变这一默认行为,使用rxgo.WithPool(n)选项设置运行n个 goroutine,或者rxgo.WitCPUPool()选项设置运行与逻辑 CPU 数量相等的 goroutine。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"math/rand"
	"time"
)

func main() {
	observable := rxgo.Range(1, 10)

	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
		time.Sleep(time.Duration(rand.Int31()))
		return i.(int) + 1, nil
	}, rxgo.WithCPUPool())

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

8
9
10
6
5
11
2
4
7
3

由于是并行运算,所以结果是不固定的。

我们可以直接看官网的介绍:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

7、过滤 Observable

我们可以对Observable 中发送过来的数据进行过滤,过滤掉不需要的数据,有以下方式:

  • Filter

  • ElementAt

  • Debounce

  • Distinct

  • Skip

  • Take

下面的内容大多来自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc

7.1 Filter

Filter()接受一个类型为func (i interface{}) bool的参数,通过的数据使用这个函数断言,返回true的将发送给下一个阶段。否则,丢弃。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3)().
		Filter(func(i interface{}) bool {
			return i != 2
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

1
3

7.2 ElementAt

ElementAt()只发送指定索引的数据,如ElementAt(2)只发送索引为 2 的数据,即第 3 个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

2

7.3 Debounce

只有当特定的时间跨度已经过去而没有发出另一个Item时,才从Observable发出一个Item

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
	"time"
)

func main() {
	ch := make(chan rxgo.Item)

	go func() {
		ch <- rxgo.Of(1)
		time.Sleep(2 * time.Second)
		ch <- rxgo.Of(2)
		ch <- rxgo.Of(3)
		time.Sleep(2 * time.Second)
		close(ch)
	}()

	observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

1
3

上面示例,先收到 1,然后 2s 内没收到数据,所以发送 1。接着收到了数据 2,由于马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最后输出为 1,3。

7.4 Distinct

Distinct()会记录它发送的所有数据,它不会发送重复的数据。由于数据格式多样,Distinct()要求我们提供一个函数,根据原数据返回一个唯一标识码(有点类似哈希值)。基于这个标识码去重。

package main

import (
	"context"
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
		Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
			return i, nil
		})
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

1
2
3
4
5

7.5 Skip

Skip可以跳过前若干个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

3
4
5

7.6 Take

Take只取前若干个数据。

package main

import (
	"fmt"
	"github.com/reactivex/rxgo/v2"
)

func main() {
	observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

结果:

1
2

8、选项

因为golang中不支持默认参数,所以我们经常会用到选项设计模式,rxgo中也大量使用到了此模式。

  • rxgo.WithB
首页 上一页 4 5 6 7 下一页 尾页 7/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇gRPC入门 下一篇LAL v0.35.4发布,OBS支持RTMP H2..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目