NETTY-NIO


什么是netty

Netty是一个NIO网络编程框架,快速开发高性能、高可靠性的网络服务器/客户端程序。 极大地简化了TCP和UDP等网络编程。是一个异步事件驱动的网络框架,快速、高性能。
RPC(pigeon、dubbo、HSF)Hadoop、Spark MQ(swallow、RocketMQ)Zookeeper等, 几乎所有的基于java的分布式中间件都是采用netty作为通信工具的,使用的是epoll模型

epoll io 模型

epoll 原理

相对于select和poll来说,epoll更加灵活,没有描述符限制,epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_create(int size);
//对指定描述符fd执行op操作。epfd:是epoll_create()的返回值,op:表示op操作,fd:是需要监听的fd,epoll_event:是告诉内核需要监听什么事
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//等待epfd上的io事件,最多返回maxevents个事件。
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。

epoll 优点

epoll监视的描述符数量不受限制,它所支持的FD上限是最大可以打开文件的数目,IO的效率不会随着监视fd的数量的增长而下降。
epoll不同于select和poll轮询的方式,而是通过每个fd定义的回调函数来实现的,只有就绪的fd才会执行回调函数,随着监视的描述符数量的增长,其效率也不会出现线性下降。

netty 的几个核心组件

  1. Bootstrap:netty的辅助启动器,netty客户端和服务器的入口,Bootstrap是创建客户端连接的启动器,ServerBootstrap是监听服务端端口的启动器,跟tomcat的Bootstrap类似,程序的入口。
  1. Channel:关联jdk原生socket的组件,常用的是NioServerSocketChannel和NioSocketChannel,NioServerSocketChannel负责监听一个tcp端口,有连接进来通过boss reactor创建一个NioSocketChannel将其绑定到worker reactor,然后worker reactor负责这个NioSocketChannel的读写等io事件。
  1. EventLoop:netty最核心的几大组件之一,就是我们常说的reactor,人为划分为boss reactor和worker reactor。通过EventLoopGroup(Bootstrap启动时会设置EventLoopGroup)生成,最常用的是nio的NioEventLoop,就如同EventLoop的名字,EventLoop内部有一个无限循环,维护了一个selector,处理所有注册到selector上的io操作,在这里实现了一个线程维护多条连接的工作。
  1. ChannelPipeline:netty最核心的几大组件之一,ChannelHandler的容器,netty处理io操作的通道,与ChannelHandler组成责任链。write、read、connect等所有的io操作都会通过这个ChannelPipeline,依次通过ChannelPipeline上面的ChannelHandler处理,这就是netty事件模型的核心。ChannelPipeline内部有两个节点,head和tail,分别对应着ChannelHandler链的头和尾。
  1. ChannelHandler:netty最核心的几大组件之一,netty处理io事件真正的处理单元,开发者可以创建自己的ChannelHandler来处理自己的逻辑,完全控制事件的处理方式。ChannelHandler和ChannelPipeline组成责任链,使得一组ChannelHandler像一条链一样执行下去。ChannelHandler分为inBound和outBound,分别对应io的read和write的执行链。ChannelHandler用ChannelHandlerContext包裹着,有prev和next节点,可以获取前后ChannelHandler,read时从ChannelPipeline的head执行到tail,write时从tail执行到head,所以head既是read事件的起点也是write事件的终点,与io交互最紧密。
  1. Unsafe:顾名思义这个类就是不安全的意思,但并不是说这个类本身不安全,而是不要在应用程序里面直接使用Unsafe以及他的衍生类对象,实际上Unsafe操作都是在reactor线程中被执行。Unsafe是Channel的内部类,并且是protected修饰的,所以在类的设计上已经保证了不被用户代码调用。Unsafe的操作都是和jdk底层相关。EventLoop轮询到read或accept事件时,会调用unsafe.read(),unsafe再调用ChannelPipeline去处理事件;当发生write事件时,所有写事件都会放在EventLoop的task中,然后从ChannelPipeline的tail传播到head,通过Unsafe写到网络中。

如何创建TCP netty server 服务器

