同步非阻塞

NIO之所以是同步,是因为它的accept read write方法的内核I/O操作都会阻塞当前线程

IO模型 IO NIO
通信 面向流(Stream Oriented) 面向缓冲区(Buffer Oriented)
处理 阻塞IO(Blocking IO) 非阻塞IO(Non Blocking IO)
触发 (无) 选择器(Selectors)

Channel(通道)

Channel是一个对象,可以通过它读取和写入数据。可以把它看做是IO中的流,不同的是:

    Channel是双向的(NIO面向缓冲区,双向传输),既可以读又可以写,而流是单向的(传统IO操作是面向流,单向传输)

    Channel可以进行异步的读写

    对Channel的读写必须通过buffer对象

    buffer负责存储数据,channel负责传输数据

在Java NIO中的Channel主要有如下几种类型(Java 针对支持通道的类提供了一个 getChannel() 方法):

    FileChannel:从文件读取数据的

    DatagramChannel:读写UDP网络协议数据

    SocketChannel:读写TCP网络协议数据

    ServerSocketChannel:可以监听TCP连接

Buffer(缓冲区)

Buffer是一个对象(实质上是一个数组,通常是一个字节数据):

    它包含一些要写入或者读取Stream对象的,用于读写操作(put():存,get():取)

    它包含四个属性:
capacity(总容量)
limit(当前可用容量)
position(正在操作数据的位置)
mark(标记当前position,可通过reset()恢复mark位置) 使用 Buffer 读写数据一般遵循以下四个步骤: 1.写入数据到 Buffer; 2.调用 flip() 方法(flip() 方法将 Buffer 从写模式切换到读模式); 3.从 Buffer 中读取数据(读完了所有的数据,就需要清空缓冲区); 4.调用 clear() 方法或者 compact() 方法。 有两种方式能清空缓冲区: 调用 clear() 或 compact() 方法。 clear() 方法会清空整个缓冲区。 compact() 方法只会清除已经读过的数据。任何未读的数据都被移到缓冲区的起始处,新写入的数据将放到缓冲区未读数据的后面。 Buffer主要有如下几种: ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer

直接缓冲区与非直接缓冲区

非直接缓冲区(直接在堆内存中开辟空间,也就是数组):

    通过allocate()方法分配缓冲区,将缓冲区建立在JVM的内存中。

    在每次调用操作系统的IO之前或者之后,虚拟机都会将缓冲区的内容复制到中间缓冲区(或者从中间缓冲区复制内容),

    缓冲区的内容驻留在JVM内,因此销毁容易,但是占用JVM内存开销,处理过程中有复制操作。

非直接缓冲区的写入步骤:

    创建一个临时的ByteBuffer对象。

    将非直接缓冲区的内容复制到临时缓冲中。

    使用临时缓冲区执行低层次I/O操作。

    临时缓冲区对象离开作用域,并最终成为被回收的无用数据。

直接缓冲区(直接调用了内存页,让操作系统开辟缓存空间):

    通过allocateDirect()方法分配直接缓冲区,将缓冲区建立在物理内存中,减少一次复制过程,可以提高效率。

    虽然直接缓冲可以进行高效的I/O操作,但它使用的内存是操作系统分配的,绕过了JVM堆栈,建立和销毁比堆栈上的缓冲区要更大的开销

简单应用

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; public class CopyFile {
public static void copyFile(String src, String dst) throws IOException { //源文件输入流
FileInputStream fi = new FileInputStream(new File(src));
//目标文件输出流
FileOutputStream fo = new FileOutputStream(new File(dst)); //获得传输通道channel
FileChannel inChannel = fi.getChannel();
FileChannel outChannel = fo.getChannel(); //获得容器buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (true) {
//判断是否读完文件:阻塞式
int eof = inChannel.read(buffer);
if (eof == -1) {
break;
}
//重设一下buffer的limit=position,position=0
buffer.flip(); //开始写
outChannel.write(buffer); //写完要重置buffer,重设position=0,limit=capacity
buffer.clear();
}
inChannel.close();
outChannel.close();
fi.close();
fo.close();
}
}

分散(Scatter)与聚集(Gather)

