NIO 源码分析(02-2) BIO 源码分析 Socket

Netty 系列目录(https://www.cnblogs.com/binarylei/p/10117436.html)

在上一篇文章中详细分析了 ServerSocket 的源码,Socket 和 ServerSocket 一样也只是一个门面模式,真正的实现也是 SocksSocketImpl,所以关于 setImpl、createImpl、new、bind、listen 都是类似的,本文重点关注其 connect 和 IO 流的读取方法。

一、BIO 最简使用姿势

//1. 连接服务器
Socket socket = new Socket();
socket.connect(new InetSocketAddress(HOST, PORT), 0);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriterout = new PrintWriter(socket.getOutputStream(), true); //2. 发送请求数据
out.println("客户端发送请求数据..."); //3. 接收服务端数据
String response = in.readLine();
System.out.println("Client: " + response);

ok,代码已经完成!!!下面的源码分析都会基于这个 demo。

二、connect 方法

2.1 Socket.connect 方法

// timeout=0 表示永久阻塞,timeout>0 则指定超时时间
public void connect(SocketAddress endpoint, int timeout) throws IOException { InetSocketAddress epoint = (InetSocketAddress) endpoint;
InetAddress addr = epoint.getAddress ();
int port = epoint.getPort(); // 1. 创建底层 socket 套接字
if (!created)
createImpl(true); // 2. oldImpl 默认为 false,也就是进入第一个 if 条件
// checkOldImpl 会判断 impl 中有没有 connect(SocketAddress address, int port) 方法
// 来设置 oldImpl 的值
if (!oldImpl)
impl.connect(epoint, timeout);
else if (timeout == 0) {
if (epoint.isUnresolved())
impl.connect(addr.getHostName(), port);
else
impl.connect(addr, port);
} else
throw new UnsupportedOperationException("SocketImpl.connect(addr, timeout)");
connected = true;
bound = true;
}

总结: Socket 首先和 ServerSocket 一样调用 createImpl 创建底层 socket 对象,然后委托给 impl 完成连接操作

2.2 AbstractPlainSocketImpl.connect 方法

protected void connect(SocketAddress address, int timeout) throws IOException {
boolean connected = false;
try {
InetSocketAddress addr = (InetSocketAddress) address;
this.port = addr.getPort();
this.address = addr.getAddress(); connectToAddress(this.address, port, timeout);
connected = true;
} finally {
if (!connected) {
close();
}
}
} private void connectToAddress(InetAddress address, int port, int timeout) throws IOException {
if (address.isAnyLocalAddress()) {
doConnect(InetAddress.getLocalHost(), port, timeout);
} else {
doConnect(address, port, timeout);
}
}

总结: connect 将连接具体由 doConnect 完成

synchronized void doConnect(InetAddress address, int port, int timeout) throws IOException {
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpConnect(fd, address, port);
}
}
try {
acquireFD();
try {
socketConnect(address, port, timeout);
/* socket may have been closed during poll/select */
synchronized (fdLock) {
if (closePending) {
throw new SocketException ("Socket closed");
}
}
if (socket != null) {
socket.setBound();
socket.setConnected();
}
} finally {
releaseFD();
}
} catch (IOException e) {
close();
throw e;
}
}

2.3 DualStackPlainSocketImpl.socketConnect 方法

void socketConnect(InetAddress address, int port, int timeout)
throws IOException {
int nativefd = checkAndReturnNativeFD(); if (address == null)
throw new NullPointerException("inet address argument is null."); int connectResult;
if (timeout <= 0) {
connectResult = connect0(nativefd, address, port);
} else {
configureBlocking(nativefd, false);
try {
connectResult = connect0(nativefd, address, port);
if (connectResult == WOULDBLOCK) {
waitForConnect(nativefd, timeout);
}
} finally {
configureBlocking(nativefd, true);
}
} if (localport == 0)
localport = localPort0(nativefd);
}
补充1:connect0 在 JVM 中的实现
JNIEXPORT jint JNICALL Java_java_net_DualStackPlainSocketImpl_connect0
(JNIEnv *env, jclass clazz, jint fd, jobject iaObj, jint port) {
SOCKETADDRESS sa;
int rv;
int sa_len = sizeof(sa); if (NET_InetAddressToSockaddr(env, iaObj, port, (struct sockaddr *)&sa,
&sa_len, JNI_TRUE) != 0) {
return -1;
} rv = connect(fd, (struct sockaddr *)&sa, sa_len);
if (rv == SOCKET_ERROR) {
int err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
return java_net_DualStackPlainSocketImpl_WOULDBLOCK;
} else if (err == WSAEADDRNOTAVAIL) {
JNU_ThrowByName(env, JNU_JAVANETPKG "ConnectException",
"connect: Address is invalid on local machine, or port is not valid on remote machine");
} else {
NET_ThrowNew(env, err, "connect");
}
return -1; // return value not important.
}
return rv;
}

