1. 简介
本文将介绍 Go 语言中的 Weighted
并发原语,包括 Weighted
的基本使用方法、实现原理、使用注意事项等内容。能够更好地理解和应用 Weighted
来实现资源的管理,从而提高程序的稳定性。
2. 问题引入
在微服务架构中,我们的服务节点负责接收其他节点的请求,并提供相应的功能和数据。比如账户服务,其他服务需要获取账户信息,都会通过rpc请求向账户服务发起请求。
这些服务节点通常以集群的方式部署在服务器上,用于处理大量的并发请求。每个服务器都有其处理能力的上限,超过该上限可能导致性能下降甚至崩溃。
在部署服务时,通常会评估服务的并发量,并为其分配适当的资源以处理预期的请求负载。然而,在微服务架构中,存在着上游服务请求下游服务的场景。如果上游服务在某些情况下没有正确考虑并发量,或者由于某些异常情况导致大量请求发送给下游服务,那么下游服务可能面临超过其处理能力的问题。这可能导致下游服务的响应时间增加,甚至无法正常处理请求,进而影响整个系统的稳定性和可用性。下面用一个简单的代码来说明一下:
package main
import (
"fmt"
"net/http"
"sync"
)
func main() {
// 启动下游服务,用于处理请求
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// 模拟下游服务的处理逻辑
// ...
// 完成请求处理后,从等待组中删除一个等待
wg.Done()
})
// 启动下游服务的 HTTP 服务器
http.ListenAndServe(":8080", nil)
}
这里启动一个简单的HTTP服务器,由其来模拟下游服务,来接收上游服务的请求。下面我们启动一个简单的程序,由其来模拟上游服务发送请求:
func main() {
// 创建一个等待组,用于等待所有请求完成
var wg sync.WaitGroup
// 模拟上游服务发送大量请求给下游服务
go func() {
for i := 0; i < 1000000; i++ {
wg.Add(1)
go sendRequest(&wg)
}
}()
// 等待所有请求完成
wg.Wait()
}
func sendRequest(wg *sync.WaitGroup) {
// 模拟上游服务发送请求给下游服务
resp, err := http.Get("http://localhost:8080/")
if err != nil {
fmt.Println("请求失败:", err)
} else {
fmt.Println("请求成功:", resp.Status)
}
// 请求完成后,通知等待组
wg.Done()
}
这里,我们同时启动了1000000个协程同时往HTTP服务器发送请求,如果服务器配置不够高,亦或者是请求量更多的情况下,已经超过了服务器的处理上限,服务器没有主够的资源去处理这些请求,此时将有可能直接将服务器打挂掉,服务直接不可用。在这种情况下,如果由于上游服务的问题,导致下游服务,甚至整个链路的系统都直接崩溃,这个是不合理的,此时需要有一些手段保护下游服务由于异常流量导致整个系统的崩溃。
这里对上面的场景进行分析,可以发现,此时是由于上游服务大量请求的过来,而当前服务并没有足够的资源去处理这些请求,但是并没有对其加以限制,而是继续处理,最终导致了整个系统的不可用。那么此时就应该进行限流,对并发请求量进行控制,对服务器能够处理的并发数进行合理评估,当并发请求数超过了限制,此时应该直接拒绝其访问,避免整个系统的不可用。
那问题来了,go语言中,有什么方法能够实现资源的管理,如果没有足够的资源,此时将直接返回,不对请求进行处理呢?其实go语言中有Weighted
类型,在这种场景还挺合适的。下面我们将对其进行介绍。
3. 基本使用
3.1 基本介绍
Weighted
是 Go 语言中 golang.org/x/sync
包中的一种类型,用于限制并发访问某个资源的数量。它提供了一种机制,允许调用者以不同的权重请求访问资源,并在资源可用时进行授予。
Weighted
的定义如下,提供了Acquire
,TryAcquire
,Release
三个方法:
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
func (s *Weighted) Acquire(ctx context.Context, n int64) error{}
func (s *Weighted) TryAcquire(n int64) bool{}
func (s *Weighted) Release(n int64) {}
Acquire
: 以权重n
请求获取资源,阻塞直到资源可用或上下文ctx
结束。TryAcquire
: 尝试以权重n
获取信号量,如果成功则返回true
,否则返回false
,并保持信号量不变。Release
:释放具有权重n
的信号量。
3.2 权重说明
有时候,不同请求对资源的消耗是不同的。通过设置权重,你可以更好地控制不同请求对资源的使用情况。例如,某些请求可能需要更多的计算资源或更长的处理时间,你可以设置较高的权重来确保它们能够获取到足够的资源。
其次就是权重大只是代表着请求需要使用到的资源多,对于优先级并不会有作用。在Weighted
中,资源的许可是以先进先出(FIFO)的顺序分配的,而不是根据权重来决定获取的优先级。当有多个请求同时等待获取资源时,它们会按照先后顺序依次获取资源的许可。
假设先请求权重为 1 的资源,然后再请求权重为 2 的资源。如果当前可用的资源许可足够满足两个请求的总权重,那么先请求的权重为 1 的资源会先获取到许可,然后是后续请求的权重为 2 的资源。
w.Acquire(context.Background(), 1) // 权重为 1 的请求先获取到资源许可
w.Acquire(context.Background(), 2) // 权重为 2 的请求在权重为 1 的请求之后获取到资源许可
3.3 基本使用
当使用Weighted
来控制资源的并发访问时,通常需要以下几个步骤:
- 创建
Weighted
实例,定义好最大资源数 - 当需要资源时,调用
Acquire
方法占据资源 - 当处理完成之后,调用
Release
方法释放资源
下面是一个简单的代码的示例,展示了如何使用Weighted
实现资源控制:
func main() {
// 1. 创建一个信号量实例,设置最大并发数
sem := semaphore.NewWeighted(10)
// 具体处理请求的函数
handleRequest := func(id int) {
// 2. 调用Acquire尝试获取资源
err := sem.Acquire(context.Background(), 1)
if err != nil {
fmt.Printf("Goroutine %d failed to acquire resource\n", id)
}
/