1. initTcpServer

        /**
         * 初始化TCP server
        * @Description:
        * @param gatewayInitParameter
        * @param isLast
         */
        public void initTcpServer(GatewayDataChannelParameter gatewayInitParameter) {

            EventLoopGroup         acceptor       = new NioEventLoopGroup();// 用于设置服务器端接受客户端的连接
            EventLoopGroup         worker           = new NioEventLoopGroup();// 用于网络事件处理
            EventExecutorGroup     worker2     = new DefaultEventExecutorGroup(16);
            ServerBootstrap     bootstrap     = new ServerBootstrap();// bootstrap用于设置服务端的启动相关参数

            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.group(acceptor, worker);
            bootstrap.channel(NioServerSocketChannel.class);
            TcpServerChannelInitializer initializer = new TcpServerChannelInitializer(gatewayInitParameter, worker2);// 初始化数据通道list
            bootstrap.childHandler(initializer);
            int port  = gatewayInitParameter.getGatewayBaseData().getPort();

            try {
                ChannelFuture channelFuture = bootstrap.bind(port).sync().addListener(new ChannelFutureListener() {// 绑定端口启动服务事件
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        logger.info("tcp port is bound!");
                    }
                });

                channelFuture.channel().closeFuture().sync().addListener(new ChannelFutureListener() {//此处sync()会阻塞main方法
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        logger.info("all threads end ,main thread end!");// 正常状态此打印信息不执行上一行代码会阻塞MAIN函数
                    }
                });

                acceptor.shutdownGracefully();//netty server关闭时才会执行
                worker.shutdownGracefully();
                worker2.shutdownGracefully();
              logger.info("TCP netty closed!");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

2. TcpServerChannelInitializer

    import java.util.concurrent.TimeUnit;

    import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayDataChannelParameter;

    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.concurrent.EventExecutorGroup;

    public class TcpServerChannelInitializer2 extends  ChannelInitializer<SocketChannel>  {//tcp此处为socketChannel

        private GatewayDataChannelParameter gatewayInitParameter;

        private EventExecutorGroup group ;

        /**
         * @param gatewayInitParameter
         * @param group
         */
        public TcpServerChannelInitializer2(GatewayDataChannelParameter gatewayInitParameter, EventExecutorGroup group) {
            super();
            this.gatewayInitParameter = gatewayInitParameter;
            this.group = group;
        }

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {

            ChannelPipeline pipeline = arg0.pipeline();//pipeline 相当于ChannelHandler的容器,下面的stringDecoder也是一个handler
            pipeline.addLast("docode",new StringDecoder());//添加的顺序非常重要
            pipeline.addLast("encode",new StringEncoder());
            pipeline.addLast(new IdleStateHandler(60 * 1000, 60 * 1000 , 60 * 1000 ,TimeUnit.SECONDS));//设置channel超时时间180秒
            ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getSimpleChannelInboundHandler();//此adapter是具体处理消息的handler类
            pipeline.addLast(group, "server", adapter);
        }

    }

3. TCP处理数据的handler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.CharSequenceValueConverter;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.util.CharsetUtil;

    /**
    * @Description:    处理netty的handler
    * @author: jackromer
    * @version: 1.0, Jul 24, 2018
    */

    @Sharable//可共享的CHANNEL
    public class TcpChannelHandler2 extends SimpleChannelInboundHandler<String> {

        //数据到达
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (ctx.channel().isOpen()) {
                    CharSequence initialResult = CharSequenceValueConverter.INSTANCE.convertObject(msg);// MSG先转为charSequence
                    ByteBuf result = Unpooled.copiedBuffer(initialResult, CharsetUtil.UTF_8);            // 再转为byteBuf
                    if (result.isReadable() && ctx.channel().isActive()) {
                        String reportStr     = result.toString(CharsetUtil.UTF_8);
                        result.release();       // reportStr使用后立马释放资源
              String tcpReplyMsg = "hello client , i am tcp server.";
              Channel = ctx.channel();
              channel.writeAndFlush(tcpReplyMsg);//给客户端写消息
                    } 
                }
        }

        // 当新连接接入
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}

        //channel事件超时
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
                switch (idleStateEvent.state()) {
                    case READER_IDLE://读空闲
                        break;
                    case WRITER_IDLE://写空闲
                        break;
                    case ALL_IDLE://读写空闲
                        break;
                }
            }
        }

        // 当连接断开
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}

        //客户端连接
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {}

        //连接断开
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {}

        //异常
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}

    }

如何创建UDP NETTY SERVER

1. initUdpServer

注意事项:
UDP使用的bootstrap 而不是ServerBootstrap。
UDP channelHandler使用的是SimpleChannelInboundHandler,TCP使用的是其他的。
且UDP是无连接的,因此不需要调用channel.close()方法,如果调用会触发UDP server 的绑定关闭事件导致UDP服务器关闭。

        /**
         * 初始化TCP server
        * @Description:
        * @param gatewayInitParameter
        * @param isLast
         */
        public void initUdpServer(GatewayDataChannelParameter gatewayInitParameter) {

            EventLoopGroup         acceptor     = new NioEventLoopGroup();// 用于设置服务器端接受客户端的连接
            EventExecutorGroup     worker         = new DefaultEventExecutorGroup(16);
            Bootstrap             bootstrap     = new Bootstrap();// bootstrap用于设置服务端的启动相关参数,此处需要用Bootstrap和tcp不同
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
            bootstrap.group(acceptor);
            bootstrap.channel(NioDatagramChannel.class);
            UdpServerChannelInitializer initializer = new UdpServerChannelInitializer(gatewayInitParameter, worker);// 初始化数据通道list
            bootstrap.handler(initializer);

            int port  = gatewayInitParameter.getGatewayBaseData().getPort();

            try {
                ChannelFuture channelFuture = bootstrap.bind(port).sync().addListener(new ChannelFutureListener() {// 绑定端口启动服务事件
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        logger.info("tcp port is bound!");
                    }
                });

                channelFuture.channel().closeFuture().sync().addListener(new ChannelFutureListener() {//此处sync()会阻塞main方法
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        logger.info("all threads end ,main thread end!");// 正常状态此打印信息不执行上一行代码会阻塞MAIN函数
                    }
                });

                acceptor.shutdownGracefully();//netty server关闭时才会执行
                worker.shutdownGracefully();
                worker2.shutdownGracefully();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

