由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 的网络可靠性保障

JAVA 西门飞冰 58℃
[隐藏]

1.介绍

Netty作为一个基础的NIO通信框架,被广泛应用于大数据处理、互联网消息中间件等场景。这些应用场景都是分布式场景,总结就是把一个服务的不同角色在分散在不同的服务器上。各个角色在运行过程中都需要通过Netty进行数据或者参数的传递,这个过程一定离不开网络。可以把网络理解成联通每一个城市(服务器、仓储计算节点)的高速公路,通过这条高速公路可以看到城市间的协调联通,也可以看到它们之间都有哪些协作的工作,从而更深的理解城市间的关系。

分布式架构下对网络通信的可靠性要求非常高,一旦Netty运行环境的网络出现问题(例如网卡,交换机故障等,这时底层的TCP连接会断开),但是应用程序没有感知到,就有可能导致业务中断。这就要求Netty自身的可靠性和网络环境足够好。

Netty在网络可靠性这个层面常用的机制有超时检测、心跳机制、断线重连等手段,来应对网络通信故障。可以让开发人员检测到具体的状态后,进行相应的处理。

生产环境中,靠Netty的断线重连机制,基本可以搞定95%的网络通信问题。要是断线重连一直不行,就需要根据实际业务情况考虑处理的方式了。

2.超时检测

超时分三种:读超时、写超时、读写超时;

Netty中实现超时检测相当方便,只需做两步工作就能优雅的实现超时检测(客户端、服务器端实现步骤一样):

1)Handler中重写userEventTriggered方法

2)在Pipeline中增加IdleStateHandler实例,该实例的构造函数有四个:

  • readerIdleTime:读超时,设置为0是,表示不启用
  • writeIdleTime:写超时,设置为0是,表示不启用
  • allIdleTime:读写超时,设置为0是,表示不启用
  • TimeUnit:时间单位,该参数可不设置,默认是秒

代码配置示例:

image-20221104212754977

3.心跳机制

为什么要有心跳机制?

(1)发现长期不用的连接就关闭掉,减轻服务器的连接压力;

(2)检测异常连接,在很多异常场合,表面上看连接还存在,其实已经挂掉,所以一般发送一个空数据包来测试一下链路是否有效。

如何建立心跳机制?

很简单,利用超时检测,检测到超时后就向对方发送一个心跳包。

超时和心跳实现既可以在客户端,也可以在服务器端,根据实际场景来定。

严格来说这是一种心律不齐的心跳机制,后面课程会讲到如何利用定时任务来实现真正的心跳机制。

代码示例:在Handler类重写的userEventTriggered方法中,添加心跳检测代码,因为要根据超时来发送心跳

image-20221107214217943

在客户端启动类中添加如下代码,模拟超时消息(上面设置的读超时是5秒,这里设置6秒发送一条消息)

image-20221105161720891

验证:

服务端心跳消息接收:

image-20221105162028019

客户端超时输出:

image-20221105161930674

4.断线重连

说明:这个动作只能由客户端发起

为断线重连需要考虑到两种情况?

(1)客户端启动时,服务器端还未启动或已崩掉,需要客户端自动不断发起连接,而不是退出程序;

(2)服务器端正常,但客户端断开连接了,这时也需要自动发起重新连接请求。

断线重连实现步骤:

(1)客户端的连接由同步阻塞改为异步阻塞(去掉连接的sync())

image-20221105200540258

(2)创建监听类,该类实现ChannelFutureListener接口,在该类中发起重连。

image-20221104213018016

(3)在ChannelFuture的实例中增加监听,把我们自定义的监听类添加进去。

image-20221105200523818

(4)在handler中的channelInactive方法中增加启动类运行的代码

image-20221105200438138

测试验证:在正常通信中关闭服务端在启动,验证客户端可以实现断线重连功能

image-20221105200755544

5.三个内容整体代码

服务端启动类:

public class AppServerReconnHeart
{
    private int port;

    public AppServerReconnHeart(int port)
    {
        this.port = port;
    }

    public void run() throws Exception
    {
        // Netty的Reactor线程池
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            //启动NIO服务
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class) 
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        //ChannelInitializer是一个特殊的处理类,他的目的是帮助使用者配置一个新的Channel,用于把许多自定义的处理类增加到pipline上来
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new HandlerServerReconnHeart());
                        }
                    });

            //绑定服务器
            ChannelFuture channelFuture= b.bind().sync();
            System.out.println("在" + channelFuture.channel().localAddress()+"上开启监听");

            //阻塞操作
            channelFuture.channel().closeFuture().sync();
        } finally {
            // 优雅关闭
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args)  throws Exception
    {
        new AppServerReconnHeart(18080).run();
    }
}

服务端Handler类:

