消息接收端使用自定义长度解码器帧解决粘包问题的解决办法

在数据的头部声明数据的长度,通过长度获取数据

将消息分为头部和主体。头部包含身体长度的字段。一般head的第一个字段用一个int值来表示body的长度;

指定消息的长度。如果不足,请填空。读取时,按规定长度读取。比如100个字节,不够就填空格;使用更复杂的应用层协议。使用 LineBasedFrameDecoder

LineBasedFrameDecoder 是 Netty 内置的解码器,对应的编码器是 LineEncoder。

原理就是上面提到的第一个思路,在数据末尾加一个特殊符号来标识边界。默认是使用换行符\n。

用法很简单,发送方添加编码器:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 //添加编码器,使用默认的符号\n,字符集是UTF-8
    ch.pipeline().addLast(new LineEncoder(LineSeparator.DEFAULT, CharsetUtil.UTF_8));
    ch.pipeline().addLast(new TcpClientHandler());
}

接收器加解码器:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 //解码器需要设置数据的最大长度,我这里设置成1024
 ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
 //给pipeline管道设置业务处理器
 ch.pipeline().addLast(new TcpServerHandler());
}

然后在发送方,在发送消息时在末尾添加标识符:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 1; i <= 5; i++) {
  //在末尾加上默认的标识符\n
     ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED, Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
 }
}

于是我们再次启动服务端和客户端,在服务端的控制台上可以看到:

在数据末尾添加特殊符号标识数据包的边界,解决了粘连和拆包的问题。

注意:数据的结尾必须是分隔符。分隔符后不要添加数据,否则会作为下一个数据的开始。这是错误演示:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 1; i <= 5; i++) {
  //在末尾加上默认的标识符\n
     ByteBuf byteBuf = Unpooled.copiedBuffer("msg No" + i + StringUtil.LINE_FEED + "[我是分隔符后面的字符串]", Charset.forName("utf-8"));
        ctx.writeAndFlush(byteBuf);
 }
}

服务器的控制台会看到这样的打印信息:

使用自定义长度帧解码器

使用这个解码器解决粘包问题的原理就是上面提到的第二种,在数据的头部声明数据的长度,根据长度获取数据。这个decoder构造函数需要定义5个参数,比较复杂。我们先来看看参数的解释:

前三个参数比较简单,可以用下图来演示:

校正偏移是什么意思?

假设你的length字段中设置的值除了有效数据的长度外还包括其他字段的长度,那么你必须设置这个值进行修正,否则解码器将无法获取到有效数据。

要丢弃的起始字节数。这个比较简单,就是丢弃这个索引值前面的数据,只要后面的数据就行。通常,长度字段中的数据会被丢弃。当然,如果要获取所有数据,请将其设置为 0。

下面是在消息接收端使用自定义长度帧解码器来解决粘包问题:

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 //数据包最大长度是1024
    //长度域的起始索引是0
    //长度域的数据长度是4
    //矫正值为0,因为长度域只有 有效数据的长度的值
    //丢弃数据起始值是4,因为长度域长度为4,我要把长度域丢弃,才能得到有效数据
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
    ch.pipeline().addLast(new TcpClientHandler());
}

然后编写发送方代码,根据解码器设置发送:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 for (int i = 1; i <= 5; i++) {
     String str = "msg No" + i;
        ByteBuf byteBuf = Unpooled.buffer(1024);
        byte[] bytes = str.getBytes(Charset.forName("utf-8"));
        //设置长度域的值,为有效数据的长度
        byteBuf.writeInt(bytes.length);
        //设置有效数据
        byteBuf.writeBytes(bytes);
        ctx.writeAndFlush(byteBuf);
    }
}

然后启动服务器,客户端,我们可以看到控制台打印结果:

可以看出,粘包问题是通过使用自定义长度帧解码器解决的。

使用 Google Protobuf 编解码器

