设为首页 加入收藏

TOP

Go中响应式编程库RxGo详细介绍(五)
2023-07-23 13:29:43 】 浏览:128
Tags:应式编 程库 RxGo
serve() { fmt.Println(item.V) } }

由于Unmarshaller接受[]byte类型的参数,我们在Unmarshal之前加了一个Map用于将string转为[]byte。运行结果:

&{dj 18}
&{jw 20}

5.4 Buffer

Buffer按照一定的规则收集接收到的数据,然后一次性发送出去(作为切片),而不是收到一个发送一个。有 3 种类型的Buffer

  • BufferWithCount(n):每收到n个数据发送一次,最后一次可能少于n个;
  • BufferWithTime(n):发送在一个时间间隔n内收到的数据;
  • BufferWithTimeOrCount(d, n):收到n个数据,或经过d时间间隔,发送当前收到的数据。
5.4.1 BufferWithCount
func main() {
	observable := rxgo.Range(0, 5)

	observable = observable.BufferWithCount(2)

	for item := range observable.Observe() {
		fmt.Println(item.V)
	}
}

执行结果:

[0 1]
[2 3]
[4]

最后一组只有一个。

5.4.2 BufferWithTime
unc main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))

	}
}

执行结果是不确定的,这里需要注意:

startTime 2023-04-22 44:15:49
[0]
nextTime 2023-04-22 44:15:51
[1 2]
nextTime 2023-04-22 44:15:53
[3 4 5]
nextTime 2023-04-22 44:15:55
...
5.4.3 BufferWithTimeOrCount
func main() {
	ch := make(chan rxgo.Item, 1)

	go func() {
		i := 0
		for range time.Tick(time.Second) {
			ch <- rxgo.Of(i)
			i++
		}
	}()

	observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)

	layout := "2006-01-02 13:04:05"
	fmt.Println("startTime", time.Now().Format(layout))
	for item := range observable.Observe() {
		fmt.Println(item.V)
		fmt.Println("nextTime", time.Now().Format(layout))
	}
}

执行结果:

startTime 2023-04-22 44:18:48
[0]
nextTime 2023-04-22 44:18:50
[1 2]
nextTime 2023-04-22 44:18:51
[3 4]
nextTime 2023-04-22 44:18:53

BufferWithTimeOrCount是以BufferWithCount、BufferWithTime谁先满足条件为准,谁先满足谁就先执行。

5.5 GroupBy

``GroupBy将一个Observable分成多个子Observable,每个子Observable`包含相同的索引值的元素。

GroupBy函数定义如下:

GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

即将一个Observable分成length个子Observable,根据distribution函数返回的int作为分组的依据。

package main

import (
	"fmt"

	"github.com/reactivex/rxgo/v2"
)

func main() {
	// 创建一个Observable,它发出一些整数值
	source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()

	// 使用GroupBy操作符将整数值按照奇偶性进行分组
	grouped := source.GroupBy(2, func(item rxgo.Item) int {
		return item.V.(int) % 2
	}, rxgo.WithBufferedChannel(10))

	for subObservable := range grouped.Observe() {
		fmt.Println("new subObservable ------ ")
		for item := range subObservable.V.(rxgo.Observable).Observe() {
			fmt.Printf("%v\n", item.V)
		}
	}

}

上面根据每个数模 3 的余数将整个流分为 3 组。运行:

new subObservable ------ 
2
4
6
8
10
new subObservable ------ 
1
3
5
7
9

注意rxgo.WithBufferedChannel(10)的使用,由于我们的数字是连续生成的,依次为 0->1->2->…->9->10。而 Observable 默认是惰性的,即由Observe()驱动。内层的Observe()在返回一个 0 之后就等待下一个数,但是下一个数 1 不在此 Observable 中。所以会陷入死锁。使用rxgo.WithBufferedChannel(10),设置它们之间的连接 channel 缓冲区大小为 10,这样即使我们未取出 channel 里面的数字,上游还是能发送数字进来。

6、并行操作

默认情况下,这些转换操作都是串行的,即只有一个 goroutine 负责执行转换函数。从上面的Map操作也可以得知默认是串行执行的。可

首页 上一页 2 3 4 5 6 7 下一页 尾页 5/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇gRPC入门 下一篇LAL v0.35.4发布,OBS支持RTMP H2..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目