serve() {
fmt.Println(item.V)
}
}
由于Unmarshaller
接受[]byte
类型的参数,我们在Unmarshal
之前加了一个Map
用于将string
转为[]byte
。运行结果:
&{dj 18}
&{jw 20}
5.4 Buffer
Buffer
按照一定的规则收集接收到的数据,然后一次性发送出去(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer
:
BufferWithCount(n)
:每收到n
个数据发送一次,最后一次可能少于n
个;
BufferWithTime(n)
:发送在一个时间间隔n
内收到的数据;
BufferWithTimeOrCount(d, n)
:收到n
个数据,或经过d
时间间隔,发送当前收到的数据。
5.4.1 BufferWithCount
func main() {
observable := rxgo.Range(0, 5)
observable = observable.BufferWithCount(2)
for item := range observable.Observe() {
fmt.Println(item.V)
}
}
执行结果:
[0 1]
[2 3]
[4]
最后一组只有一个。
5.4.2 BufferWithTime
unc main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))
layout := "2006-01-02 13:04:05"
fmt.Println("startTime", time.Now().Format(layout))
for item := range observable.Observe() {
fmt.Println(item.V)
fmt.Println("nextTime", time.Now().Format(layout))
}
}
执行结果是不确定的,这里需要注意:
startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() {
ch := make(chan rxgo.Item, 1)
go func() {
i := 0
for range time.Tick(time.Second) {
ch <- rxgo.Of(i)
i++
}
}()
observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)
layout := "2006-01-02 13:04:05"
fmt.Println("startTime", time.Now().Format(layout))
for item := range observable.Observe() {
fmt.Println(item.V)
fmt.Println("nextTime", time.Now().Format(layout))
}
}
执行结果:
startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53
BufferWithTimeOrCount
是以BufferWithCount、BufferWithTime
谁先满足条件为准,谁先满足谁就先执行。
5.5 GroupBy
``GroupBy将一个
Observable分成多个
子Observable,每个
子Observable`包含相同的索引值的元素。
GroupBy
函数定义如下:
GroupBy(length int, distribution func(Item) int, opts ...Option) Observable
即将一个Observable
分成length个子Observable
,根据distribution
函数返回的int作为分组的依据。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
// 创建一个Observable,它发出一些整数值
source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()
// 使用GroupBy操作符将整数值按照奇偶性进行分组
grouped := source.GroupBy(2, func(item rxgo.Item) int {
return item.V.(int) % 2
}, rxgo.WithBufferedChannel(10))
for subObservable := range grouped.Observe() {
fmt.Println("new subObservable ------ ")
for item := range subObservable.V.(rxgo.Observable).Observe() {
fmt.Printf("%v\n", item.V)
}
}
}
上面根据每个数模 3 的余数将整个流分为 3 组。运行:
new subObservable ------
2
4
6
8
10
new subObservable ------
1
3
5
7
9
注意rxgo.WithBufferedChannel(10)
的使用,由于我们的数字是连续生成的,依次为 0->1->2->…->9->10。而 Observable 默认是惰性的,即由Observe()
驱动。内层的Observe()
在返回一个 0 之后就等待下一个数,但是下一个数 1 不在此 Observable 中。所以会陷入死锁。使用rxgo.WithBufferedChannel(10)
,设置它们之间的连接 channel 缓冲区大小为 10,这样即使我们未取出 channel 里面的数字,上游还是能发送数字进来。
6、并行操作
默认情况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。从上面的Map
操作也可以得知默认是串行执行的。可