设为首页 加入收藏

TOP

Go中响应式编程库RxGo详细介绍(六)
2023-07-23 13:29:43 】 浏览:125
Tags:应式编 程库 RxGo
ufferedChannel(10):设置 channel 的缓存大小;
  • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多个 goroutine 执行转换操作;
  • rxgo.WithPublishStrategy():使用发布策略,即创建可连接的 Observable
  • rxgo还有很多其他选项,具体看官方文档,地址:

    https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

    9、简化的真实案例

    假设现在有一个定时处理任务,结构如下:

    type ScheduledTask struct {
    	RecordId int
    	HandleStartTime time.Time
    	Status bool
    }
    

    在执行具体的任务时,需要去数据库查询下是否已经被取消了,如果已经被取消掉的,则不再执行。

    完整代码如下:

    package main
    
    import (
    	"fmt"
    	"github.com/reactivex/rxgo/v2"
    	"time"
    )
    
    type ScheduledTask struct {
    	RecordId int
    	HandleStartTime string
    	Status bool
    }
    
    func main() {
    	ch := make(chan rxgo.Item)
    	go producer(ch)
    
    	time.Sleep(time.Second*3)
    	observable := rxgo.FromChannel(ch)
    	observable = observable.Filter(func(i interface{}) bool {
    		st := i.(*ScheduledTask)
    		return st.Status
    	}, rxgo.WithBufferedChannel(1))
    
    	// 消费可观测量
    	for customer := range observable.Observe() {
    		st := customer.V.(*ScheduledTask)
    		fmt.Printf("resutl: --> %+v\n", st)
    	}
    }
    
    func producer(ch chan <- rxgo.Item)  {
    	for i := 0; i < 10; i++ {
    		status := false
    		if i % 2 == 0 {
    			status = true
    		}
    		st := &ScheduledTask{
    			RecordId: i,
    			HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),
    			Status: status,
    		}
    		ch <- rxgo.Of(st)
    	}
    	
      // 这里千万不要忘记了
    	close(ch)
    }
    
    

    结果:

    resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}
    resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}
    resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}
    resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}
    resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}
    

    参考链接

    Go 每日一库之 rxgo

    官方例子

    首页 上一页 3 4 5 6 7 下一页 尾页 6/7/7
    】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
    上一篇gRPC入门 下一篇LAL v0.35.4发布,OBS支持RTMP H2..

    最新文章

    热门文章

    Hot 文章

    Python

    C 语言

    C++基础

    大数据基础

    linux编程基础

    C/C++面试题目