总结: rv = connect(fd, (struct sockaddr *)&sa, sa_len) 建立远程连接。

补充2:waitForConnect 在 JVM 中的实现

和 ServerSocket.waitForNewConnection 一样,也是通过 Winsock 库的 select 函数来实现超时的功能。

JNIEXPORT void JNICALL Java_java_net_DualStackPlainSocketImpl_waitForConnect
(JNIEnv *env, jclass clazz, jint fd, jint timeout) {
int rv, retry;
int optlen = sizeof(rv);
fd_set wr, ex;
struct timeval t; FD_ZERO(&wr);
FD_ZERO(&ex);
FD_SET(fd, &wr);
FD_SET(fd, &ex);
t.tv_sec = timeout / 1000;
t.tv_usec = (timeout % 1000) * 1000; rv = select(fd+1, 0, &wr, &ex, &t);
if (rv == 0) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"connect timed out");
shutdown( fd, SD_BOTH );
return;
}
if (!FD_ISSET(fd, &ex)) {
return; /* connection established */
} for (retry=0; retry<3; retry++) {
NET_GetSockOpt(fd, SOL_SOCKET, SO_ERROR,
(char*)&rv, &optlen);
if (rv) {
break;
}
Sleep(0);
} if (rv == 0) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"Unable to establish connection");
} else {
NET_ThrowNew(env, rv, "connect");
}
}

总结: rv = select(fd+1, 0, &wr, &ex, &t) 轮询会阻塞程序。

三、SocketInputStream

3.1 构造方法

SocketInputStream(AbstractPlainSocketImpl impl) throws IOException {
super(impl.getFileDescriptor());
this.impl = impl;
socket = impl.getSocket();
}

总结: SocketInputStream 内部实现上也是对 impl 的封装。SocketInputStream.read 其实也是调用底层 socket 的 read 方法。

3.2 read 方法