分散读取(Scattering Reads):将通道中的数据分散到多个缓冲区中

聚集写入(Gathering Writes):将多个缓冲区中的数据聚集到通道中

    public void rw() throws IOException {
// rw:代表读写模式
RandomAccessFile file = new RandomAccessFile("E:\\小视频.mp4","rw");
//获取通道
FileChannel channel = file.getChannel(); // 分配制定缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate(1024*2);
ByteBuffer byteBuffer2 = ByteBuffer.allocate(1024*2);
ByteBuffer byteBuffer3 = ByteBuffer.allocate(1024*2); // 分散读取
ByteBuffer[] buffers= {byteBuffer1,byteBuffer2,byteBuffer3};
channel.read(buffers); for (ByteBuffer buffer : buffers) {
buffer.flip();
} // 聚集写入
RandomAccessFile file2 = new RandomAccessFile("E:\\我的小视频.mp4","rw");
// 获取通道
FileChannel channel2 = file2.getChannel();
channel2.write(buffers); channel.close();
channel2.close();
}

字符集Charset

设置字符集,解决乱码问题

编码:字符串->字节数组

解码:字节数组->字符串

    public static void test() throws CharacterCodingException {
//获取NIO字符集
Charset cs = Charset.forName("utf-8");
//获取编码器
CharsetEncoder ce = cs.newEncoder();
//获取解码器
CharsetDecoder cd = cs.newDecoder(); //申请1024字节的空间地址
CharBuffer cb = CharBuffer.allocate(1024);
//写入内容
cb.put("ld加油"); //转化为读模式
cb.flip(); //编码
ByteBuffer bB = ce.encode(cb); //编码后的内容
//以什么编码就以什么解码
//GBK一个中文字符占两个字节
//UTF-8一个中文占三个字节
for (int i = 0; i < 8; i++) {
//get()返回字节之后,position会自动加1
//get(index)返回字节后,position并未移动
System.out.println(bB.get());
} //转化为读模式
bB.flip(); //解码
System.out.println(cd.decode(bB));
}

通道到通道传输

toChannel.transferFrom(fromChannel, position, count)方法将数据从fromChannel源通道传输到toChannel目的通道

fromChannel.transferTo(position, count, toChannel)方法将数据从fromChannel源通道传输到toChannel目的通道

        //源文件
RandomAccessFile fromFile = new RandomAccessFile("fromFile.txt", "rw");
FileChannel fromChannel = fromFile.getChannel(); //目标文件
RandomAccessFile toFile = new RandomAccessFile("toFile.txt", "rw");
FileChannel toChannel = toFile.getChannel(); long position = 0;
long count = fromChannel.size(); //传输
toChannel.transferFrom(fromChannel, position, count); fromChannel.transferTo(position, count, toChannel);

NIO的非阻塞方式

Selector(选择器)是非阻塞 IO 的核心:将用于传输的通道全部注册到选择器上

选择器和通道的关系:通道注册到选择器上,选择器监控通道(监控这些通道的IO状况(读,写,连接,接收数据的情况等状况))

当某一通道,某一个事件就绪之后,选择器才会将这个通道分配到服务器端的一个或多个线程上

Selector(选择器)是SelectableChannle(通道)对象的多路复用器,
Selector可以同时监控多个SelectableChannel的 IO 状况

选择器(Selector)

Selector的好处在于:使用更少的线程来就可以来处理通道了,相比使用多个线程,避免了线程上下文切换带来的开销

每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据返回

创建 Selector:

	通过调用 Selector.open() 方法创建一个 Selector

向选择器注册通道(register()方法会返回一个SelectionKey对象,称之为键对象):

    SelectionKey key= SelectableChannel.register(Selector sel, int interestSet);

    再次调用:SelectableChannel.register(selector, interestSet);
