What is Netty
Netty is an NIO network programming framework for rapidly building high-performance, highly reliable network servers and clients. It greatly simplifies TCP, UDP, and similar network programming, and it is an asynchronous, event-driven framework that is fast and high-performance.
Almost all Java-based distributed middleware, including RPC frameworks (pigeon, dubbo, HSF), Hadoop, Spark, MQ systems (swallow, RocketMQ), and Zookeeper, uses Netty as its communication layer, and on Linux this is backed by the epoll model.
The epoll I/O model
How epoll works
Compared with select and poll, epoll is more flexible and has no small fixed descriptor limit. epoll uses a single file descriptor to manage many descriptors: the events for the file descriptors the user cares about are stored in an event table inside the kernel, so the copy between user space and kernel space only needs to happen once.
//Create an epoll handle; size tells the kernel roughly how many descriptors will be monitored
int epoll_create(int size);
//Perform operation op on descriptor fd. epfd is the value returned by epoll_create(), op is the operation to perform, fd is the descriptor to monitor, and epoll_event tells the kernel which events to watch
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
//Wait for I/O events on epfd; at most maxevents events are returned
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
With select/poll, the kernel scans all monitored file descriptors only when the process calls the corresponding function. With epoll, a file descriptor is registered in advance via epoll_ctl(); once that descriptor becomes ready, the kernel uses a callback-like mechanism to activate it quickly, and the process is notified when it calls epoll_wait().
Advantages of epoll
The number of descriptors epoll can monitor is not subject to a small fixed limit; the upper bound on FDs is the maximum number of files the process can open, and I/O efficiency does not degrade as the number of monitored fds grows.
Unlike the polling approach of select and poll, epoll works through a callback defined for each fd, so only fds that are ready trigger their callbacks; efficiency therefore does not fall off linearly as the number of monitored descriptors increases.
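Netty's NIO transport is built on java.nio's Selector, which on Linux is typically backed by epoll. The following minimal sketch (the port 8080 and the accept-only logic are placeholder choices, not from the original) shows the same register-then-wait pattern that epoll_ctl/epoll_wait describe:
```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorLoop {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();                  // epoll-backed on Linux
        ServerSocketChannel server = ServerSocketChannel.open();
        server.configureBlocking(false);
        server.bind(new InetSocketAddress(8080));
        server.register(selector, SelectionKey.OP_ACCEPT);    // analogous to epoll_ctl(ADD)

        while (true) {
            selector.select();                                // analogous to epoll_wait
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                     // only ready descriptors show up here
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                }
                // OP_READ / OP_WRITE handling omitted for brevity
            }
        }
    }
}
```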
Netty's core components
- Bootstrap: Netty's startup helper and the entry point for both clients and servers. Bootstrap starts client connections, while ServerBootstrap listens on a server-side port; similar to Tomcat's Bootstrap, it is where the program starts.
- Channel: the component that wraps the underlying JDK socket. The ones used most often are NioServerSocketChannel and NioSocketChannel. NioServerSocketChannel listens on a TCP port; when a connection arrives, the boss reactor creates a NioSocketChannel and binds it to a worker reactor, which then handles that channel's read, write, and other I/O events.
- EventLoop: one of Netty's core components, the "reactor" we usually talk about, conventionally divided into boss reactors and worker reactors. EventLoops are produced by an EventLoopGroup (set on the Bootstrap at startup); the most commonly used is the NIO-based NioEventLoop. True to its name, an EventLoop runs an endless loop that maintains a Selector and processes every I/O operation registered on that Selector; this is how a single thread serves many connections.
- ChannelPipeline: one of Netty's core components, the container for ChannelHandlers and the path along which Netty processes I/O; together with the ChannelHandlers it forms a chain of responsibility. Every I/O operation such as write, read, or connect flows through the ChannelPipeline and is processed by its ChannelHandlers in turn; this is the heart of Netty's event model. A ChannelPipeline has two built-in nodes, head and tail, which mark the two ends of the handler chain (a minimal sketch of this head-to-tail flow follows this list).
- ChannelHandler: one of Netty's core components and the unit that actually handles I/O events. Developers create their own ChannelHandlers to implement their logic and fully control how events are processed. ChannelHandler and ChannelPipeline together form a chain of responsibility, so a group of ChannelHandlers executes like links in a chain. Handlers are divided into inbound and outbound, corresponding to the read and write execution chains. Each ChannelHandler is wrapped by a ChannelHandlerContext with prev and next pointers to its neighbours; reads run from the pipeline's head to its tail, writes run from tail to head, so head is both the starting point of read events and the end point of write events and sits closest to the actual I/O.
- Unsafe: the name suggests danger, but the class itself is not unsafe; it simply must not be used directly from application code (nor its derived objects), and Unsafe operations are in fact executed on the reactor thread. Unsafe is an inner class of Channel and is protected, so the class design already keeps it out of user code; its operations map to the underlying JDK. When an EventLoop polls a read or accept event it calls unsafe.read(), and Unsafe then hands the event to the ChannelPipeline. For writes, all write events are queued as tasks on the EventLoop, propagated from the pipeline's tail to its head, and finally written to the network through Unsafe.
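To make the head-to-tail read path and tail-to-head write path concrete, here is a minimal, hedged sketch using Netty's EmbeddedChannel (a test channel that ships with Netty); the handler labels A/B/C and the "hello"/"reply" strings are made up for illustration:
```java
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.embedded.EmbeddedChannel;

public class PipelineOrderDemo {
    public static void main(String[] args) {
        EmbeddedChannel ch = new EmbeddedChannel(
                new ChannelInboundHandlerAdapter() {           // inbound handler A (closer to head)
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        System.out.println("inbound A: " + msg);
                        ctx.fireChannelRead(msg);              // pass the event on towards tail
                    }
                },
                new ChannelDuplexHandler() {                   // outbound handler B
                    @Override
                    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                        System.out.println("outbound B: " + msg);
                        ctx.write(msg, promise);               // pass the write on towards head
                    }
                },
                new ChannelInboundHandlerAdapter() {           // inbound handler C (closer to tail)
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        System.out.println("inbound C: " + msg);
                        ctx.channel().writeAndFlush("reply");  // triggers the outbound path tail -> head
                    }
                });
        ch.writeInbound("hello");                              // reads run head -> tail: A, then C
    }
}
```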
How to create a TCP Netty server
1. initTcpServer
/**
 * Initialize the TCP server.
 * @param gatewayInitParameter
 */
public void initTcpServer(GatewayDataChannelParameter gatewayInitParameter) {
    EventLoopGroup acceptor = new NioEventLoopGroup();// accepts client connections on the server side
    EventLoopGroup worker = new NioEventLoopGroup();// handles network I/O events
    EventExecutorGroup worker2 = new DefaultEventExecutorGroup(16);
    ServerBootstrap bootstrap = new ServerBootstrap();// bootstrap holds the server-side startup parameters
    bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
    bootstrap.group(acceptor, worker);
    bootstrap.channel(NioServerSocketChannel.class);
    TcpServerChannelInitializer initializer = new TcpServerChannelInitializer(gatewayInitParameter, worker2);// builds the handler chain for each data channel
    bootstrap.childHandler(initializer);
    int port = gatewayInitParameter.getGatewayBaseData().getPort();
    try {
        ChannelFuture channelFuture = bootstrap.bind(port).sync().addListener(new ChannelFutureListener() {// bind the port and start the service
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                logger.info("tcp port is bound!");
            }
        });
        channelFuture.channel().closeFuture().sync().addListener(new ChannelFutureListener() {// this sync() blocks the calling (main) thread
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                logger.info("all threads end, main thread end!");// normally not printed: the line above blocks the main thread until the channel closes
            }
        });
        acceptor.shutdownGracefully();// only executed once the netty server is shutting down
        worker.shutdownGracefully();
        worker2.shutdownGracefully();
        logger.info("TCP netty closed!");
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
2. TcpServerChannelInitializer
import java.util.concurrent.TimeUnit;
import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayDataChannelParameter;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.EventExecutorGroup;
public class TcpServerChannelInitializer extends ChannelInitializer<SocketChannel> {// for TCP the channel type is SocketChannel
    private GatewayDataChannelParameter gatewayInitParameter;
    private EventExecutorGroup group;

    /**
     * @param gatewayInitParameter
     * @param group
     */
    public TcpServerChannelInitializer(GatewayDataChannelParameter gatewayInitParameter, EventExecutorGroup group) {
        super();
        this.gatewayInitParameter = gatewayInitParameter;
        this.group = group;
    }

    @Override
    protected void initChannel(SocketChannel arg0) throws Exception {
        ChannelPipeline pipeline = arg0.pipeline();// the pipeline is the container for ChannelHandlers; the StringDecoder below is itself a handler
        pipeline.addLast("decode", new StringDecoder());// the order in which handlers are added matters
        pipeline.addLast("encode", new StringEncoder());
        pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));// read/write/all idle timeouts of 60 seconds
        ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getSimpleChannelInboundHandler();// the handler that actually processes messages
        pipeline.addLast(group, "server", adapter);
    }
}
3. The handler that processes TCP data
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.CharSequenceValueConverter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

/**
 * @Description: handler that processes Netty events
 * @author: jackromer
 * @version: 1.0, Jul 24, 2018
 */
@Sharable// the handler can be shared across channels
public class TcpChannelHandler2 extends SimpleChannelInboundHandler<String> {
    // data arrived; channelRead is overridden directly here, so channelRead0 below is left empty
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (ctx.channel().isOpen()) {
            CharSequence initialResult = CharSequenceValueConverter.INSTANCE.convertObject(msg);// first convert msg to a CharSequence
            ByteBuf result = Unpooled.copiedBuffer(initialResult, CharsetUtil.UTF_8); // then to a ByteBuf
            if (result.isReadable() && ctx.channel().isActive()) {
                String reportStr = result.toString(CharsetUtil.UTF_8);
                result.release(); // release the buffer as soon as reportStr has been extracted
                String tcpReplyMsg = "hello client , i am tcp server.";
                Channel channel = ctx.channel();
                channel.writeAndFlush(tcpReplyMsg);// write the reply to the client
            }
        }
    }

    // a new connection was added
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}

    // channel idle (timeout) events
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
            case READER_IDLE:// read idle
                break;
            case WRITER_IDLE:// write idle
                break;
            case ALL_IDLE:// read/write idle
                break;
            }
        }
    }

    // the connection was removed
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}

    // a client connected
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {}

    // the connection was closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {}

    // exception handling
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {}
}
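The post does not show a client, but the Bootstrap class mentioned in the component list is the client-side entry point. Below is a hedged sketch of a minimal TCP client that could exercise the server above; the address 127.0.0.1:8000 is a placeholder, not from the original (use the port configured in gatewayInitParameter):
```java
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class TcpTestClient {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();           // the client side uses Bootstrap, not ServerBootstrap
            bootstrap.group(group)
                     .channel(NioSocketChannel.class)
                     .handler(new ChannelInitializer<SocketChannel>() {
                         @Override
                         protected void initChannel(SocketChannel ch) {
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new StringEncoder());
                             ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                 @Override
                                 protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                     System.out.println("server replied: " + msg);
                                 }
                             });
                         }
                     });
            Channel channel = bootstrap.connect("127.0.0.1", 8000).sync().channel();// placeholder address
            channel.writeAndFlush("hello server");
            channel.closeFuture().sync();                    // blocks until the connection is closed
        } finally {
            group.shutdownGracefully();
        }
    }
}
```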
How to create a UDP Netty server
1. initUdpServer
Notes:
UDP uses Bootstrap rather than ServerBootstrap.
The UDP ChannelHandler extends SimpleChannelInboundHandler<DatagramPacket> and works on datagrams, whereas the TCP handler works on the decoded messages of the connection.
UDP is also connectionless, so channel.close() must not be called; calling it triggers the UDP server's unbind/close event and shuts the UDP server down.
/**
 * Initialize the UDP server.
 * @param gatewayInitParameter
 */
public void initUdpServer(GatewayDataChannelParameter gatewayInitParameter) {
    EventLoopGroup acceptor = new NioEventLoopGroup();// handles I/O for the datagram channel (UDP has no separate accept step)
    EventExecutorGroup worker = new DefaultEventExecutorGroup(16);
    Bootstrap bootstrap = new Bootstrap();// UDP uses Bootstrap for the server-side startup parameters, unlike TCP
    // note: SO_BACKLOG is a TCP accept-queue option and has no effect on a datagram channel, so it is not set here
    bootstrap.group(acceptor);
    bootstrap.channel(NioDatagramChannel.class);
    UdpServerChannelInitializer initializer = new UdpServerChannelInitializer(gatewayInitParameter, worker);// builds the handler chain for the data channel
    bootstrap.handler(initializer);
    int port = gatewayInitParameter.getGatewayBaseData().getPort();
    try {
        ChannelFuture channelFuture = bootstrap.bind(port).sync().addListener(new ChannelFutureListener() {// bind the port and start the service
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                logger.info("udp port is bound!");
            }
        });
        channelFuture.channel().closeFuture().sync().addListener(new ChannelFutureListener() {// this sync() blocks the calling (main) thread
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                logger.info("all threads end, main thread end!");// normally not printed: the line above blocks the main thread until the channel closes
            }
        });
        acceptor.shutdownGracefully();// only executed once the netty server is shutting down
        worker.shutdownGracefully();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
2. UdpServerChannelInitializer
import java.util.concurrent.TimeUnit;
import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayDataChannelParameter;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.EventExecutorGroup;
public class UdpServerChannelInitializer extends ChannelInitializer<NioDatagramChannel> {// for UDP the channel type is NioDatagramChannel
    private GatewayDataChannelParameter gatewayInitParameter;
    private EventExecutorGroup group;

    /**
     * @param gatewayInitParameter
     * @param group
     */
    public UdpServerChannelInitializer(GatewayDataChannelParameter gatewayInitParameter, EventExecutorGroup group) {
        super();
        this.gatewayInitParameter = gatewayInitParameter;
        this.group = group;
    }

    @Override
    protected void initChannel(NioDatagramChannel arg0) throws Exception {
        ChannelPipeline pipeline = arg0.pipeline();// the pipeline is the container for ChannelHandlers; the StringDecoder below is itself a handler
        pipeline.addLast("decode", new StringDecoder());// the order in which handlers are added matters
        pipeline.addLast("encode", new StringEncoder());
        //pipeline.addLast(new IdleStateHandler(60, 60, 60, TimeUnit.SECONDS));// UDP is connectionless, so no idle handler is needed
        ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getSimpleChannelInboundHandler();// the handler that actually processes messages
        pipeline.addLast(group, "server", adapter);
    }
}
3. The handler that processes UDP data
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import com.sefon.gateway.baseModule.exceptions.CheckChannelMsgException;
import com.sefon.gateway.baseModule.gatewayServer.checkChannelMsg.ChannelMsgChecker;
import com.sefon.gateway.baseModule.gatewayServer.gatewayBase.GatewayBaseData;
import com.sefon.gateway.baseModule.poolsAndThreads.messageProcessors.TcpOrUdpMsgProcessor;
import com.sefon.gateway.baseModule.poolsAndThreads.processorPool.TcpOrUdpMessagePool;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.CharsetUtil;
/**
 * @Description: handler that processes UDP packet data for the generic device protocol
 * @author: jackromer
 * @version: 1.0, Jul 24, 2018
 */
@Sharable// the handler can be shared across channels
public class UdpChannelHandler extends SimpleChannelInboundHandler<DatagramPacket> {// the message type here must be DatagramPacket
    @Override
    public void channelRead0(ChannelHandlerContext ctx, DatagramPacket dataPacket) throws Exception {
        String result = dataPacket.content().toString(CharsetUtil.UTF_8);// the received payload
        // send a reply datagram back to the client
        String udpReplyMsg = "hello client , i am udp server.";
        DatagramPacket pac = new DatagramPacket(Unpooled.copiedBuffer(udpReplyMsg, CharsetUtil.UTF_8), dataPacket.sender());
        ChannelFuture channelFuture = ctx.channel().writeAndFlush(pac);// write the reply to the sender's address
    }
}
Netty ctx fire* methods
The fire* methods on ChannelHandlerContext propagate an event to the next handler in the pipeline. The javadoc of fireChannelActive, for example, reads:
/**
 * A {@link Channel} is active now, which means it is connected.
 *
 * This will result in having the {@link ChannelInboundHandler#channelActive(ChannelHandlerContext)} method
 * called of the next {@link ChannelInboundHandler} contained in the {@link ChannelPipeline} of the
 * {@link Channel}.
 */
ChannelInboundInvoker fireChannelActive();// invokes channelActive on the next handler in the pipeline's handler list; the methods below behave analogously
@Override
ChannelHandlerContext fireChannelRegistered();
@Override
ChannelHandlerContext fireChannelUnregistered();
@Override
ChannelHandlerContext fireChannelActive();
@Override
ChannelHandlerContext fireChannelInactive();
@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);
@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);
@Override
ChannelHandlerContext fireChannelRead(Object msg);
@Override
ChannelHandlerContext fireChannelReadComplete();
@Override
ChannelHandlerContext fireChannelWritabilityChanged();
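These fire* methods are how a handler decides whether to consume an event or pass it along the pipeline. As a hedged illustration (the "HEARTBEAT" check and the FilteringHandler name are made up for this example), a handler might swallow some messages and forward the rest:
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class FilteringHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if ("HEARTBEAT".equals(msg)) {
            // consume the message here: it is not propagated, so later handlers never see it
            return;
        }
        ctx.fireChannelRead(msg);// hand the message to the next inbound handler in the pipeline
    }
}
```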
## Netty's HTTP and HTTPS support
> HTTP and HTTPS are handled much like Netty TCP; the differences are:
> 1. The initChannel override of ChannelInitializer<SocketChannel> is different.
> 2. The handler processes data differently.
### Overriding the initChannel method
```java
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline ph = ch.pipeline();
    // add the SSL handler first
    SSLEngine sslEngine = context.createSSLEngine();
    sslEngine.setUseClientMode(false);
    sslEngine.setNeedClientAuth(true); // mutual (two-way) authentication
    ph.addFirst(new SslHandler(sslEngine));
    // the key handlers for serving HTTP
    ph.addLast("codec", new HttpServerCodec());// already combines HttpRequestDecoder and HttpResponseEncoder
    ph.addLast("compressor", new HttpContentCompressor());
    //ph.addLast("decoder", new HttpRequestDecoder());// redundant: HttpServerCodec above already provides the request decoder
    //ph.addLast("encoder", new HttpResponseEncoder());// redundant: HttpServerCodec above already provides the response encoder
    ph.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
    ChannelInboundHandlerAdapter adapter = gatewayInitParameter.getHttpsSimpleChannelInboundHandler();
    ch.pipeline().addLast(group, "httpsServer", adapter);// guard against NPE: the provider must return exactly one handler instance
}
```
Handling data in the handler
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    String result = "";
    if (!(msg instanceof FullHttpRequest)) {
        result = "unknown request!";
        send(ctx, result, HttpResponseStatus.BAD_REQUEST);
        return;// without this return the cast below would throw a ClassCastException
    }
    FullHttpRequest httpRequest = (FullHttpRequest) msg;
    try {
        String path = httpRequest.uri(); // the request path
        System.out.println("url is :" + path);
        String body = getBody(httpRequest); // the request body
        HttpMethod method = httpRequest.method(); // the request method
        System.out.println("received a " + method + " request");
        // GET request
        if (HttpMethod.GET.equals(method)) {
            // business logic for the received message goes here...
            System.out.println("body:" + body);
            result = "GET request";
            send(ctx, result, HttpResponseStatus.OK);
            //return;
        }
        // POST request
        if (HttpMethod.POST.equals(method)) {
            // business logic for the received message goes here...
            System.out.println("body:" + body);
            result = "POST request";
            send(ctx, result, HttpResponseStatus.OK);
            //return;
        }
        ctx.flush();
    } catch (Exception e) {
        System.out.println("failed to handle the request!");
        e.printStackTrace();
    } finally {
        // release the request
        httpRequest.release();
        ctx.flush();
    }
}
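The channelRead method above calls send and getBody, whose implementations are not shown in the post. A hedged sketch of what such helpers might look like inside the same handler class (the names, content type, and connection-close behaviour are assumptions, not the original implementation; the sketch relies on io.netty.handler.codec.http classes and io.netty.buffer.Unpooled):
```java
// Hypothetical helpers; the original send/getBody implementations are not shown in the post.
private void send(ChannelHandlerContext ctx, String content, HttpResponseStatus status) {
    FullHttpResponse response = new DefaultFullHttpResponse(
            HttpVersion.HTTP_1_1, status,
            Unpooled.copiedBuffer(content, CharsetUtil.UTF_8));
    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
    ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);// close once the response has been written
}

private String getBody(FullHttpRequest request) {
    return request.content().toString(CharsetUtil.UTF_8);// read the aggregated request body as a UTF-8 string
}
```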
Notes
The official documentation is explicit about shutting a Netty service down: the service shuts down when the EventLoopGroups you created are shut down. TCP and UDP differ in one important respect here: calling ctx.channel().close() on the UDP side closes the single bound datagram channel and therefore effectively takes the UDP server down, whereas calling ctx.channel().close() on the TCP side only closes the current connection's channel.
Shutting Down Your Application
Shutting down a Netty application is usually as simple as shutting down all EventLoopGroups you created via shutdownGracefully().
It returns a Future that notifies you when the EventLoopGroup has been terminated completely and all Channels that belong to the group have been closed.
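A minimal shutdown sequence might look like the sketch below (boss and worker stand in for whatever groups the bootstrap was given; this is an illustration, not the gateway's actual shutdown code):
```java
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class ShutdownDemo {
    public static void main(String[] args) {
        EventLoopGroup boss = new NioEventLoopGroup();     // stand-ins for the groups a bootstrap was given
        EventLoopGroup worker = new NioEventLoopGroup();
        // ... run the server ...
        boss.shutdownGracefully().syncUninterruptibly();   // the returned Future completes once the group
        worker.shutdownGracefully().syncUninterruptibly(); // has terminated and all its channels are closed
    }
}
```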
Summary
Netty's NIO model supports high concurrency and is used in many Java distributed frameworks. TCP and UDP differ in how data must be handled: TCP is connection-oriented, so sticky and half packets can occur during transmission, and the data has to be re-framed according to the protocol to guarantee message integrity. UDP is connectionless, so there is no sticky-packet problem and each received datagram is a complete message. Netty can also serve HTTP and HTTPS; see the official documentation for details.
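For the TCP sticky/half-packet issue mentioned above, Netty ships frame decoders that re-assemble complete messages before the business handler sees them. A hedged sketch using a length-field protocol (the 4-byte length prefix and 1 MB frame limit are assumptions about the wire format, not the gateway's actual protocol):
```java
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class FramedChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) {
        // inbound: wait until a whole frame (4-byte length prefix + body) has arrived, then strip the prefix
        ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024, 0, 4, 0, 4));
        ch.pipeline().addLast(new StringDecoder());
        // outbound: prepend the 4-byte length so the peer can re-frame the byte stream the same way
        ch.pipeline().addLast(new LengthFieldPrepender(4));
        ch.pipeline().addLast(new StringEncoder());
        // business handlers go after the framing/codec handlers
    }
}
```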