由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 编程的基础内容

JAVA 西门飞冰 860℃
[隐藏]

1.Netty介绍

官网:https://netty.io/

Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

Netty 本质是一个 NIO 框架,适用于服务器通讯相关的多种应用场景。

Netty解决的问题是:以非常轻松的方式解决各种各样的流

Spark、Flink、ElasticSearch、RocketMQ、gRPC底层网络通信都是基于Netty进行开发

使用Netty进行开发的知名项目:https://netty.io/wiki/related-projects.html

2.Netty组件介绍

2.1.EventLoop模块

(1)EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。1个EventLoop可以服务多个Channel,1个Channel只有一个EventLoop,可以创建多个 EventLoop 来优化资源利用,也就是EventLoopGroup

2)EventLoopGroup 负责分配 EventLoop 到新创建的 Channel,里面包含多个EventLoop

他们的对应关系,如下图所示:

image-20221111161630711

2.1.1.Reactor线程组

服务器端,一般设置两个线程组,监听连接的 parent channel 工作在一个独立的线程组,一般被命名为boss线程组。

连接成功后,负责客户端连接读写的 child channel 工作在另一个线程组,一般被命名为worker线程组。

image-20221111162414956

默认线程数量(cpu核心数 * 2)

2.1.2.Channel模块

Channel: 客户端和服务端建立的一个连接通道

ChannelHandler:用来处理 Channel 上的各种事件,分为入站、出站两种。

ChannelPipeline:所有 ChannelHandler 被连成一串,就是 Pipeline

ChannelHandlerContext:是连接ChannelHandler和ChannelPipeline的桥梁

他们是什么关系:

一个Channel包含一个ChannelPipeline,所有ChannelHandler都会顺序加入到ChannelPipeline中

创建Channel时会自动创建一个ChannelPipeline,每个Channel都有一个管理它的pipeline,这关联是永久性的

Channel当状态出现变化,就会触发对应的事件

状态:

(1)channelRegistered: channel注册到一个EventLoop

(2)channelActive: 变为活跃状态(连接到了远程主机),可以接受和发送数据

(3)channelInactive: channel处于非活跃状态,没有连接到远程主机

(4)channelUnregistered: channel已经创建,但是未注册到一个EventLoop里面,也就是没有和Selector绑定

ChannelHandler下主要是两个子接口:

ChannelInboundHandler:(入站)

  • 处理输入数据和Channel状态类型改变,
  • 适配器 ChannelInboundHandlerAdapter(适配器设计模式)
  • 常用的:SimpleChannelInboundHandler

ChannelOutboundHandler:(出站)

  • 处理输出数据,适配器 ChannelOutboundHandlerAdapter

2.1.3.Bootstrap模块

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,NettyBootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。

启动一个Bootstrap,大致有8步,如下图:

image-20221111163025863

2.1.4.ChannelFuture模块

ChannelFuture的作用是用来保存Channel异步操作的结果。

我们知道,在Netty中所有的I/O操作都是异步的。这意味着任何的I/O调用都将立即返回,而不保证这些被请求的I/O操作在调用结束的时候已经完成。取而代之地,你会得到一个返回的ChannelFuture实例,这个实例将给你一些关于I/O操作结果或者状态的信息。

3.Netty的Hello World 案例

Netty Maven依赖配置

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.74.Final</version>
</dependency>

服务器端启动类:

// 服务器端启动类
public class AppServerHello {
    private int port;

    public AppServerHello(int port) {
        this.port = port;
    }

    // 启动流程
    public void run() throws InterruptedException {
        // 配置服务端线程组(Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读/写数据)
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap Netty启动引导类,传入boss和group两个线程组启动
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workGroup)
                    // 通过工厂方法设计模式实例化一个channel
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    // childHandler 指定处理数据的工序
                    // ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            // 添加我们自定义的Handler
                            ch.pipeline().addLast(new HandlerServerHello());
                        }
                    });

            // 绑定服务器和端口:该实例将提供有关IO操作的结果或状态的信息
            ChannelFuture channelFuture = b.bind().sync();

            System.out.println("在" + channelFuture.channel().localAddress()+"上开启监听");
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放线程池
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new AppServerHello(18080).run();
    }
}