方法会直接对兴趣进行重新赋值,也就是会覆盖掉之前的兴趣设置 public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
interestOps = ops;
selector.setEventOps(this);
return this;
} ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel 可以监听的事件类型(interestSet 可使用 SelectionKey 的四个常量表示):
读 : SelectionKey.OP_READ(1)
写 : SelectionKey.OP_WRITE(4)
连接 : SelectionKey.OP_CONNECT(8)
接收 : SelectionKey.OP_ACCEPT(16) 客户端的SocketChannel支持:
OP_CONNECT, OP_READ, OP_WRITE三个操作。 服务端ServerSocketChannel只支持OP_ACCEPT操作:
在服务端由ServerSocketChannel的accept()方法产生的SocketChannel只支持OP_READ, OP_WRITE操作。 如果你对不止一种事件感兴趣,使用或运算符即可,如下: int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE; 方法:
public abstract Set<SelectionKey> keys(); //所有注册的SelectionKeys集合
public abstract Set<SelectionKey> selectedKeys(); //至少有一个感兴趣的事件ready的keys集合
protected final Set<SelectionKey> cancelledKeys(); //被cancel掉的keys以及对应channel被close的keys wakeup():
调用该方法会时,阻塞在select()处的线程会立马返回
(ps:下面一句划重点)
即使当前不存在线程阻塞在select()处,
那么下一个执行select()方法的线程也会立即返回结果,相当于执行了一次selectNow()方法 close():
用完Selector后调用其close()方法会关闭该Selector,
且使注册到该Selector上的所有SelectionKey实例无效。channel本身并不会关闭。 select():
选择已经准备就绪的通道 (这些通道包含你感兴趣的的事件) Selector几个重载的select()方法: int select():阻塞到至少有一个通道在你注册的事件上就绪了。
int select(long timeout):和select()一样,但最长阻塞时间为timeout毫秒。无论是否有读写等事件发生,selector每隔timeout都被唤醒一次
int selectNow():非阻塞,只要有通道就绪就立刻返回。 select()方法返回的int值表示有多少通道已经就绪 一旦调用select()方法,并且返回值不为0时,则可以通过调用Selector的selectedKeys()方法来访问已选择键集合

SelectionKey(键对象)

SelectionKey:表示 SelectableChannel 和 Selector 之间的注册关系:

    SelectionKey对象包含了以下四种属性:

        interest集合(最初,该兴趣集合是通道被注册到Selector时传进来的值):判断Selector是否对Channel的某种事件感兴趣:
int interestSet = selectionKey.interestOps();
boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT; read集合:通道已经就绪的操作的集合
int readSet=selectionKey.readOps();
//等价于selectionKey.readyOps() & SelectionKey.OP_ACCEPT
selectionKey.isAcceptable(); //检测 Channel 中接收是否就绪(server socket channel准备好接收新进入的连接)
//检测 Channel 中连接是否就绪(channel成功连接到一个服务器)(当客户端调用SocketChannel.connect()时,该操作会就绪)
selectionKey.isConnectable();
//检测 Channal 中读事件是否就绪(有数据可读的通道)(当OS的读缓冲区中有数据可读时,该操作就绪)
selectionKey.isReadable();
//检测 Channal 中写事件是否就绪(等待写数据的通道)(当OS的写缓冲区中有空闲的空间时(大部分时候都有),该操作就绪)
selectionKey.isWritable(); SelectableChannel channel():返回该SelectionKey对应的channel通道 Selector selector():返回回该SelectionKey对应的Selector选择器 方法:
public abstract boolean isValid()
判断此密钥是否有效。
密钥在创建时有效并保持不变,直到它被取消,其通道关闭或其选择器关闭。 public abstract void cancel()
请求取消此密钥通道及其选择器的注册。
返回时,密钥将无效,并且将被添加到其选择器的已取消密钥集中。
在下一个选择操作期间,该键将从所有选择器的键集中删除。 //如果try()catch()捕获到该SelectionKey对应的Channel出现了异常,即表明该Channel对应的Client出现了问题
//应从Selector中取消该SelectionKey的注册
key.cancel();
if (key.channel() != null) {
key.channel().close();
} public abstract SelectionKey interestOps(int ops);
直接对兴趣进行重新赋值,也就是会覆盖掉之前的兴趣设置
public SelectionKey interestOps(int ops) {
ensureValid();
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
if (ops != oldOps) {
selector.setEventOps(this);
}
return this;
} 缓冲区绝大部分事件都是有空闲空间的,所以当你注册写事件后,写操作一直就是就绪的,
这样会导致Selector处理线程会占用整个CPU的资源。
所以最佳实践是当你确实有数据写入时再注册OP_WRITE事件,并且在写完以后马上取消注册。 注册OP_WRITE是比较好的做法,注册方式有两种: 直接注册到SocketChannel:
SocketChannel.register(selector, SelectionKey.OP_WRITE),这种方式直接用SocketChannel来写ByteBuffer
SelectonKey方式注册:
SelectionKey.interestOps(SelectionKey.interestOps() | SelectionKey.OP_WRITE) 取消注册:写操作就绪,将之前写入缓冲区的数据写入到Channel,并取消注册
channel.write(writeBuffer);
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); 当read读操作时:
len > 0 网络正常,且收到数据;
len = 0 网络正常,没有收到数据;
len < 0 网络已中断 客户端断开后,需要手动关闭channel:readChannel.close(); read什么时候返回-1 read返回-1说明客户端的数据发送完毕,并且主动的close socket。
所以在这种场景下,(服务器程序)你需要关闭socketChannel并且取消key,最好是退出当前函数。
注意,这个时候服务端要是继续使用该socketChannel进行读操作的话,就会抛出“远程主机强迫关闭一个现有的连接”的IO异常。 read什么时候返回0 一是某一时刻socketChannel中当前(注意是当前)没有数据可以读,这时会返回0, 其次是bytebuffer的position等于limit了,即bytebuffer的remaining等于0, 最后一种情况就是客户端的数据发送完毕了(注意看后面的程序里有这样子的代码),
这个时候客户端想获取服务端的反馈调用了recv函数,若服务端继续read,这个时候就会返回0。