Netty官网明确表示支持Google Protobuf,如下图所示:

什么是 Google Protobuf

官网原话:Protocol buffers 是 Google 的语言中立、平台中立、可扩展的结构化数据序列化机制——想想 XML,但更小、更快、更简单。您只需定义一次数据的结构化方式,然后就可以使用特殊生成的源代码轻松地将结构化数据写入和读取各种数据流,并使用各种语言。

翻译:协议缓冲区是 Google 的一种独立于语言、独立于平台且可扩展的数据序列化机制,类似于 XML,但更小、更快、更简单。您只需定义一次数据的结构方式,然后就可以使用专门生成的源代码轻松地将结构化数据写入和读取到各种数据流中,支持多种语言。

它可以用于许多场景,例如rpc或tcp通信。一般来说,如果客户端和服务端使用不同的语言,那么在服务端定义一个数据结构,通过protobuf转换成字节流,然后发送给客户端解码得到对应的数据结构。这就是protobuf的神奇之处。而且它的通信效率极高,“用protobuf序列化的消息数据大小是json的1/10,xml格式的1/20,二进制序列化的1/10 ”。

谷歌 Protobuf 官网:

为什么使用 Google Protobuf

在某些场景下,数据需要在不同的平台、不同的程序中传输和使用。例如,一个消息是由一个 C++ 程序生成的,另一个程序是用 java 编写的。前者生成消息数据时,需要在不同语言编写的不同程序中进行操作,如何发送消息并在各个程序中使用?这就需要设计一种消息格式,常用的有json和xml,后来出现了protobuf。

Google Protobuf 的优势 Google Protobuf 安装

因为我这里是Mac系统,Mac下除了使用dmg和pkg安装软件外,使用brew命令安装更方便,可以帮助安装其他需要的依赖,从而减少不必要的麻烦。

安装最新版本的protoc

从 github 下载 protobuf3

Mac系统选择第一个,如下图:

下载成功后切换到root用户

sudo -i

解压压缩包,进入自己解压的目录

tar xyf protobuf-all-3.13.0.tar.gz
cd protobuf-3.13.0

设置构建目录

./configure --prefix=/usr/local/protobuf

安装

make
make install

配置环境变量

第 1 步:找到 .bash_profile 文件并进行编辑

cd ~
open .bash_profile

第二步:然后在打开的 bash_profile 文件末尾添加如下配置:

export PROTOBUF=/usr/local/protobuf 
export PATH=$PROTOBUF/bin:$PATH

第三步:source 使文件生效

source .bash_profile

测试安装结果

protoc --version

使用 Google Protobuf

以下步骤参考 Google Protobuf 的 github 项目的指南。

第一步:添加maven依赖


  com.google.protobuf
  protobuf-java
  3.11.0

第二步:编写proto文件Message.proto

如何编写.proto文件相关文档,可以去官网看下面的例子,请看演示:

syntax = "proto3"; //版本
option java_outer_classname = "MessagePojo";//生成的外部类名,同时也是文件名
message Message {
    int32 id = 1;//Message类的一个属性,属性名称是id,序号为1
    string content = 2;//Message类的一个属性,属性名称是content,序号为2
}

第 3 步:使用编译器从 .proto 文件生成代码

执行完以上安装步骤后,进入bin目录,可以看到一个可执行文件protoc

cd /usr/local/protobuf/bin/

然后将之前写好的Message.proto文件复制到这个目录下,如图:

输入命令:

protoc --java_out=. Message.proto

然后就可以看到生成的 MessagePojo.java 文件了。最后,将文件复制到 IDEA 项目中。

第 4 步:在发送方添加编码器,在接收方添加解码器

客户端添加一个编码器来对消息进行编码。

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 //在发送端添加Protobuf编码器
    ch.pipeline().addLast(new ProtobufEncoder());
 ch.pipeline().addLast(new TcpClientHandler());
}

