由于blog各种垃圾评论太多,而且本人审核评论周期较长,所以懒得管理评论了,就把评论功能关闭,有问题可以直接qq骚扰我

Netty 利用MessagePack传输实体对象

JAVA 西门飞冰 1148℃
[隐藏]

1.编解码技术介绍

基于Netty的NIO网络开发,我们关注的重点之一是网络传输。当进行远程跨进程服务调用时,需要把被传输的Java对象编码为字节数组或者ByteBuffer对象。而当远程服务读取到ByteBuffer对象或者字节数组时,需要将其编码为发送时的Java对象。这被称为Java对象编解码技术。

Netty 自身提供了一些编解码器如StringEncoder和StringDecoder对字符串进行编解码,ObjectEncoder和ObjectDecoder对Java对象进行编解码,但是底层使用的是Java提供的序列化技术,由于Java自带序列化缺陷较多,衍生出了多种编解码技术和框架。业界和Netty结合使用的主流编解码框架有如下四种:

  • MessagePack
  • Protobuf
  • Thrift
  • JBoss Marshalling

本篇以Netty利用MessagePack为案例进行说明:

2.MessagePack介绍

MessagePack是一个高效的二进制序列号框架,它像JSON一样支持不同语言间的数据交换。

MessagePack优点:

  • 支持跨语言,提供市面上绝大多数语言的支持;
  • 使用非常的简单;
  • 编解码高效;(相对于JDK原生的,比Protobuf差点意思)
  • 序列化之后的码流小;

MessagePack解决的是实体对象的编解码问题,如果要解决粘包/拆包问题,我们需要使用专门的粘包拆包解码器。

3.配置思路

创建客户端、服务器端Handler初始化配置类思路:

MessagePack已经把消息转换成字节序列,我们使用自定义长度解码器LengthFieldBasedFrameDecoder 进行解码,我们需要在消息字节序列前面加上一个字节长度,这个可以直接利用Netty的LengthFieldPrepender编码器来处理,该编码器作用是自动获取字节序列总长度的值,然后在字节序列前面加上长度域,长度域中填写字节序列值。

所以Hander初始化配置类中的Handler的顺序如下:

image-20221104221631604

4.代码实现

实现步骤:

1、pom.xml中增加MessagePack的依赖;

<dependency>
  <groupId>org.msgpack</groupId>
  <artifactId>msgpack</artifactId>
  <version>0.6.12</version>
</dependency>

2、创建一个实体对象,注意该对象一定要添加@Message注解;

// 传输的实体对象
@Message
public class Information
{
    //消息头
    private short header;

    //消息类型
    private byte msgtype;

    //数据内容
    private String content;

    public short getHeader()
    {
        return header;
    }

    public void setHeader(short header)
    {
        this.header = header;
    }

    public byte getMsgtype()
    {
        return msgtype;
    }

    public void setMsgtype(byte msgtype)
    {
        this.msgtype = msgtype;
    }

    public String getContent()
    {
        return content;
    }

    public void setContent(String content)
    {
        this.content = content;
    }
}

3、创建解码器(不区分客户端/服务器端),该解码器继承的是MessageToMessageDecoder,类型为ByteBuf,重写decode方法,注意其参数List<Object> list的含义,如果不指明实体,list存的是每个属性的value,如果指明实体,存的是就是实体对象的集合;

// 通过扩展MessageToMessageDecoder来自定义解码器,基于MessagePack的解码器,把byte数组反序列化为消息实体对象
public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf>
{
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
    {
        MessagePack messagePack=new MessagePack();

        int intLength=byteBuf.readableBytes();
        byte[] raw=new byte[intLength];
        byteBuf.getBytes(byteBuf.readerIndex(),raw,0,intLength);

        list.add(messagePack.read(raw,Information.class));
    }
}

4、创建编码器,该解码器继承的是MessageToByteEncoder,类型为实体,如Information;重写encode方法;

// 通过扩展MessageToByteEncoder来自定义编码器
public class MessagePackEncoder extends MessageToByteEncoder<Information>
{
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Information information, ByteBuf byteBuf) throws Exception
    {
        MessagePack messagePack=new MessagePack();

        //进行序列化(编码)
        byte[] raw=messagePack.write(information);
        byteBuf.writeBytes(raw);
    }
}

5、创建客户端、服务器端handler,注意这里读取到的消息就直接是实体了;

客户端handler代码:

public class ClientHandler extends SimpleChannelInboundHandler<Information> {

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Information info) throws Exception
    {
        System.out.println("收到了一条信息:"+info.getContent());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
    {
        // 处理I/O事件的异常
        cause.printStackTrace();
        ctx.close();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception
    {
        //发送自定义协议格式的消息
        Information info=new Information();
        info.setHeader((short)0x5A5A);
        info.setMsgtype((byte)0x01);
        info.setContent("你好,天王盖地虎!");
        System.out.println("发送了一条消息:"+info.getContent());

        ctx.writeAndFlush(info);
    }
}

服务器端handler代码:

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)  throws Exception
    {
        //读取消息,并按照自定义的协议格式进行消息的处理
        Information in=(Information)msg;
        System.out.println("收到了一条消息:"+in.getContent());

        Information returnInfo=new Information();
        returnInfo.setHeader((short)0x3C3C);
        returnInfo.setMsgtype((byte)0x01);
        returnInfo.setContent("你好,宝塔镇河妖!");

        //发送消息
        ctx.writeAndFlush(returnInfo);

    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

6、创建客户端、服务器启动类代码,并添加相应pipeline

客户端启动类代码:

public class NettyClient {

    private String host;
    private int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .remoteAddress(new InetSocketAddress(host, port))
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch)throws Exception {
                        //自定义长度解码器LengthFieldBasedFrameDecoder
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
                        ch.pipeline().addLast(new MessagePackDecoder());
                        ch.pipeline().addLast(new LengthFieldPrepender(2));
                        ch.pipeline().addLast(new MessagePackEncoder());
                        ch.pipeline().addLast(new ClientHandler());
                    } });
            //连接到服务端,connect是异步链接,再调用同步方法sync,等待连接成功
            ChannelFuture f = bootstrap.connect().sync();
            //阻塞直到客户端通道关闭
            f.channel().closeFuture().sync();
        } finally {
            //优雅退出,释放NIO线程组
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 8080).start();
    }
}

服务器端启动类代码:

public class NettyServer {

    private int port;

    public EchoServer(int port){
        this.port = port;
    }

    public void run() throws Exception{
        //配置服务端的线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workGroup)

                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,128)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,2,0,2));
                        ch.pipeline().addLast(new MessagePackDecoder());
                        ch.pipeline().addLast(new LengthFieldPrepender(2));
                        ch.pipeline().addLast(new MessagePackEncoder());
                        ch.pipeline().addLast(new ServerHandler());
                    }
                });

            System.out.println("Echo 服务器启动");
            //绑定端口,同步等待成功
            ChannelFuture channelFuture =  serverBootstrap.bind(port).sync();
            //等待服务端监听端口关闭
            channelFuture.channel().closeFuture().sync();
        }finally {
            //优雅退出,释放线程池
            workGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }


    public static void main(String[] args) throws Exception {
        int port = 8080;

        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }
        new EchoServer(port).run();
    }
}

验证:先启动Netty服务端,在启动Netty客户端,验证能否将实体对象的数据发送过去

image-20221107223942817

image-20221107223952560

转载请注明:西门飞冰的博客 » Netty 利用MessagePack传输实体对象

喜欢 (0)or分享 (0)