设为首页 加入收藏

TOP

使用Netty框架完成客户端和服务端收发Protobuf消息(一)
2023-07-25 21:35:35 】 浏览:57
Tags:使用 Netty 成客户 Protobuf 消息

前言

本周继续学习尼恩编著的《Netty、Redis、ZooKeeper高并发实战》,一些资源也贴在这里,自己以后想看还可以找到,这个是在博客园的一个入口https://www.cnblogs.com/crazymakercircle/p/9904544.html。
这周主要学习了Netty客户端和服务端通信,书是由浅入深的在进行,从Socket NOI通信到 Reactor反应器模式,再到Netty框架,示例代码都在https://gitee.com/crazymaker/netty_redis_zookeeper_source_code.git 中可以看到,书结合源代码,自己在动手试验一下,感觉还是有些收获。 今天的示例代码就是实践出一个客户端和服务端传递protobuf的例子。

Netty 服务端

先来看一下服务端代码,


@Slf4j
public class ProtoBufServer {

    private final int serverPort;
    ServerBootstrap b = new ServerBootstrap();

    public ProtoBufServer(int port) {
        this.serverPort = port;
    }

    public void runServer() {
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加3个handler处理器

                    // protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器。
                    // 有三种方式可以选择:
                    // 使用netty提供ProtobufVarint32FrameDecoder
                    // 继承netty提供的通用半包处理器 LengthFieldBasedFrameDecoder
                    // 继承ByteToMessageDecoder类,自己处理半包

                    // 半包的处理
                    ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                    // 需要解码的目标类
                    ch.pipeline().addLast(new ProtobufDecoder(MarketPriceProto.MarketPrice.getDefaultInstance()));
                    ch.pipeline().addLast(new ProtobufBussinessHandler());
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            log.info(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }

    }

    //服务器端业务处理器
    static class ProtobufBussinessHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            MarketPriceProto.MarketPrice protoMsg = (MarketPriceProto.MarketPrice) msg;
            //经过pipeline的各个decoder,到此Person类型已经可以断定
            log.info("收到一个 MsgProtos.Msg 数据包 =》");
            log.info("protoMsg.getId():=" + protoMsg.getId());
            log.info("protoMsg.getClose():=" + protoMsg.getClose());
        }
    }


    public static void main(String[] args) throws InterruptedException {
        int port = SERVER_PORT;
        new ProtoBufServer(port).runServer();
    }
}

代码中有注释解释,我在这里加一下说明
代码中有两个EventLoopGroup bossLoopGroup和EventLoopGroup workerLoopGroup,使用两个是什么原因呢? 一个是负责处理连接监听事件, 一个负责处理数据IO事件和Handler业务处理,通俗点解释就是一个负责接客,一个负责服务客户。如果只有一个人就会忙不过来,让后面的人等很久。
b.childHandler这个就是我们具体的如何处理接收到的消息,他们都继承ChannelInboundHandlerAdapter,通过PipeLine把消息进行处理。我们从通道里面拿到的都是字节码,那么要转成我们需要的Protobuf类,就需要用到这些处理类

首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Spring Boot+Mybatis:实现数据库.. 下一篇Java学习九

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目