ufferedChannel(10):设置 channel 的缓存大小;
rxgo.WithPool(n)/rxgo.WithCpuPool()
:使用多个 goroutine 执行转换操作;
rxgo.WithPublishStrategy()
:使用发布策略,即创建可连接的 Observable。
rxgo还有很多其他选项,具体看官方文档,地址:
https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md
9、简化的真实案例
假设现在有一个定时处理任务,结构如下:
type ScheduledTask struct {
RecordId int
HandleStartTime time.Time
Status bool
}
在执行具体的任务时,需要去数据库查询下是否已经被取消了,如果已经被取消掉的,则不再执行。
完整代码如下:
package main
import (
"fmt"
"github.com/reactivex/rxgo/v2"
"time"
)
type ScheduledTask struct {
RecordId int
HandleStartTime string
Status bool
}
func main() {
ch := make(chan rxgo.Item)
go producer(ch)
time.Sleep(time.Second*3)
observable := rxgo.FromChannel(ch)
observable = observable.Filter(func(i interface{}) bool {
st := i.(*ScheduledTask)
return st.Status
}, rxgo.WithBufferedChannel(1))
// 消费可观测量
for customer := range observable.Observe() {
st := customer.V.(*ScheduledTask)
fmt.Printf("resutl: --> %+v\n", st)
}
}
func producer(ch chan <- rxgo.Item) {
for i := 0; i < 10; i++ {
status := false
if i % 2 == 0 {
status = true
}
st := &ScheduledTask{
RecordId: i,
HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),
Status: status,
}
ch <- rxgo.Of(st)
}
// 这里千万不要忘记了
close(ch)
}
结果:
resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}
resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}
resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}
参考链接
Go 每日一库之 rxgo
官方例子