以改变这一默认行为,使用rxgo.WithPool(n)
选项设置运行n
个 goroutine,或者rxgo.WitCPUPool()
选项设置运行与逻辑 CPU 数量相等的 goroutine。
package main
import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
"math/rand"
"time"
)
func main() {
observable := rxgo.Range(1, 10)
observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
time.Sleep(time.Duration(rand.Int31()))
return i.(int) + 1, nil
}, rxgo.WithCPUPool())
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
8
9
10
6
5
11
2
4
7
3
由于是并行运算,所以结果是不固定的。
我们可以直接看官网的介绍:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
7、过滤 Observable
我们可以对Observable 中发送过来的数据进行过滤,过滤掉不需要的数据,有以下方式:
-
Filter
-
ElementAt
-
Debounce
-
Distinct
-
Skip
-
Take
下面的内容大多来自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc
7.1 Filter
Filter()
接受一个类型为func (i interface{}) bool
的参数,通过的数据使用这个函数断言,返回true
的将发送给下一个阶段。否则,丢弃。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3)().
Filter(func(i interface{}) bool {
return i != 2
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
1
3
7.2 ElementAt
ElementAt()
只发送指定索引的数据,如ElementAt(2)
只发送索引为 2 的数据,即第 3 个数据。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
2
7.3 Debounce
只有当特定的时间跨度已经过去而没有发出另一个Item
时,才从Observable发出一个Item
。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
func main() {
ch := make(chan rxgo.Item)
go func() {
ch <- rxgo.Of(1)
time.Sleep(2 * time.Second)
ch <- rxgo.Of(2)
ch <- rxgo.Of(3)
time.Sleep(2 * time.Second)
close(ch)
}()
observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
1
3
上面示例,先收到 1,然后 2s 内没收到数据,所以发送 1。接着收到了数据 2,由于马上又收到了 3,所以 2 不会发送。收到 3 之后 2s 内没有收到数据,发送了 3。所以最后输出为 1,3。
7.4 Distinct
Distinct()
会记录它发送的所有数据,它不会发送重复的数据。由于数据格式多样,Distinct()
要求我们提供一个函数,根据原数据返回一个唯一标识码(有点类似哈希值)。基于这个标识码去重。
package main
import (
"context"
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().
Distinct(func(_ context.Context, i interface{}) (interface{}, error) {
return i, nil
})
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
1
2
3
4
5
7.5 Skip
Skip
可以跳过前若干个数据。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
3
4
5
7.6 Take
Take
只取前若干个数据。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
结果:
1
2
8、选项
因为golang中不支持默认参数,所以我们经常会用到选项设计模式,rxgo中也大量使用到了此模式。