设为首页 加入收藏

TOP

Go中响应式编程库RxGo详细介绍(一)
2023-07-23 13:29:43 】 浏览:120
Tags:应式编 程库 RxGo

最近的项目用到了 RxGo ,因为之前从没有接触过,特意去学了学,特此记录下。文章很多内容是复制了参考资料或者官方文档。如果涉及侵权,请联系删除,谢谢。

1、RxGo简介

1.1 基础介绍

RxGo是一个基于Go语言的响应式编程库,它提供了一种简单而强大的方式来处理异步事件流和数据流。RxGo的设计灵感来自于ReactiveX,它提供了类似于ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。

RxGo的目标是提供一种简单而强大的方式来处理异步事件流和数据流,使得开发人员可以更容易地编写高效、可维护和可扩展的代码。RxGo的特点包括:

  1. 响应式编程:RxGo提供了Observable和Observer两个核心概念,使得开发人员可以更容易地处理异步事件流和数据流。
  2. 操作符:RxGo提供了类似于ReactiveX的操作符,如map、filter、reduce等,使得开发人员可以更容易地对事件流进行转换、过滤和聚合等操作。
  3. 调度器:RxGo提供了调度器,使得开发人员可以更容易地控制事件流的执行线程和顺序。
  4. 可组合性:RxGo的操作符具有可组合性,使得开发人员可以更容易地组合多个操作符来实现复杂的操作。
  5. 高效性:RxGo的设计和实现都非常高效,可以处理大量的事件流和数据流。

总之,RxGo是一个非常强大和实用的响应式编程库,它可以帮助开发人员更容易地处理异步事件流和数据流,提高代码的可维护性和可扩展性。

1.2 RxGo 数据流程图

RxGo的实现基于管道的概念。管道是由通道连接的一系列阶段,其中每个阶段是运行相同功能的一组goroutine。

  • 使用Just操作符创建一个基于固定列表的静态可观测数据。
  • 使用Map操作符定义了一个转换函数(把圆形变成方形)。
  • Filter操作符过滤掉黄色方形。

从上面的例子中可以看出来,最终生成的数据被发送到一个通道中,消费者读取数据进行消费。RxGo中有很多种消费和生成数据的方式,发布结果到通道中只是其中一种方式。

2、快速入门

2.1 安装 RxGo v2

go get -u github.com/reactivex/rxgo/v2

2.2 简单案例

我们先写一个简单的案例,来学习RxGo的简单使用。

package main

import (
  "fmt"

  "github.com/reactivex/rxgo/v2"
)

func main() {
  observable := rxgo.Just(1, 2, 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    fmt.Println(item.V)
  }
}

使用 RxGo 的一般流程如下:

  • 使用相关的 Operator 创建 ObservableOperator 就是用来创建 Observable 的。
  • 中间各个阶段可以使用过滤操作筛选出我们想要的数据,使用转换操作对数据进行转换;
  • 调用 ObservableObserve()方法,该方法返回一个<- chan rxgo.Item。然后for range遍历即可。

结合上面的这张图,我们就比较容易理解RxGo的数据处理流程。因为例子比较简单,没有用到Map、Filter操作。

执行结果:

$ go run main.go 
1
2
3
4
5

Just使用到柯里化的编程思想。
柯里化(Currying)是一种函数式编程的技术,它将一个接受多个参数的函数转换成一系列接受单个参数的函数。这些单参数函数可以被组合起来,以便在后续的计算中使用。

柯里化的主要优点是它可以使函数更加灵活和可复用。通过将函数分解为一系列单参数函数,我们可以更容易地组合和重用这些函数,从而减少代码的重复性和冗余性。

例如:

//柯里化的例子
func addCurried(x int) func(int) int {
	return func(y int) int {
		return x + y
	}
}

func main()  {
	add5 := addCurried(5)
	fmt.Println(add5(10))
}

由于 Go 不支持多个可变参数,Just通过柯里化迂回地实现了这个功能:

//Just creates an Observable with the provided items.
func Just(items ...interface{}) func(opts ...Option) Observable {
  return func(opts ...Option) Observable {
    return &ObservableImpl{
      iterable: newJustIterable(items...)(opts...),
    }
  }
}

Observe()返回一个 Item 的chan ,Item的结构如下:

// Item is a wrapper having either a value or an error.
type	Item struct {
		V interface{}
		E error
	}

所以通过Just生成observable对象时,传入的数据可以包含错误,在使用时通过 item.Error() 来区分。

func main() {
  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()
  ch := observable.Observe()
  for item := range ch {
    if item.Error() {
      fmt.Println("error:", item.E)
    } else {
      fmt.Println(item.V)
    }
  }
}

我们使用item.Error()检查是否出现错误。然后使用item.V访问数据,item.E访问错误。

除了使用for range之外,我们还可以调用 ObservableForEach()方法来实现遍历。ForEach()接受 3 个回调函数:

  • NextFunc:类型为func (v interface {}),传入的数据不包含错误类型时走此函数处理。
  • ErrFunc:类型为func (err error),当传入的数据包含错误时走此函数;
  • CompletedFunc:类型为func ()Observable 完成时调用。

有点Promise那味了。使用ForEach(),可以将上面的示例改写为:

func main() {
  observable := rxgo.Just(1, 2, errors.New("这是一个测试错误!"), 4, 5)()
  <-observable.ForEach(func(v interface{}) {
    fmt.Println("received:", v)
  }, func(err error) {
    fmt.Println("error:", err)
  }, func() {
    fmt.Println("completed")
  })
}
$ go run main.go 
received: 1
received: 2
error: 这是一
首页 上一页 1 2 3 4 5 6 7 下一页 尾页 1/7/7
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇gRPC入门 下一篇LAL v0.35.4发布,OBS支持RTMP H2..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目