0.1、索引
https://waterflow.link/articles/1665853719750
当我们编写 HTTP 应用程序时,您可以使用 HTTP 中间件包装特定于路由的应用程序处理程序,可以在执行应用程序处理程序之前和之后执行一些常见的逻辑。 我们通常使用中间件来编写跨领域组件,例如授权、日志记录、缓存等。在 gRPC 中可以使用称为拦截器的概念来实现相同的功能。
通过使用拦截器,我们可以在客户端和服务器上拦截 RPC 方法的执行。 在客户端和服务器上,都有两种类型的拦截器:
- UnaryInterceptor(一元拦截器)
- StreamInterceptor(流式拦截器)
UnaryInterceptor 拦截一元 RPC,而 StreamInterceptor 拦截流式 RPC。
在一元 RPC 中,客户端向服务器发送单个请求并返回单个响应。 在流式 RPC 中,客户端或服务器,或双方(双向流式传输),获取一个流读取一系列消息返回,然后客户端或服务器从返回的流中读消息,直到没有更多消息为止。
1、在 gRPC 客户端中编写拦截器
我们可以在 gRPC 客户端应用程序中编写两种类型的拦截器:
- UnaryClientInterceptor:UnaryClientInterceptor 拦截客户端上一元 RPC 的执行。
- StreamClientInterceptor:StreamClientInterceptor拦截ClientStream的创建。 它可能会返回一个自定义的 ClientStream 来拦截所有 I/O 操作。
1、UnaryClientInterceptor
为了创建 UnaryClientInterceptor,可以通过提供 UnaryClientInterceptor 函数值调用 WithUnaryInterceptor 函数,该函数返回一个 grpc.DialOption 指定一元 RPC 的拦截器:
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
然后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以将拦截器应用于一元 RPC 的参数。
UnaryClientInterceptor func 类型的定义如下:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts …CallOption) error
参数调用者是完成 RPC 的处理程序,调用它是拦截器的责任。 UnaryClientInterceptor 函数值提供拦截器逻辑。 这是一个实现 UnaryClientInterceptor 的示例拦截器:
clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now().Unix()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now().Unix()
// 获取调用grpc的请求时长
fmt.Println("request time duration: ", end - start)
return err
}
下面的函数返回一个 grpc.DialOption 值,它通过提供 UnaryClientInterceptor 函数值来调用 WithUnaryInterceptor 函数:
func WithUnaryInterceptorCustom() grpc.DialOption {
clientInterceptor := func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
start := time.Now().Unix()
err := invoker(ctx, method, req, reply, cc, opts...)
end := time.Now().Unix()
// 获取调用grpc的请求时长
fmt.Println("request time duration: ", end - start)
return err
}
return grpc.WithUnaryInterceptor(clientInterceptor)
}
返回的 grpc.DialOption 值用作调用 grpc.Dial 函数以应用拦截器的参数:
conn, err := grpc.Dial("localhost:1234", grpc.WithInsecure(), WithUnaryInterceptorCustom())
搭建简单grpc服务可以参考这篇文章:https://waterflow.link/articles/1665674508275
2、StreamClientInterceptor
为了创建 StreamClientInterceptor,可以通过提供 StreamClientInterceptor 函数值调用 WithStreamInterceptor 函数,该函数返回一个 grpc.DialOption 指定流 RPC 的拦截器:
func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.streamInt = f
})
}
然后将返回的 grpc.DialOption 值用作调用 grpc.Dial 函数的参数,以将拦截器应用于流式 RPC。
下面是 StreamClientInterceptor func 类型的定义:
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
下面是StreamClientInterceptor的具体实现:
// 包装下stream
// 结构体内嵌接口,初始化的时候需要赋值对象实现了该接口的所有方法
type wrappedStream struct {
grpc.ClientStream
}
// 实现接收消息方法,并自定义打印
func (w *wrappedStream) RecvMsg(m interface{}) error {
log.Printf("====