五、Netty 教程 – 解码器详解

作者:唐亚峰 | 出自:唐亚峰博客

TCP以流的形式传输,在上一章,我们讲了粘包和拆包,以及LineBaseFrameDecoder使用和源码探讨,接下来讲讲Netty为我们实现的其它解码器…..

TCP以流的方式进行数据传输,上层的应用为了对消息进行区分,往往采用如下方式

  • 固定消息长度,累计读取到长度和定长LEN的报文后,就认为读取到了个完整的消息,然后将计数器位置重置在读取下一个报文内容
  • 将回车换行符作为消息结束符\r\n,列如FTP协议,这种方式在文本中应用比较广泛
  • 将特殊分隔符作为消息结束符标志位,回车换行符就是一个特殊结束分隔符(DelimiterBasedFrameDecoder)
  • 通过在消息头定义一个长度字段来标示消息的总长度(FixedLengthFrameDecoder)

Netty对以上4种做个统一抽象封装,提供了四种不同解码器来解决对应问题,使用起来也非常的方便,了解了它们,我们就不需要自己对读取的报文人工解码,也不需要考虑TCP粘包和拆包的问题了…

Delimiter自定义分隔符

我将公共的部分做了一层抽离,定义成常量方便调用

public interface EchoConstant {
    String SEPARATOR = "$_";//特殊分割符号,DelimiterBasedFrameDecoder使用
    Integer ECHO_DELIMITER_PORT = 4040;
    Integer ECHO_LENGTH_PORT = 5050;
    String HOST = "127.0.0.1";
    Integer FRAME_LENGTH = 10;//固定消息长度,FixedLengthFrameDecoder使用
}

定义EchoDelimiterServer,毫无疑问大部分代码和以前类似,区别是多了一个日志输出以及DelimiterBasedFrameDecoder的使用

划重点:在做开发调试的时候,我们可以使用Netty为我们提供的LoggingHandler输出日志

public static void bind(int port) {
    EventLoopGroup masterGroup = new NioEventLoopGroup();//线程组,含一组NIO线程,专门用来处理网络事件
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();//NIO服务端启动辅助类
        bootstrap.group(masterGroup, workerGroup).channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 100)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes());
                        channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                        channel.pipeline().addLast(new StringDecoder());
                        channel.pipeline().addLast(new EchoServerHandler());
                    }
                });
        //绑定端口,同步等待成功,
        System.out.println("绑定端口,同步等待成功......");
        ChannelFuture future = bootstrap.bind(port).sync();
        //等待服务端监听端口关闭
        future.channel().closeFuture().sync();
        System.out.println("等待服务端监听端口关闭......");
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        //优雅退出释放线程池
        masterGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        System.out.println("优雅退出释放线程池......");
    }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("EchoDelimiterServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
    body = body + EchoConstant.SEPARATOR;//在消息后面加上特殊分隔符
    ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
    ctx.writeAndFlush(echo);//消息写出
}

定义EchoDelimiterClient

public static void connect(String host, int port) {
    EventLoopGroup group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap();
    ChannelFuture future = null;
    try {
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel channel) throws Exception {
                        ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes());
                        channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
                        channel.pipeline().addLast(new StringDecoder());
                        channel.pipeline().addLast(new EchoClientHandler());
                    }
                });
        //发起异步请求
        future = bootstrap.connect(host, port).sync();
        //等待客户端链路关闭
        future.channel().closeFuture().sync();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        group.shutdownGracefully();
    }
}

创建EchoClientHandler继承ChannelHandlerAdapter,重写读取和写出事件

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    for (int i = 0; i < 10; i++) {
        ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
    }
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("EchoDelimiterClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}

试验一把

分别启动EchoDelimiterServerEchoDelimiterClient,输出如下日志

绑定端口,同步等待成功......
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x3b1849e8] REGISTERED
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x3b1849e8] BIND: 0.0.0.0/0.0.0.0:4040
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] ACTIVE
九月 04, 2017 10:41:33 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] RECEIVED: [id: 0xa45511cd, /127.0.0.1:50226 => /127.0.0.1:4040]
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10
------------------------------------------------------------------------------------------------
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10

FixedLength指定消息长度

FixedLengthFrameDecoder 的方式比较极端,就是解析固定长度的报文消息,举个例子:假设我的报文长度为50,解析长度为30,那么这个数据包会被拆分成2次来解析,反之亦然…..

创建EchoLengthServer,重写初始化通道与读取事件

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH));
    channel.pipeline().addLast(new StringDecoder());
    channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("EchoLengthServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}

创建EchoLengthClient,重写初始化通道,写出与读取事件

@Override
protected void initChannel(SocketChannel channel) throws Exception {
    channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH));
    channel.pipeline().addLast(new StringDecoder());
    channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String body = (String) msg;
    System.out.println("EchoLengthClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}

试验一把

分别启动EchoLengthServerEchoLengthClient,输出如下日志

绑定端口,同步等待成功......
EchoLengthServer 接收到的消息 :EchoLength; 当前统计:1
EchoLengthServer 接收到的消息 :Client  Hi; 当前统计:2
EchoLengthServer 接收到的消息 : .Welcome ; 当前统计:3

警告: 前面说过,如果切割内容过长或者过断都会出现拆包或者粘包情况,所以这种方式需要根据具体业务需求来…..

总结

由此可以看到DelimiterBasedFrameDecoder用于对使用分割符结尾的消息进行自动解码,FixedLengthFrameDecoder用于对固定长度的消息进行自动解码,有了以上两种解码器在结合其它的解码器,如(StringDecoder),可以轻松完成大部分消息的自动解码,且无需考虑TCP粘包/拆包导致的半读问题,极大的提升了开发效率…..

– 说点什么

全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter5-1/battcn-netty-5-1-1

附录:Netty 教程系列文章