int read(byte b[], int off, int length, int timeout) throws IOException {
int n; // EOF already encountered
if (eof) {
return -1;
} // connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
} // bounds check
if (length <= 0 || off < 0 || off + length > b.length) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException();
} boolean gotReset = false; // acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
gotReset = true;
} finally {
impl.releaseFD();
} /*
* We receive a "connection reset" but there may be bytes still
* buffered on the socket
*/
if (gotReset) {
impl.setConnectionResetPending();
impl.acquireFD();
try {
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
} finally {
impl.releaseFD();
}
} /*
* If we get here we are at EOF, the socket has been closed,
* or the connection has been reset.
*/
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionResetPending()) {
impl.setConnectionReset();
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
} private int socketRead(FileDescriptor fd, byte b[], int off, int len,
int timeout) throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
补充2:socketRead0 在 JVM 中的实现
// src/windows/native/java/net/SocketInputStream.c
JNIEXPORT jint JNICALL Java_java_net_SocketInputStream_socketRead0(JNIEnv *env, jobject this,
jobject fdObj, jbyteArray data, jint off, jint len, jint timeout) {
char *bufP;
char BUF[MAX_BUFFER_LEN];
jint fd, newfd;
jint nread; if (IS_NULL(fdObj)) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
return -1;
}
fd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
if (fd == -1) {
NET_ThrowSocketException(env, "Socket closed");
return -1;
} /*
* If the caller buffer is large than our stack buffer then we allocate
* from the heap (up to a limit). If memory is exhausted we always use
* the stack buffer.
*/
if (len <= MAX_BUFFER_LEN) {
bufP = BUF;
} else {
if (len > MAX_HEAP_BUFFER_LEN) {
len = MAX_HEAP_BUFFER_LEN;
}
bufP = (char *)malloc((size_t)len);
if (bufP == NULL) {
/* allocation failed so use stack buffer */
bufP = BUF;
len = MAX_BUFFER_LEN;
}
} if (timeout) {
if (timeout <= 5000 || !isRcvTimeoutSupported) {
int ret = NET_Timeout (fd, timeout); if (ret <= 0) {
if (ret == 0) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException",
"Read timed out");
} else if (ret == JVM_IO_ERR) {
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException", "socket closed");
} else if (ret == JVM_IO_INTR) {
JNU_ThrowByName(env, JNU_JAVAIOPKG "InterruptedIOException",
"Operation interrupted");
}
if (bufP != BUF) {
free(bufP);
}
return -1;
} /*check if the socket has been closed while we were in timeout*/
newfd = (*env)->GetIntField(env, fdObj, IO_fd_fdID);
if (newfd == -1) {
NET_ThrowSocketException(env, "Socket Closed");
if (bufP != BUF) {
free(bufP);
}
return -1;
}
}
} // 最关键的代码,recv 从 socketfd 中读取数据
nread = recv(fd, bufP, len, 0); if (nread > 0) {
(*env)->SetByteArrayRegion(env, data, off, nread, (jbyte *)bufP);
} else {
if (nread < 0) {
// Check if the socket has been closed since we last checked.
// This could be a reason for recv failing.
if ((*env)->GetIntField(env, fdObj, IO_fd_fdID) == -1) {
NET_ThrowSocketException(env, "Socket closed");
} else {
switch (WSAGetLastError()) {
case WSAEINTR:
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketException",
"socket closed");
break;
case WSAECONNRESET:
case WSAESHUTDOWN:
/*
* Connection has been reset - Windows sometimes reports
* the reset as a shutdown error.
*/
JNU_ThrowByName(env, "sun/net/ConnectionResetException", "");
break;
case WSAETIMEDOUT :
JNU_ThrowByName(env, JNU_JAVANETPKG "SocketTimeoutException", "Read timed out");
break;
default:
NET_ThrowCurrent(env, "recv failed");
}
}
}
}
if (bufP != BUF) {
free(bufP);
}
return nread;
}

总结: socketRead0 实现很长,其实我们只用关注核心的实现 nread = recv(fd, bufP, len, 0); 即可,毕竟我们不是专门做 c++。

四、SocketInputStream

和 SocketInputStream 类似,就不继续分析了。


每天用心记录一点点。内容也许不重要,但习惯很重要!

最新文章

  1. MySQL学习(二)SQL语句的总结
  2. oracle 32位导64位
  3. web初学之重定向与请求转发
  4. 开源安卓Http文件下载框架file-downloader的使用
  5. 低信噪比的HTML5优化
  6. 关于python性能提升的一些方案(上)
  7. [VIM] 格式化代码
  8. asp.net(c#)修改时的赋值操作
  9. 分布式数据存储 - Zabbix监控MySQL性能
  10. sqlserver2008中如何用右键可视化的设置外键
  11. android bindService()
  12. 快速搭建Android 开发环境-使用ADT Bundle
  13. jsp验证码 (通过单击验证码或超链接换验证码)
  14. [POJ 2115} C Looooops 题解(扩展欧几里德)
  15. static,final,包,访问修饰符,内部类
  16. ------- 软件调试——挫败 QQ.exe 的内核模式保护机制 -------
  17. [TCP/IP] 数据链路层-ethereal 抓包分析数据帧
  18. Java复数的加乘除运算
  19. shell脚本学习-分支结构
  20. 安卓开发_浅谈Notification(通知栏)

热门文章

  1. Portal for ArcGIS启动失败,无法访问任何门户计算机,请联系您的系统管理员。
  2. PAT甲级——A1151 LCA_in_a_BinaryTree【30】
  3. 类的私有private构造函数 ,为什么要这样做
  4. Java实验报告(三)&amp;第五周课程总结
  5. 在不打开excel的情况下用python执行excel
  6. Excel解析工具POI
  7. 项目中AOP的实例应用
  8. linux终端命令行缩短显示路径
  9. sql中char,varchar,nvarchar的区别
  10. python_django_分页