服务器添加一个解码器来解码消息。

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 //添加Protobuf解码器,构造器需要指定解码具体的对象实例
 ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
 //给pipeline管道设置处理器
 ch.pipeline().addLast(new TcpServerHandler());
}

第 5 步:发送消息

客户端发送消息,代码如下:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 //使用的是构建者模式进行创建对象
 MessagePojo.Message message = MessagePojo
      .Message
            .newBuilder()
            .setId(1)
            .setContent("一角钱,起飞~")
            .build();
    ctx.writeAndFlush(message);
}

服务器接收数据并打印:

@Override
protected void channelRead0(ChannelHandlerContext ctx, MessagePojo.Message messagePojo) throws Exception {
    System.out.println("id:" + messagePojo.getId());
    System.out.println("content:" + messagePojo.getContent());
}

测试结果正确:

分析协议的粘贴和拆包

其实直接使用Protocol编解码器还是有粘包的问题。

为了证明这一点,发送者循环发送 100 条“一毛钱,起飞”消息一百次。请看发件人代码演示:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 for (int i = 1; i <= 100; i++) {
  MessagePojo.Message message = MessagePojo
      .Message
            .newBuilder()
            .setId(i)
            .setContent(i + "号一角钱,起飞~")
            .build();
      ctx.writeAndFlush(message);
 }
}

此时,启动服务器和客户端后,可能只打印几条消息,或者在控制台上可能会看到以下错误:

com.google.protobuf.InvalidProtocolBufferException:解析协议消息时,输入意外在字段中间结束。这可能意味着输入已被截断,或者嵌入的消息误报了自己的长度。

含义:解析协议消息时,输入意外在字段中间结束。这可能意味着输入被截断,或者嵌入的消息误报了自己的长度。

其实就是粘包问题。多条数据合并为一条数据,导致解析异常。

解决Protocol的粘贴和拆包问题

只需将编码器 ProtobufVarint32LengthFieldPrepender 添加到发送方

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
    ch.pipeline().addLast(new ProtobufEncoder());
    ch.pipeline().addLast(new TcpClientHandler());
}

接收器加解码器 ProtobufVarint32FrameDecoder

@Override
protected void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
 ch.pipeline().addLast(new ProtobufDecoder(MessagePojo.Message.getDefaultInstance()));
 //给pipeline管道设置处理器
 ch.pipeline().addLast(new TcpServerHandler());
}

然后再启动服务端和客户端,可以看到正常了~

ProtobufVarint32LengthFieldPrepender 编码器的工作原理如下:

 * BEFORE ENCODE (300 bytes)       AFTER ENCODE (302 bytes)
 * +---------------+               +--------+---------------+
 * | Protobuf Data |-------------->| Length | Protobuf Data |
 * |  (300 bytes)  |               | 0xAC02 |  (300 bytes)  |
 * +---------------+               +--------+---------------+
@Sharable
public class ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
        int bodyLen = msg.readableBytes();
        int headerLen = computeRawVarint32Size(bodyLen);
        //写入请求头,消息长度
        out.ensureWritable(headerLen + bodyLen);
        writeRawVarint32(out, bodyLen);
        //写入数据
        out.writeBytes(msg, msg.readerIndex(), bodyLen);
    }
}

ProtobufVarint32FrameDecoder 解码器的工作原理如下:

 * BEFORE DECODE (302 bytes)       AFTER DECODE (300 bytes)
 * +--------+---------------+      +---------------+
 * | Length | Protobuf Data |----->| Protobuf Data |
 * | 0xAC02 |  (300 bytes)  |      |  (300 bytes)  |
 * +--------+---------------+      +---------------+