@ChannelHandler.Sharable
public class HandlerServerReconnHeart extends ChannelInboundHandlerAdapter
{
    //通道数组,保存所有注册到EventLoop的通道
    public static ChannelGroup channels=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception
    {
        //处理收到的数据,并反馈消息到到客户端
        ByteBuf in = (ByteBuf) msg;
        System.out.println("收到客户端发过来的消息: " + in.toString(CharsetUtil.UTF_8));

        //写入并发送信息到远端(客户端)
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Read方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        //出现异常的时候执行的动作(打印并关闭通道)
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception
    {
        //新建立连接时触发的动作
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"已连接上来");
        channels.add(incoming);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
    {
        //连接断开时触发的动作
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"已断开");
        channels.remove(incoming);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于活动状态触发的动作,该方法只会在通道建立时调用一次
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"在线");

        //写入并发送信息到远端(客户端)
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是服务器端在Active方法中反馈的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次
        Channel incoming=ctx.channel();
        System.out.println("客户端:"+incoming.remoteAddress()+"掉线");
    }
}

客户端启动类:

public class AppClientReconnHeart
{
    private final String host;
    private final int port;

    public AppClientReconnHeart(String host, int port)
    {
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception
    {
        // I/O线程池
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客户端辅助启动类
            Bootstrap bs = new Bootstrap();
            bs.group(group)
                     // 实例化一个Channel
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host,port))
                    .handler(new ChannelInitializer<SocketChannel>()//进行通道初始化配置
                    {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception
                        {
                            // 超时检测handler
                            socketChannel.pipeline().addLast(new IdleStateHandler(5,0,0, TimeUnit.SECONDS));
                            // 我们自定义的Handler
                            socketChannel.pipeline().addLast(new HandlerClientReconnHeart());
                        }
                    });

            //连接到远程节点;等待连接完成
            ChannelFuture future=bs.connect();
            future.addListener(new ListenerClientReconnHeart());

            //每隔6秒,自动向服务器发送一条消息
            while (true)
            {
                Thread.sleep(6000);
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//设置日期格式
                String strDate=df.format(new Date());
                future.channel().writeAndFlush(Unpooled.copiedBuffer(strDate, CharsetUtil.UTF_8));
            }

        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception
    {
        new AppClientReconnHeart("127.0.0.1",18080).run();
    }
}

客户端Handler类:

@ChannelHandler.Sharable
public class HandlerClientReconnHeart extends SimpleChannelInboundHandler<ByteBuf>
{
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception
    {
        // 处理接收到的消息
        System.out.println("接收到的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // 处理I/O事件的异常
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //建立连接后该channelActive()方法只会被调用一次,这里的逻辑:建立连接后,字节序列被发送到服务器,编码格式是utf-8
        SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
        String strDate=df.format(new Date());
        ctx.writeAndFlush(Unpooled.copiedBuffer("这是客户端通过Active方法发送的消息 "+strDate+"\r\n", CharsetUtil.UTF_8));
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception
    {
        //通道处于非活动状态触发的动作,该方法只会在通道失效时调用一次
        Channel incoming=ctx.channel();
        System.out.println("掉线");

        //检测到掉线后,重新开始连接
        new AppClientReconnHeart("127.0.0.1",18080).run();

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception
    {
        //超时检测
        if (evt instanceof IdleStateEvent)
        {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE)
            {
                System.out.println("客户端读消息包超时");

                //检测到读超时,就向服务器端发送一个消息包
                SimpleDateFormat df = new SimpleDateFormat("HH:mm:ss SSS");//设置日期格式
                String strDate = df.format(new Date());
                ctx.writeAndFlush(Unpooled.copiedBuffer("我是心跳消息 " + strDate + "\r\n", CharsetUtil.UTF_8));
            }
        }
    }
}

监听类:

// 通过接口ChannelFutureListener来实现客户端的自动重连
public class ListenerClientReconnHeart implements ChannelFutureListener
{
    private  AppClientReconnHeart appClientReconnHeart=new AppClientReconnHeart("127.0.0.1",18080);
    @Override
    public void operationComplete(ChannelFuture channelFuture) throws Exception
    {
        if(!channelFuture.isSuccess())
        {
            EventLoop loop=channelFuture.channel().eventLoop();
            ScheduledFuture<?> schedule=loop.schedule(new Runnable()
            {
                @Override
                public void run()
                {
                    try {
                        System.out.println(">>>自动启动客户端,开始连接服务器...");
                        appClientReconnHeart.run();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },5, TimeUnit.SECONDS);
        }
        else
        {
            System.out.println(">>>服务器连接成功...");
        }
    }
}

转载请注明:西门飞冰的博客 » Netty 的网络可靠性保障

喜欢 (0)or分享 (0)