目录
Bootstrap、EventLoop(Group) 、Channel
事件和 ChannelHandler、ChannelPipeline
Netty的优势
1. API 使用简单,开发门槛低。
2. 功能强大,预置了多种编解码功能,支持多种主流协议。
3. 定制能力强,可以通过 ChannelHandler 对通信框架进行灵活地扩展。
4. 性能高,通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优。
5. 成熟、稳定,Netty 修复了已经发现的所有 JDK NIO BUG,业务开发人员不需要再为 NIO 的 BUG 而烦恼。
6. 社区活跃,版本迭代周期短,发现的 BUG 可以被及时修复,同时更多的新功能会加入。
7. 经历了大规模的商业应用考验,质量得到验证。
为什么Netty使用NIO而不是AIO?
Netty 不看重 Windows 上的使用,在 Linux 系统上,AIO 的底层实现仍使用 EPOLL,没有很好实现 AIO,因此在性能上没有明显的优势,而且被 JDK 封装了一层不容易深度优化。
AIO 有个缺点是接收数据需要预先分配缓存,而不是 NIO 那种需要接收时才需要分配缓存,所以对连接数量非常大但流量小的情况,内存浪费很多。而且 Linux 上 AIO 不够成熟,处理回调结果速度跟不上处理需求。
Netty基本组件
Bootstrap、EventLoop(Group)、Channel
Bootstrap 是 Netty 框架的启动类和主入口类,分为客户端类Bootstrap和服务器类ServerBootstrap两种。
Channel 是 Java NIO 的一个基本构造。它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的 I/O 操作的程序组件)的开放连接,如读操作和写操作,目前,可以把 Channel 看作是传入(入站)或者传出(出站)数据的载体。因此,它可以被打开或者被关闭,连接或者断开连接。
EventLoop 可以看成一个线程、EventLoopGroup 自然就可以看成线程组。
事件和 ChannelHandler、ChannelPipeline
Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。 Netty 事件是按照它们与入站或出站数据流的相关性进行分类的。
入站数据或者相关的状态更改而触发的事件包括:
连接已被激活或者连接失活、数据读取、用户事件、错误事件。
出站事件是未来将会触发的某个动作的操作结果包括:
打开或者关闭到远程节点的连接、将数据写到或者冲刷到套接字。
每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法,既然事件分为入站和出站,用来处理事件的 ChannelHandler 也被分为可以处理入站事件的 Handler 和出站事件的 Handler,当然有些 Handler 既可以处理入站也可以处理出站。
Netty提供了大量预定义的可以开箱即用的ChannelHandler实现,包括用于各种协议(如 HTTP 和 SSL/TLS)的ChannelHandler。
基于 Netty 的网络应用程序中根据业务需求会使用 Netty 已经提供的 ChannelHandler 或者自行开发 ChannelHandler,这些 ChannelHandler 都放在 ChannelPipeline 中统一管理,事件就会在 ChannelPipeline 中流动,并被其中一个或者多个 ChannelHandler 处理。
ChannelFuture
Netty 中所有的 I/O 操作都是异步的,“异步的意思就是不需要主动等待结果的返回,而是通过其他手段比如,状态通知,回调函数等”,那就是说至少我们需要一种获得异步执行结果的手段。
JDK 预置了 interface java.util.concurrent.Future,Future 提供了一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操作的结果的占位符,它将在未来的某个时刻完成,并提供对其结果的访问。但是其所提供的实现,只允许手动检查对应的操作是否已经完成,或者一直阻塞直到它完成。这是非常繁琐的,所以 Netty 提供了它自己的实现 ChannelFuture,用于在执行异步操作的时候使用。
一般来说,每个 Netty 的出站 I/O 操作都将返回一个 ChannelFuture。
Netty入门程序
服务端代码
public class EchoServer {
private static final Logger LOG = LoggerFactory.getLogger(EchoServer.class);
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
int port = 9999;
EchoServer echoServer = new EchoServer(port);
LOG.info("服务器即将启动");
echoServer.start();
LOG.info("服务器关闭");
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*服务端启动必备*/
ServerBootstrap b = new ServerBootstrap();
b.group(group)
.channel(NioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
});
/*异步绑定到服务器,sync()会阻塞到完成*/
ChannelFuture f = b.bind().sync();
LOG.info("服务器启动完成。");
/*阻塞当前线程,直到服务器的ServerChannel被关闭*/
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
ChannelHandler如下:
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf)msg;
System.out.println("server accept :" + in.toString(CharsetUtil.UTF_8));
ctx.writeAndFlush(in);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接已建立");
super.channelActive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
客户端代码
public class EchoClient {
private final int port;
private final String host;
public EchoClient(int port, String host) {
this.port = port;
this.host = host;
}
public void start() throws InterruptedException {
/*线程组*/
EventLoopGroup group = new NioEventLoopGroup();
try {
/*客户端启动必备,和服务器的不同点*/
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)/*指定使用NIO的通信模式*/
/*指定服务器的IP地址和端口,和服务器的不同点*/
.remoteAddress(new InetSocketAddress(host,port))
/*和服务器的不同点*/
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
/*异步连接到服务器,sync()会阻塞到完成,和服务器的不同点*/
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();/*阻塞当前线程,直到客户端的Channel被关闭*/
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws InterruptedException {
new EchoClient(9999,"127.0.0.1").start();
}
}
ChannelHandler如下:
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/*读取到网络数据后进行业务处理,并关闭连接*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("client Accept"+msg.toString(CharsetUtil.UTF_8));
//关闭连接
///ctx.close();
}
/*channel活跃后,做业务处理*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(
"Hello,Netty",CharsetUtil.UTF_8));
}
}