Netty用户指南

一、前言

1.问题

当今世界我们需要使用通用的软件或库与其他组件进行通信,例如使用HTTP客户端从服务器中获取信息,或通过网络服务调用一个远程的方法。然而通用的协议及其实现通常不具备较好的伸缩性。所以问题看起来是我们怎么不使用通用的HTTP服务器去传输大文件、e-mail、实事数据、多媒体数据等。我们需要的是针对特定问题而进行优化的协议实现。例如我们可能需要重新实现一个HTTP服务器来与AJAX的客户端进行通信。另外一种情况是需要处理历史遗留的协议保证与旧的系统兼容。这些例子的关键在于怎样快速的实现协议而不损失目标系统的稳定性和性能。

2.解决方案

Netty是一个异步事件驱动的网络应用框架,可以用来快速开发可维护的、高性能、可扩展的协议服务器和客户端。

换句话说,Netty是一个基于NIO的客户端和服务器框架,可以简单快速的开发网络应用程序,如协议的客户端和服务器。它极大的简化了TCP、UDP服务器之类的网络编程。

二、开始

1.编写DiscardServer

最简单的协议并不是“hello world”,而是丢弃。丢弃协议会丢弃任何接受到的数据不做任何的响应。

要实现丢弃协议,需要做的就是丢弃任何接收到的数据。首先从handler的实现开始,handler会处理由Netty产生的I/O事件。

package io.netty.example.discard;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
  1. DiscardServerHandler继承了ChannelInboundHandlerAdapter,而他又实现了ChannelInboundHandlerChannelInboundHandler提供了不同的事件处理方法,你可以根据需要去覆写相应的方法。ChannelInboundHandlerAdapter提供了一些默认的实现,所以在这个例子中只需要去继承它就可以了。
  2. 覆写了channelRead方法,Netty从客户端收到数据时就会调用该方法。消息的类型是ByteBuf
  3. ByteBuf是一个引用计数对象,需要进行手动的释放。需要注意的是,handler需要释放任何传递给他的引用计数对象。通常情况下channelRead()方法通常的实现方式如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
  1. 由于IO错误Netty抛出异常或handle处理事件抛出异常,都会使exceptionCaught()方法被调用。在大多数情况下,都需要对异常记日志,并且关闭相关连的channel

到目前为止实现了DISCARD服务的一般,接下来需要实现main()方法来启动服务。

package io.netty.example.discard;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; /**
* Discards any incoming data.
*/
public class DiscardServer { private int port; public DiscardServer(int port) {
this.port = port;
} public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6) // Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7) // Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
} public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new DiscardServer(port).run();
}
}
  1. NioEventLoopGroup 是一个多线程的事件循环,用来处理I/O操作。Netty为不同的通信方式提供了多种EventLoopGroup实现。在本例中,我们只需要实现服务器端的应用,所以需要两个NioEventLoopGroup 。第一个通常称为boss,用来接收客户端的链接请求。第二个称为worker,用来处理boss已接收连接的I/O请求和把接收的连接注册到worker
  2. ServerBootstrap是用来创建服务器的辅助类。
  3. 使用NioServerSocketChannel类来实例化channel,用来接收连接请求。
  4. 在这里设置的handler会被每一个新channel调用,ChannelInitializer是一个特殊的handler用来配置一个新的channel。在本例中,我们将DiscardServerHandler添加到新channel 的管道中。随着应用程序的复杂度增加,可能会向管道中加入更多的handler。
  5. 可以通过option()方法给channel设置一些参数。
  6. option()方法是用来设置NioServerSocketChannel参数的,而childOption()是给接收的连接设置参数的。
  7. 剩下的就是绑定端口然后启动服务了。

2. 测试DiscardServer是否成功

最简单的方法是使用telnet命令。例如输入telnet localhost 8080。DiscarServer丢弃了任何接受的数据,我们可以把DiscardServer的接收的数据打印出来。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
  1. 循环可以等价于System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
  2. 等价于in.release()

3.写一个Echo Server

一个服务器通常需要对请求作出响应,而一个Echo服务仅仅需要做的是把请求的内容返回给客户端。

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
  1. ChannelHandlerContext对象提供了各种出发IO时间的操作。通过调用write(Object)方法把数据发给客户端。在这里没有手动的释放msg,这是因为当把msg写入时Netty会自动的释放它。
  2. ctx.write(Object)并不会把数据写到外部,而是在内部的缓冲区中,通过调用ctx.flush()把数据刷出到外部。可以简洁的调用ctx.wirteAndFlush(msg)达到同样的效果。

4. 写一个Timer Server

TIME协议与前面的例子不同之处在于,它发送一个32位的整数,不接收任何请求,并且只要消息发送了就立刻关闭连接。

因为我们不需要接收任何数据,而且在连接建立时就发送数据,所以不能使用channelRead()方法。需要覆写channelActive()方法