2. TcpServerChannelInitializer

    import java.util.concurrent.TimeUnit;

    import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayDataChannelParameter;

    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    import io.netty.util.concurrent.EventExecutorGroup;

    public class UdpServerChannelInitializer extends  ChannelInitializer<NioDatagramChannel>  {//udp此处为NioDatagramChannel

        private GatewayDataChannelParameter gatewayInitParameter;

        private EventExecutorGroup group ;

        /**
         * @param gatewayInitParameter
         * @param group
         */
        public TcpServerChannelInitializer2(GatewayDataChannelParameter gatewayInitParameter, EventExecutorGroup group) {
            super();
            this.gatewayInitParameter = gatewayInitParameter;
            this.group = group;
        }

        @Override
        protected void initChannel(SocketChannel arg0) throws Exception {

            ChannelPipeline pipeline = arg0.pipeline();//pipeline 相当于ChannelHandler的容器,下面的stringDecoder也是一个handler
            pipeline.addLast("docode",new StringDecoder());//添加的顺序非常重要
            pipeline.addLast("encode",new StringEncoder());
            //pipeline.addLast(new IdleStateHandler(60 * 1000, 60 * 1000 , 60 * 1000 ,TimeUnit.SECONDS));//UDP是无状态连接不需要设置
            ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getSimpleChannelInboundHandler();//此adapter是具体处理消息的handler类
            pipeline.addLast(group, "server", adapter);
        }

    }

3. UDP处理数据的handler

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;

    import com.sefon.gateway.baseModule.exceptions.CheckChannelMsgException;
    import com.sefon.gateway.baseModule.gatewayServer.checkChannelMsg.ChannelMsgChecker;
    import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayBaseData;
    import com.sefon.gateway.baseModule.poolsAndThreads.messageProcessors.TcpOrUdpMsgProcessor;
    import com.sefon.gateway.baseModule.poolsAndThreads.processorPool.TcpOrUdpMessagePool;

    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.socket.DatagramPacket;
    import io.netty.util.CharsetUtil;

    /**
    * @Description:    处理通用设备协议的TCP包数据
    * @author: jackromer
    * @version: 1.0, Jul 24, 2018
    */

    @Sharable//可共享的CHANNEL
    public class UdpChannelHandler extends SimpleChannelInboundHandler<DatagramPacket> {//此处需要用DatagramPacket

        @Override
        public void channelRead0(ChannelHandlerContext ctx, DatagramPacket  dataPacket) throws Exception {
            String result = dataPacket.content().toString(CharsetUtil.UTF_8);

        //udp发送数据给客户端
        String udpReplyMsg = "hello client , i am udp server.";
        DatagramPacket pac = new DatagramPacket(Unpooled.copiedBuffer(udpReplyMsg, CharsetUtil.UTF_8), packet.sender());    
            hannelFuture channelFuture = channel.writeAndFlush(pac);
        }

    }

netty ctx 的fire相关方法

