最近的项目用到了 RxGo ,因为之前从没有接触过,特意去学了学,特此记录下。文章很多内容是复制了参考资料或者官方文档。如果涉及侵权,请联系删除,谢谢。
1、RxGo简介
1.1 基础介绍
RxGo
是一个基于Go语言的响应式编程库,它提供了一种简单而强大的方式来处理异步事件流和数据流
。RxGo的设计灵感来自于ReactiveX,它提供了类似于ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。
RxGo的目标是提供一种简单而强大的方式来处理异步事件流和数据流,使得开发人员可以更容易地编写高效、可维护和可扩展的代码。RxGo的特点包括:
- 响应式编程:
RxGo
提供了Observable和Observer
两个核心概念,使得开发人员可以更容易地处理异步事件流和数据流。 - 操作符:
RxGo
提供了类似于ReactiveX的操作符,如map、filter、reduce等,使得开发人员可以更容易地对事件流进行转换、过滤和聚合等操作。 - 调度器:
RxGo
提供了调度器,使得开发人员可以更容易地控制事件流的执行线程和顺序。 - 可组合性:
RxGo
的操作符具有可组合性,使得开发人员可以更容易地组合多个操作符来实现复杂的操作。 - 高效性:
RxGo
的设计和实现都非常高效,可以处理大量的事件流和数据流。
总之,RxGo
是一个非常强大和实用的响应式编程库,它可以帮助开发人员更容易地处理异步事件流和数据流,提高代码的可维护性和可扩展性。
1.2 RxGo 数据流程图
RxGo的实现基于管道的概念。管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutine。
- 使用
Just
操作符创建一个基于固定列表的静态可观测数据。 - 使用
Map
操作符定义了一个转换函数(把圆形变成方形)。 - 用
Filter
操作符过滤掉黄色方形。
从上面的例子中可以看出来,最终生成的数据被发送到一个通道中,消费者读取数据进行消费。RxGo
中有很多种消费和生成数据的方式,发布结果到通道中只是其中一种方式。
2、快速入门
2.1 安装 RxGo v2
go get -u github.com/reactivex/rxgo/v2
2.2 简单案例
我们先写一个简单的案例,来学习RxGo的简单使用。
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
)
func main() {
observable := rxgo.Just(1, 2, 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
fmt.Println(item.V)
}
}
使用 RxGo 的一般流程如下:
- 使用相关的 Operator 创建 Observable,Operator 就是用来创建 Observable 的。
- 中间各个阶段可以使用过滤操作筛选出我们想要的数据,使用转换操作对数据进行转换;
- 调用 Observable 的
Observe()
方法,该方法返回一个<- chan rxgo.Item
。然后for range
遍历即可。
结合上面的这张图,我们就比较容易理解RxGo的数据处理流程。因为例子比较简单,没有用到Map、Filter
操作。
执行结果:
$ go run main.go
1
2
3
4
5
Just
使用到柯里化的编程思想。
柯里化(Currying)是一种函数式编程的技术,它将一个接受多个参数的函数转换成一系列接受单个参数的函数。这些单参数函数可以被组合起来,以便在后续的计算中使用。
柯里化的主要优点是它可以使函数更加灵活和可复用。通过将函数分解为一系列单参数函数,我们可以更容易地组合和重用这些函数,从而减少代码的重复性和冗余性。
例如:
//柯里化的例子
func addCurried(x int) func(int) int {
return func(y int) int {
return x + y
}
}
func main() {
add5 := addCurried(5)
fmt.Println(add5(10))
}
由于 Go 不支持多个可变参数,Just
通过柯里化迂回地实现了这个功能:
//Just creates an Observable with the provided items.
func Just(items ...interface{}) func(opts ...Option) Observable {
return func(opts ...Option) Observable {
return &ObservableImpl{
iterable: newJustIterable(items...)(opts...),
}
}
}
Observe()
返回一个 Item 的chan ,Item的结构如下:
// Item is a wrapper having either a value or an error.
type Item struct {
V interface{}
E error
}
所以通过Just生成observable对象时,传入的数据可以包含错误,在使用时通过 item.Error() 来区分。
func main() {
observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
ch := observable.Observe()
for item := range ch {
if item.Error() {
fmt.Println("error:", item.E)
} else {
fmt.Println(item.V)
}
}
}
我们使用item.Error()
检查是否出现错误。然后使用item.V
访问数据,item.E
访问错误。
除了使用for range
之外,我们还可以调用 Observable 的ForEach()
方法来实现遍历。ForEach()
接受 3 个回调函数:
NextFunc
:类型为func (v interface {})
,传入的数据不包含错误类型时走此函数处理。ErrFunc
:类型为func (err error)
,当传入的数据包含错误时走此函数;CompletedFunc
:类型为func ()
,Observable 完成时调用。
有点Promise
那味了。使用ForEach()
,可以将上面的示例改写为:
func main() {
observable := rxgo.Just(1, 2, errors.New("这是一个测试错误!"), 4, 5)()
<-observable.ForEach(func(v interface{}) {
fmt.Println("received:", v)
}, func(err error) {
fmt.Println("error:", err)
}, func() {
fmt.Println("completed")
})
}
$ go run main.go
received: 1
received: 2
error: 这是一