package io.netty.example.time;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
  1. 当一个连接建立时,activeChannel()方法会被调用,然后写一个32位的整数。

  2. 为了发送一个新的信息,需要分配一个缓冲区。通过调用ctx.alloc()获取ByteBufAllocator来分配缓冲区。

  3. 在Netty中的Buffer不需要像Java NIO一样调用flip(),这是因为Netty中的Buffer具有两个指针,分别用于读写操作。当进行写操作时写指针在移动而读指针不移动,读写指针分别代表数据的开始和结束。

    另外需要指出的是,ctx.write()返回一个ChannelFuture对象,该对象代表着一个还未发生的IO操作。这意味着,任何一个请求操作可能都未发生,这是因为在Netty中,所有操作都是异步的。例如下面的代码可能在发送信息前关闭连接:

    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();

    所以要在ChannelFuture完成前调用close(),当操作完成时,ChannelFuture会通知他的监听器。close()可能也不会立即关闭连接。

  4. 本例中添加一个匿名内部类作为监听器,来关闭连接。也可以使用预定义的监听器:

    f.addListener(ChannelFutureListener.CLOSE);

5.Time Client

不同于DISCARD和ECHO,TIME协议需要一个客户端将32位的整数转为一个日期。Netty中的客户端和服务器最大的不同在于使用了不同的BootStrapChannel现实。

package io.netty.example.time;

public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
  1. BootStapServerBootStrap很相似,但它是用于客户端的。
  2. 只需指定一个EventLoopGroup,在客户端中不需要boss。
  3. 使用NioSocketChannel而不是NioServerSocketChannel
  4. 不需要childOption()
  5. 使用connect()方法而不是bind()

TimeClientHandler中,将整数翻译成日期格式的类型。

package io.netty.example.time;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

6.处理基于流的传输问题。

TCP/IP协议接收数据并储存到Socket缓冲区中,但是缓冲区不是数据包的队列,而是字节的队列,这意味着你发送了两条消息,但操作系统会并不认为是两条消息而是一组字节。所以在读数据时并不能确定读到了对方发过来的数据。

在TIME协议中,在调用m.readUnsignedInt()时缓冲区中需要有四个字节,如果缓冲区中还未接收到四个字节时就会抛出异常。

解决方法是,再加一个ChannelHandleChannelPipeline。该handler专门处理编码问题。

package io.netty.example.time;

public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
  1. ByteToMessageDecoderChannelInboundHandler的一个实现,专门用于编码问题。
  2. 当新的数据到达时,Netty会调用decode方法,并且其内部维护着一个累加Buffer。
  3. 当累加Buffer中没有足够的数据时,可以不在out中添加任何数据。当新数据到达后Netty又会调用decode方法。
  4. 如果decode()添加一个对象到out中,意味着编码信息成功了。Netty会丢弃Buffer中已读取的部分数据。

TimeDecoder添加到ChannelPipeline中:

b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});

另外一种更简单的方式是使用ReplayingDecoder

public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}

当调用in.readBytes(4)抛出异常时,ReplayingDecoder会捕捉异常并重复执行decode()

7.使用POJO代替ByteBuf

在之前的TIME服务中,都是直接使用ByteBuf作为协议的数据结构。在Handler中使用POJO对象,可以把从ByteBuf抽取POJO的代码分离开。

首先定义UnixTime类:

package io.netty.example.time;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
} public UnixTime(long value) {
this.value = value;
} public long value() {
return value;
} @Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}

TimeDecoder中解码产生UnixTime对象

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}

TimeClientHandler中不再需要使用ByteBuf了。

在服务器端,首先更改TimeServerHandler

@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}

还需要创建一个编码器,将UnixTime转为ByteBuf以便网络传输

public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}

最新文章

  1. centos执行yum出现Could not retrieve mirrorlist错误
  2. C++学习笔记 指针与引用
  3. TYVJ P1008 传球游戏
  4. Java SpringMVC实现国际化整合案例分析(i18n)
  5. php的一些数组用法
  6. VBA读取word中的内容到Excel中
  7. 【动态规划】HDU 1081 &amp; XMU 1031 To the Max
  8. 用&quot;池&quot;来提升对象的复用
  9. 编码规范系列(二):Eclipse Checkstyle配置
  10. arrayList LinkedList HashMap HashTable的区别
  11. Struts1、WebWork、Struts2介绍
  12. javascript参数传递中处理+号
  13. 使用 /sys 文件系统访问 Linux 内核
  14. c/c++ 多线程 等待一次性事件 future概念
  15. 网站搭建 (第06天) Ckeditor编辑器
  16. Oracle 归档日志文件
  17. SimpleChannelInboundHandler与ChannelInboundHandlerAdapter
  18. Java并发编程三个性质:原子性、可见性、有序性
  19. Windows上的git、github部署及基本使用方法
  20. 51nod 1673 树有几多愁(链表维护树形DP+状压DP)

热门文章

  1. java内存溢出异常
  2. ARKit对安卓的提示 ARKit与Google Tango
  3. 检查路径是否存在与创建指定路径(mfc)
  4. [Selenium With C#基础教程] Lesson-02 Web元素定位
  5. Webservice发布
  6. css选择器与DOM&#39;匹配的关系
  7. window系统JAVA开发环境的搭建
  8. 【题解】 AGC029-A Irreversible operation
  9. 2、Orcal数据库创建第一个(管理员)连接
  10. Docker 网络模式和跨主机通信