> file 方法  
> `` ` ``java  
> /**
> 
>      * A {@link Channel} is active now, which means it is connected.
>      *
>      * This will result in having the  {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method
>      * called of the next  {@link ChannelInboundHandler} contained in the  {@link ChannelPipeline} of the
>      * {@link Channel}.
>      */
>     

    ChannelInboundInvoker fireChannelActive();//调用pipline中的handler列表中的下一个handler的channelActive方法,后面的方法类似
    @Override
    ChannelHandlerContext fireChannelRegistered();

    @Override
    ChannelHandlerContext fireChannelUnregistered();

    @Override
    ChannelHandlerContext fireChannelActive();

    @Override
    ChannelHandlerContext fireChannelInactive();

    @Override
    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    @Override
    ChannelHandlerContext fireUserEventTriggered(Object evt);

    @Override
    ChannelHandlerContext fireChannelRead(Object msg);

    @Override
    ChannelHandlerContext fireChannelReadComplete();

    @Override
    ChannelHandlerContext fireChannelWritabilityChanged();


    ## netty 的http和https支持
    >http 和https和netty的tcp比较类似,不同处在于:
    >1.ChannelInitializer<SocketChannel> 的initChannel方法重写不一样.
    2.handler数据处理方式不一样.
    ### 重写@Override initChannel 方法
    ```java
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ChannelPipeline ph = ch.pipeline();
        添加sslhandler
        SSLEngine sslEngine = context.createSSLEngine();
        sslEngine.setUseClientMode(false);
        sslEngine.setNeedClientAuth(true);  //双向认证
        ph.addFirst(new SslHandler(sslEngine));
        //处理http服务的关键handler
        ph.addLast("codec", new HttpServerCodec());
        ph.addLast("compressor", new HttpContentCompressor());
        ph.addLast("decoder",new HttpRequestDecoder());
        ph.addLast("encoder",new HttpResponseEncoder());
        ph.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
        ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getHttpsSimpleChannelInboundHandler();
        ch.pipeline().addLast(group, "httpsServer", adapter);// 处理空指针异常,必须保证观察者中只返回一个对象
    }

handler处理数据

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      send(ctx, "OK", HttpResponseStatus.OK);
      String result = "";
        if (!(msg instanceof FullHttpRequest)) {
            result = "未知请求!";
            send(ctx, result, HttpResponseStatus.BAD_REQUEST);
            //return;
        }
        FullHttpRequest httpRequest = (FullHttpRequest) msg;
        try {
            String path = httpRequest.uri();          //获取路径
            System.out.println("url is :" + path);
            String body = getBody(httpRequest);       //获取参数
            HttpMethod method = httpRequest.method(); //获取请求方法
            System.out.println("接收到:" + method + " 请求");
            //如果是GET请求
            if (HttpMethod.GET.equals(method)) {
                //接受到的消息,做业务逻辑处理...
                System.out.println("body:" + body);
                result = "GET请求";
                send(ctx, result, HttpResponseStatus.OK);
                //return;
            }

            //如果是POST请求
            if (HttpMethod.POST.equals(method)) {
                //接受到的消息,做业务逻辑处理...
                System.out.println("body:" + body);
                result = "POST请求";
                send(ctx, result, HttpResponseStatus.OK);
                //return;
            }

            ctx.flush();
        } catch (Exception e) {
            System.out.println("处理请求失败!");
            e.printStackTrace();
        } finally {
            //释放请求
            httpRequest.release();
            ctx.flush();
        }
    }

注意事项

netty服务的关闭在官网写的很明确,就是当EventLoopGroups关闭时会触发服务关闭,TCP和UDP不同,当调用udp的ctx.channel.close()方法时会导致udp服务EventLoopGroups关闭,而调用tcp的ctx.channel.close()方法时只会关闭连接上的当前channel.

    Shutting Down Your Application
    Shutting down a Netty application is usually as simple as shutting down all EventLoopGroups you created via shutdownGracefully(). 
    It returns a Future that notifies you when the EventLoopGroup has been terminated completely and all Channels that belong to the group have been closed.

总结

netty NIO机制支持高并发,在很多JAVA的分布式框架中都有使用.在数据处理方面TCP和UDP不同,由于TCP是有连接的在数据传输过程中可能出现粘包阻包问题,处理数据的时候需要根据协议单独处理,保证数据的完整性,而UDP是无连接的所以不需要处理粘包阻包问题,收到什么数据就是什么数据.同时NETTY也可以集成http和https,具体实现可参照官方文档。

一盏灯, 一片昏黄; 一简书, 一杯淡茶。 守着那一份淡定, 品读属于自己的寂寞。 保持淡定, 才能欣赏到最美丽的风景! 保持淡定, 人生从此不再寂寞。



   Reprint policy


《NETTY-NIO》 by jackromer is licensed under a Creative Commons Attribution 4.0 International License
 Previous
SPRING TRANSACTIONAL-事务理解和使用 SPRING TRANSACTIONAL-事务理解和使用
SPRING transactional 简介 事务管理是企业级应用程序开发中必不可少的技术,用来确保数据的完整性和一致性。事务就是一系列的动作,它们被当作一个单独的工作单元,这些动作要么全部完成,要么全部不起作用. 事务的四个关键属
2019-08-27
Next 
THREAD-POOL-多线程并发 THREAD-POOL-多线程并发
多线程简介 多线程适合多种开发语言的开发, 这是一种思想,基于的是操作系统底层的CPU调度.此篇介绍线程、多线程、和线程池的使用和机制。 进程\线程 进程 1 进程 是指在系统中正在运行的一个应用程序, 每一个进程都有一个PID,
2019-08-27
  目录