服务器端Handler类:

// 服务器端I/O处理类
@ChannelHandler.Sharable
public class HandlerServerHello extends ChannelInboundHandlerAdapter {
    // 重写数据读取方法
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //处理收到的数据,并反馈消息到到客户端
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8));

        //写入并发送信息到远端(客户端)
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是服务端,我已经收到你发送的消息", CharsetUtil.UTF_8));
    }

    // 重写异常处理的方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //出现异常的时候执行的动作(打印并关闭通道)
        cause.printStackTrace();
        ctx.close();
    }
}

客户端启动类:

public class AppClientHello {
    private final String host;
    private final int port;

    // 设置服务器端端IP和端口
    public AppClientHello(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void run() throws InterruptedException {
        // 配置客户端线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 配置客户端辅助启动类,启动客户端线程组
            Bootstrap bs = new Bootstrap();
            bs.group(group)
                    // 实例化一个Channel
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    // 指明处理数据的工序
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 添加我们自定义的handler
                            socketChannel.pipeline().addLast(new HandlerClientHello());
                        }
                    });
            // 连接到服务端,connect是异步连接,在调用同步等待sync,等待连接成功
            ChannelFuture future = bs.connect().sync();
            future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8));
            // 阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
            future.channel().closeFuture().sync();
        } finally {
            // 优雅退出,释放NIO线程组
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new AppClientHello("127.0.0.1",18080).run();
    }
}

客户端Handler类:

// 客户端 I/O处理类
@ChannelHandler.Sharable
public class HandlerClientHello extends SimpleChannelInboundHandler<ByteBuf>
{
    // 重写数据读取的方法
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
    {
        System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    // 重写异常处理的方法
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        cause.printStackTrace();
        ctx.close();
    }
}

程序测试:先启动Server端在启动Client端

服务端输出如下:

image-20221105145055472

客户端输出如下:

image-20221105145110508

4.Netty 开发的基本流程

netty外边的启动类模版都是一模一样的,工作中也就是编写handler逻辑,加入到Pipeline中

客户端和服务端端开发流程基本一致,都是创建handler,然后创建服务启动类

image-20221104143602720

5.Netty连接及活动状态监测

当Channel状态出现变化,就会触发对应的事件,我们可以根据Channel状态的变化执行对应的操作

所有的状态,只需要在客户端和服务端的Handler中修改即可

image-20221104154524802

代码示例:服务端和客户端启动类代码和上述案例一致,所以只粘贴Server和Client的Handler代码

HandlerServer:

// 服务器端I/O处理类,连接监测
@ChannelHandler.Sharable
public class HandlerServerMonitorEvent extends ChannelInboundHandlerAdapter {
    //通道数组,保存所有注册到EventLoop的通道
    public static ChannelGroup channels=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception
    {
        //处理收到的数据,并反馈消息到到客户端
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8));

        //写入并发送信息到远端(客户端)
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Read方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        //出现异常的时候执行的动作(打印并关闭通道)
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception
    {
        //新建立连接时触发的动作
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"已连接上来");
        channels.add(incoming);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
    {
        //连接断开时触发的动作
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"已断开");
        channels.remove(incoming);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于活动状态触发的动作,该方法只会在通道建立时调用一次
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"在线");

        //写入并发送信息到远端(客户端)
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Active方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"掉线");
    }
}

HandlerClient:

// 客户端 I/O处理类
@ChannelHandler.Sharable
public class HandlerClientMonitorEvent extends SimpleChannelInboundHandler<ByteBuf>
{
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
    {
        // 处理接收到的消息
        System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // 处理I/O事件的异常
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //建立连接后该channelActive()方法只会被调用一次,这里的逻辑:建立连接后,字节序列被发送到服务器,编码格式是utf-8
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端通过Active方法发送的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次
        Channel incoming=ctx.channel();
        System.out.println("服务器端掉线");
    }
}

测试结果:

(1)测试客户端连接和掉线状态

image-20221105151411320

(2)测试服务端掉线

image-20221105151514712

 

转载请注明:西门飞冰的博客 » Netty 编程的基础内容

喜欢 (0)or分享 (0)