public class ProtobufVarint32FrameDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
        //标记读取的下标位置
        in.markReaderIndex();
        //获取读取的下标位置
        int preIndex = in.readerIndex();
        //解码,获取消息的长度,并且移动读取的下标位置
        int length = readRawVarint32(in);
        //比较解码前和解码后的下标位置,如果相等。表示字节数不够读取,跳到下一轮
        if (preIndex == in.readerIndex()) {
            return;
        }
        //如果消息的长度小于0,抛出异常
        if (length < 0) {
            throw new CorruptedFrameException("negative length: " + length);
        }
        //如果不够读取一个完整的数据,reset还原下标位置。
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
        } else {
            //否则,把数据写入到out,接收端就拿到了完整的数据了
            out.add(in.readRetainedSlice(length));
        }
 }

总结:

Netty的心跳检测机制是什么?

所谓心跳,就是客户端和服务端在TCP长连接中周期性发送的特殊数据包,通知对方自己仍然在线,保证TCP连接的有效性。

注意:心跳包还有一个功能,经常被忽略,那就是:如果一个连接长时间不使用,防火墙或路由器会断开连接。

在Netty中,实现心跳机制的关键是IdleStateHandler,看它的构造函数:

public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
 this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}

三个参数的含义如下:

注意:这三个参数的默认时间单位是秒。如果需要指定其他时间单位,可以使用另一个构造函数:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
 this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

实现Netty服务端心跳检测机制,需要在服务端ChannelInitializer中添加如下代码:

pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));

Netty心跳源码分析

初步看一下IdleStateHandler的源码编码器的意思是什么,先看一下IdleStateHandler中的channelRead方法:

红框代码其实表示该方法只是透传,不做任何业务逻辑处理,让channelPipe中的next handler处理channelRead方法;

我们再来看看channelActive方法:

有一个initialize方法,就是IdleStateHandler的精髓,再探索:

一个任务,ReaderIdleTimeoutTask,将在这里被触发。本任务中run方法的源码如下:

第一个红框代码是当前时间减去最后一次调用channelRead方法的时间。如果结果是 6s,则表示最后一次调用 channelRead 已经是 6s 前了。你设置为5s,那么nextDelay就是-1。如果超时,那么第二个红框代码会触发下一个handler的userEventTriggered方法:

如果没有超时,则不会触发 userEventTriggered 方法。

Netty 心跳检测代码示例服务器

package com.niuh.netty.heartbeat;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
public class HeartBeatServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
                            //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
                            //实现userEventTriggered方法处理对应事件
                            pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                            pipeline.addLast(new HeartBeatServerHandler());
                        }
                    });
            System.out.println("netty server start。。");
            ChannelFuture future = bootstrap.bind(9000).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

服务器端回调处理类

package com.niuh.netty.heartbeat;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
public class HeartBeatServerHandler extends SimpleChannelInboundHandler {
    int readIdleTimes = 0;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
        if ("Heartbeat Packet".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println(" 其他信息处理 ... ");
        }
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" + eventType);
        if (readIdleTimes > 3) {
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("idle close");
            ctx.channel().close();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }
}

客户

package com.niuh.netty.heartbeat;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Random;
public class HeartBeatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            pipeline.addLast(new HeartBeatClientHandler());
                        }
                    });
            System.out.println("netty client start。。");
            Channel channel = bootstrap.connect("127.0.0.1", 9000).sync().channel();
            String text = "Heartbeat Packet";
            Random random = new Random();
            while (channel.isActive()) {
                int num = random.nextInt(10);
                Thread.sleep(2 * 1000);
                channel.writeAndFlush(text);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
    static class HeartBeatClientHandler extends SimpleChannelInboundHandler {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" + msg);
            if (msg != null && msg.equals("idle close")) {
                System.out.println(" 服务端关闭连接,客户端也关闭");
                ctx.channel().closeFuture();
            }
        }
    }
}

PS:以上代码是在Github上提交的:

文章持续更新中编码器的意思是什么,大家可以通过搜索“一毛钱科技”公众号尽快阅读。

本文已收录在 GitHub org_hejianhui/JavaStudy,欢迎 Star。

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论