非阻塞客户端

    public static void client() throws IOException, InterruptedException {

        //创建socket连接通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 10000));
//切换成非阻塞模式
socketChannel.configureBlocking(false); /**
* 发送数据到服务器
*/
System.out.println("开始发送数据!"); //开辟缓存区
ByteBuffer clientBuffer = ByteBuffer.allocate(1024);
String inputStr = "你好,明天!";
clientBuffer.put((new Date().toString() + "---" + inputStr + "\r\n").getBytes());
clientBuffer.flip();
socketChannel.write(clientBuffer);
clientBuffer.clear(); Thread.sleep(1000); /**
* 从服务端接收数据
*/
System.out.println("从服务端接收数据!"); ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuffer buf = new StringBuffer();
int count = 0;
while ((count = socketChannel.read(buffer)) > 0) {
byte[] array = buffer.array();
buffer.clear();
buf.append(new String(array, 0, count));
}
if (buf.length() > 0) {
System.out.println(buf.toString());
} /**
* 关闭客户端管道
*/
System.out.println("关闭客户端!");
socketChannel.close();
}

非阻塞服务端

    public static void server() throws IOException {
//ServerSocketChannel是一个可以监听新进来的TCP连接的通道,就像标准IO中的ServerSocket一样
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//绑定连接
serverSocketChannel.bind(new InetSocketAddress(10000)); //选择器
Selector selector = Selector.open();
//将通道注册到选择器上,并制定监听事件:服务端首先要监听客户端的接受状态
SelectionKey register = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //阻塞式,没有就行就阻塞在这
while (selector.select() > 0) { System.out.println("selector.select()轮询"); //获取已经就绪的监听事件
Iterator<SelectionKey> selectorIterator = selector.selectedKeys().iterator(); while (selectorIterator.hasNext()) {
// 获取准备就绪的事件
SelectionKey key = selectorIterator.next();
SocketChannel socketChannel = null; if (key.isAcceptable()) {
//获取服务端通道
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//接受成功连接到服务器的channel,则获取客户端连接
socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false); //将该客户端通道注册到选择器上,监控客户端socketChannel的“读就绪”事件
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isValid() && key.isReadable()) {
SocketChannel readChannel = null;
FileChannel outputChannel = null;
try { System.out.println("开始读!"); //获取当前选择器上“读就绪”状态的通道
readChannel = (SocketChannel) key.channel();
readChannel.configureBlocking(false); //创建文件输出管道:FileChannel不能使用非阻塞模式
//从服务端接收文件,并将文件写到本地(写方式,如果文件不存在就创建)
outputChannel = FileChannel.open(Paths.get("E:\\小视频.txt"), StandardOpenOption.WRITE, StandardOpenOption.CREATE); //创建缓冲区,进行读写操作
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
while ((readChannel.read(readBuffer)) > 0) {
readBuffer.flip();
outputChannel.write(readBuffer);
readBuffer.clear();
} System.out.println("读完了,加入写兴趣!"); key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); if ((readChannel.read(readBuffer)) == -1) { System.out.println("客户端断开连接!"); key.cancel();
try {
readChannel.close();
outputChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
} catch (IOException e) { System.out.println("异常关闭!"); //当客户端关闭channel时,服务端再往通道缓冲区中写或读数据,都会报IOException
//解决方法是:在服务端这里捕获掉这个异常,并且关闭掉服务端这边的Channel通道
key.cancel();
try {
readChannel.close();
outputChannel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
} else if (key.isValid() && key.isWritable()) {
try { System.out.println("开始写!"); SocketChannel channel = (SocketChannel) key.channel();
channel.write(ByteBuffer.wrap("收到!".getBytes())); System.out.println("写完了,取消写兴趣!"); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* selector不会自己删除selectedKeys()集合中的selectionKey,
* 如果不人工remove(),
* 将导致下次select()的时候selectedKeys()中仍有上次轮询留下来的信息,这样必然会出现错误
* 应此当某个消息被处理后我们需要从该集合里去掉
*/
selectorIterator.remove();
}
}

管道Pipe

Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个sink通道和一个source通道。

数据会写到sink通道,从source通道读取,

打开管道:Pipe pipe = Pipe.open();

public class Pipe {
public static void main(String[] args) throws IOException {
// 1. 获取管道
Pipe pipe = Pipe.open(); // 2. 将缓冲区数据写入到管道 // 2.1 获取一个通道
Pipe.SinkChannel sinkChannel = pipe.sink(); // 2.2 定义缓冲区
ByteBuffer buffer = ByteBuffer.allocate(48);
buffer.put("发送数据".getBytes());
buffer.flip(); //切换数据模式
// 2.3 将数据写入到管道
sinkChannel.write(buffer); // 3. 从管道读取数据
Pipe.SourceChannel sourceChannel = pipe.source();
buffer.flip();
int len = sourceChannel.read(buffer);
System.out.println(new String(buffer.array(),0,len)); // 4. 关闭管道
sinkChannel.close();
sourceChannel.close();
}
}

最新文章

  1. qq
  2. 重装Ubuntu16.04及安装theano
  3. java提高篇之理解java的三大特性——多态
  4. IRedisClient 常用方法说明
  5. js本地图片预览代码兼容所有浏览器
  6. Spring+MyBatis实践—登录和权限控制
  7. The Angles of a Triangle
  8. 使用pillow生成分享图片
  9. SignalR在ASP.NET MVC中的应用
  10. DAY5 基本数据类型及内置方法
  11. Java - Tips
  12. Doris FE负载均衡配置
  13. Linux修改主机名【转】
  14. CentOS下Storm 1.0.0集群安装具体解释
  15. avast! 2014正式版下载
  16. python 将歌词解析封装成类,要求:提供一个方法(根据时间返回歌词) - 提示:封装两个类:歌词类、歌词管理类
  17. javaScript动画2 scroll家族
  18. SQL优化例子
  19. (转)js原生自定义事件的触发dispatchEvent
  20. MSCN(Mean Subtracted Contrast Normalized)系数的直方图

热门文章

  1. 怎样有效防止ddos
  2. Zabbix-1.8.14 安装
  3. docker启动elasticsearch异常Failed to create node environment(解决)
  4. 关于h5打包后 wag包无法安装的问题
  5. Jenkins 搭建 .NET Core 持续集成环境
  6. Download ubuntu Linux
  7. Zookeeper的单机&amp;集群环境搭建
  8. springcloud分布式事务TXLCN
  9. CUDA并行计算 | 线程模型与内存模型
  10. C++ 将汉字转换成拼音全拼【转载】