Linux 网络编程的5种IO模型:异步IO模型

资料已经整理好,但是还有未竟之业:复习多路复用epoll 阅读例程, 异步IO 函数实现

背景

上一讲《 Linux 网络编程的5种IO模型:信号驱动IO模型 》我们已经介绍了信号驱动模型,以及带有BUG的例程。

前面四种IO模型实际上都属于同步IO,只有最后一种是真正的异步IO,因为无论是多路复用IO还是信号驱动模型,IO操作的第2个阶段都会引起用户线程阻塞,也就是内核进行数据拷贝的过程都会让用户线程阻塞。

这一讲我们来介绍最后一种IO模型。

导言

两种高性能IO设计模式

在传统的网络服务设计模式中,有两种比较经典的模式:多线程,与 线程池。

多线程

对于多线程模式,也就说来了client,服务器就会新建一个线程来处理该client的读写事件,如下图所示:

这种模式虽然处理起来简单方便,但是由于服务器为每个client的连接都采用一个线程去处理,使得资源占用非常大。因此,当连接数量达到上限时,再有用户请求连接,直接会导致资源瓶颈,严重的可能会直接导致服务器崩溃。

线程池

因此,为了解决这种一个线程对应一个客户端模式带来的问题,提出了采用线程池的方式,也就说创建一个固定大小的线程池,来一个客户端,就从线程池取一个空闲线程来处理,当客户端处理完读写操作之后,就交出对线程的占用。因此这样就避免为每一个客户端都要创建线程带来的资源浪费,使得线程可以重用。

但是线程池也有它的弊端,如果连接大多是长连接,因此可能会导致在一段时间内,线程池中的线程都被占用,那么当再有用户请求连接时,由于没有可用的空闲线程来处理,就会导致客户端连接失败,从而影响用户体验。因此,线程池比较适合大量的短连接应用。

高性能IO模型

因此便出现了下面的两种高性能IO设计模式:Reactor和Proactor。

Reactor

在Reactor模式中,会先对每个client注册感兴趣的事件,然后有一个线程专门去轮询每个client是否有事件发生,当有事件发生时,便顺序处理每个事件,当所有事件处理完之后,便再转去继续轮询,如下图所示:

从这里可以看出,多路复用IO就是采用Reactor模式。

注意,上面的图中展示的 是顺序处理每个事件,当然为了提高事件处理速度,可以通过多线程或者线程池的方式来处理事件。

Proactor

在Proactor模式中:当检测到有事件发生时,会新起一个异步操作,然后交由内核线程去处理,当内核线程完成IO操作之后,发送一个通知告知操作已完成;可以得知,异步IO模型采用的就是Proactor模式。

Linux异步IO模型与有关函数

异步IO模型是比较理想的IO模型,在异步IO模型中,当用户线程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从内核的角度,当它受到一个asynchronous read之后,它会立刻返回,说明read请求已经成功发起了,因此不会对用户线程产生任何block。然后,内核会等待数据准备完成,然后将数据拷贝到用户线程,当这一切都完成之后,内核会给用户线程发送一个信号,告诉它read操作完成了。也就说用户线程完全不需要关心实际的整个IO操作是如何进行的,只需要先发起一个请求,当接收内核返回的成功信号时表示IO操作已经完成,可以直接去使用数据了。

也就说在异步IO模型中,IO操作的两个阶段都不会阻塞用户线程,这两个阶段都是由内核自动完成,然后发送一个信号告知用户线程操作已完成。用户线程中不需要再次调用IO函数进行具体的读写。

这点是和信号驱动模型有所不同的,在信号驱动模型中,当用户线程接收到信号表示数据已经就绪,然后需要用户线程调用IO函数进行实际的读写操作;而在异步IO模型中,收到信号表示IO操作已经完成,不需要再在用户线程中调用iO函数进行实际的读写操作。

%% 时序图
sequenceDiagram
title : 异步IO模型
participant application
participant kernel

Note right of application: 应用程序调用系统调用

application ->> kernel: aio_read
kernel ->> application: 返回

kernel ->> kernel: 准备好数据,拷贝到用户空间

kernel ->> application: 递交到aio_read指定的信号中
application ->> application : 信号处理

#include <aio.h>

int aio_read(struct aiocb *__aiocbp);
int aio_write(struct aiocb *__aiocbp); Link with -lrt. /* 有关结构体 ,能够使用的成员已经标出 */
struct aiocb
{
▲ int aio_fildes; /* 对哪个文件进行读写. */
▲ int aio_lio_opcode; /* 要执行的操作 */
int aio_reqprio; /* Request priority offset. */
▲ volatile void *aio_buf; /* 读写用的buffer */
▲ size_t aio_nbytes; /* Length of transfer. */
▲ struct sigevent aio_sigevent; /* 告诉 AIO 在 I/O 操作完成时应该执行什么操作。 */ /* Internal members. */
struct aiocb *__next_prio;
int __abs_prio;
int __policy;
int __error_code;
__ssize_t __return_value; #ifndef __USE_FILE_OFFSET64 // 针对大文件的支持
▲ __off_t aio_offset; /* 在传统的 read 调用中,偏移量是在文件描述符上下文中进行维护的, */
char __pad[sizeof (__off64_t) - sizeof (__off_t)];
#else
▲ __off64_t aio_offset; /* 对于异步 I/O 操作来说这是不可能的,因为我们可以同时执行很多读请求,因此必须为每个特定的读请求都指定偏移量。 */
#endif
char __glibc_reserved[32];
}; struct sigevent {
int sigev_notify; /* 通知方式:为SIGEV_NONE、SIGEV_SIGNAL、SIGEV_THREAD、SIGEV_THREAD_ID(只针对linux)当中的一个; */
int sigev_signo; /* 为signal的值,当sigev_notify为SIGEV_SIGNAL时,会将这个signal发送给进程; */
union sigval sigev_value; /* 信号传递的数据 */
void (*sigev_notify_function) (union sigval);/* 当sigev_notify为SIGEV_THREAD时,处理线程将调用这个处理函数 (SIGEV_THREAD) */
void *sigev_notify_attributes;/* sigev_notify_function的参数 (SIGEV_THREAD) */
pid_t sigev_notify_thread_id; /* 当sigev_notify为SIGEV_THREAD_ID时的处理线程ID (SIGEV_THREAD_ID) */
}; union sigval { /*传递的参数*/
int sival_int; /* 信号机制传递的参数 */
void *sival_ptr; /* 若是线程机制传递的参数 */
}; // 什么时候使用 AIO ?了解 AIO 机制之后,不难发现, AIO 其实是用于解决大量 IO 并发操作而出现的,牺牲一些信号处理耗时,用多线程的方式加速 IO ,提高并行效率。
函数 作用
aio_read 请求异步读操作
aio_error 检查异步请求的状态
aio_return 获得完成的异步请求的返回状态
aio_write 请求异步写操作
aio_suspend 挂起调用进程,直到一个或多个异步请求已经完成(或失败)
aio_cancel 取消异步 I/O 请求
aio_fsync 强制同步
lio_listio 发起一系列 I/O 操作

aio_read

#include <aio.h>
int aio_read( struct aiocb *aiocbp );

描述: 请求一个异步写操作。

返回值:成功返回值 0;出错返回值 -1,并设置 errno的值。

aio_read 例子

#include <unistd.h>
#include <stdio.h>
#include <aio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/timeb.h> #define BUFFER_SIZE 1024*1024 void ptime(const char* tip){
struct timeb tb;
ftime(&tb);
fprintf(stdout, "%s %u : %u\n", tip, tb.time, tb.millitm);
} int main(){
/* 句柄,返回码 */
int fd = -1, ret = -1; fd = open("./file.txt", O_RDONLY);
if(fd <= 0){
fprintf(stderr, "open file errro: %s\n", strerror(errno));
return -1;
} /* aio控制结构 */
aiocb my_aiocb;
memset(&my_aiocb, 0, sizeof(my_aiocb)); /* 初始化 */
my_aiocb.aio_fildes = fd;
my_aiocb.aio_reqprio = 0;
my_aiocb.aio_nbytes = BUFFER_SIZE;
char buf[BUFFER_SIZE + 1] = {0};
my_aiocb.aio_buf = buf; ptime("start read"); /* aio 读 */
ret = aio_read(&my_aiocb);
if(ret < 0){
fprintf(stderr, "aio read error: %s\n", strerror(errno));
return -2;
} ptime("reading"); /* 检查状态 */
while(aio_error(&my_aiocb) == EINPROGRESS);
/* (这种做法不是最有效的,只是为了演示aio_error如何使用)可以调用 aio_error 来确定 aio_read 的状态。只要状态是 EINPROGRESS,就一直忙碌等待,直到状态发生变化为止。请求可能成功,也可能失败。*/ ptime("after read"); if ((ret = aio_return( &my_iocb )) > 0) {
/* got ret bytes on the read */
fprintf(stdout, "read: %10.10s\n", my_aiocb.aio_buf);
} else {
/* read failed, consult errno */
fprintf(stderr, "return: %d\n", ret);
} close(fd); return 0;
}

aio_error

int aio_error( struct aiocb *aiocbp );

描述:用来确定请求的状态。

返回值

  • EINPROGRESS,说明请求尚未完成
  • ECANCELLED,说明请求被应用程序取消了
  • -1,说明发生了错误,具体错误原因可以查阅 errno

aio_return

ssize_t aio_return( struct aiocb *aiocbp );

描述:获得完成的异步请求的返回状态。

异步 I/O 和标准 I/O 之间的另外一个区别是我们不能立即访问这个函数的返回状态,因为我们并没有阻塞在 read 调用上。在标准的 read 调用中,返回状态是在该函数返回时提供的。但是在异步 I/O 中, 我们要使用 aio_return 函数。

只有在 aio_error 调用确定请求已经完成(可能成功,也可能发生了错误)之后,才会调用这个函数。

返回值:所传输的字节数,如果出错,返回 -1(等价于 readwrite 系统调用的返回值)。

aio_write

int aio_write( struct aiocb *aiocbp );

描述: 请求一个异步写操作。

aio_write 函数会立即返回,说明请求已经进行排队(成功时返回值为 0,失败时返回值为 -1, 并相应地设置 errno)。

这与 aio_read 类似,但是在偏移量上有一点不一样:对于write 来说,这个偏移量只有在没有设置 O_APPEND 选项的文件上下文中才会非常重要。

如果设置了 O_APPEND,那么这个偏移量就会被忽略,数据都会被附加到文件的末尾。否则,aio_offset 域就确定了数据在要写入的文件中的偏移量。

aio_suspend

int aio_suspend( const struct aiocb *const aiocb_list[],
int nitems, const struct timespec *timeout );

描述:挂起(或阻塞)调用进程,直到以下情况发生:

  • 一个或多个处于 aiocb_list中的异步请求完成
  • 有信号递达
  • 调用时指定的时间已到,发生超时

调用者提供了一个 aiocb 引用列表,其中任何一个完成都会导致 aio_suspend 返回。

参数解析:

cblist:一组异步IO请求 (aiocb_list中任何 NULL 元素都会被忽略)

nitems:该组的成员数量

timeout:超时时间,NULL代表永远阻塞

返回值:成功返回0;失败返回-1,设置errno:

EAGAIN :超时,希望程序重试。

EINTR : 被信号中断(也有可能是等待的某个操作的完成信号)

ENOSYS :这个功能未被当前系统支持(未实现)

aio_suspend 例程

使用非常简单。我们要提供一个 aiocb 引用列表。

...

struct aioct *cblist[MAX_LIST];

/* Clear the list. */
bzero( (char *)cblist, sizeof(cblist) ); /* Load one or more references into the list */
cblist[0] = &my_aiocb;
...
for(i = 0; i < ..; i++)
{ }
ret = aio_read( &my_aiocb );
... ret = aio_suspend(cblist, MAX_LIST, NULL ); ...

aio_cancel

int aio_cancel( int fd, struct aiocb *aiocbp);

描述:允许我们取消对某个文件描述符执行的一个或所有 I/O 请求。

参数解析:

fd : 与读写请求有关的文件描述符

aiocbp:读写请求(为NULL时,取消所有请求)

返回值:成功取消返回AIO_CANCELED,请求被完成时返回AIO_NOTCANCELED

要取消对某个给定文件描述符的所有请求,我们需要提供这个文件的描述符,以及一个对 aiocbpNULL 引用。

  • 如果所有的请求都取消了,这个函数就会返回 AIO_CANCELED

  • 如果至少有一个请求没有被取消,那么这个函数就会返回 AIO_NOT_CANCELED

  • 如果没有一个请求可以被取消,那么这个函数就会返回 AIO_ALLDONE

可以使用 aio_error 来验证每个 AIO 请求。如果这个请求已经被取消了,那么 aio_error就会返回 -1,并且 errno 会被设置为 ECANCELED

aio_fsync

int aio_fsync(int op, struct aiocb *aiocbp);

描述: 在AIO是交给其他线程来完成的,如果需要手动执行同步,则需要调用这个函数 。 函数执行时,将强制完成该AIO上的所有操作。 与一般文件IO的fsync用法基本一致。

如果想要所有等待的异步操作不等待而写入持久化的存储中,可以设立一个AIO控制板并调用该函数。

aio_read、aio_write函数会进行数据的缓冲。使用了aio_fsync就不必再去使用aio_read和aio_write了

参数解析:

op: operation为操作码

  • O_SYNC : 同步异步IO数据,当前所有IO操作均将完成
  • O_DSYNC:同步一个IO请求,并不等待所有的IO完成 (相当于调用fdatasync函数 )

aiocbp:异步请求

lio_listio

int lio_listio( int mode, struct aiocb *aiocb_list[], int nitems,
struct sigevent *sevp); #include <signal.h> union sigval { /*传递的参数*/
int sival_int; /* 信号机制传递的参数 */
void *sival_ptr; /* 若是线程机制传递的参数 */
}; struct sigevent {
int sigev_notify; /* 设置通知机制方法,线程为SIGEV_THREAD,信号为SIGEV_SIGNAL*/
int sigev_signo; /* 若是信号机制,该参数设置为触发的信号 */
union sigval sigev_value;/* 传递的参数*/
void (*sigev_notify_function)(union sigval);
/* 若是线程机制,该参数为线程函数*/
void *sigev_notify_attributes;
/* 线程函数的属性 */
};

描述:同时发起多个传输。

意味着我们可以在一个系统调用(一次内核上下文切换)中启动大量的 I/O 操作。从性能的角度来看,大大提高了效率。

参数解析:

mode:

  • LIO_WAIT:阻塞这个调用,直到所有的 I/O 都完成为止。
  • LIO_NOWAIT:操作进行排队之后,立即返回。

list:一组异步IO请求 (aiocb_list中任何 NULL 元素都会被忽略)

nitems:请求的个数

sigevent:在所有 I/O 操作都完成时产生信号的方法。

注意:

对于 lio_listio 的请求与传统的 readwrite 请求在必须指定的操作方面稍有不同。

  • 对于读操作来说,aio_lio_opcode 域的值为 LIO_READ

  • 对于写操作来说,我们要使用 LIO_WRITE

  • 允许 LIO_NOP (不执行)

lio_listio 例程

struct aiocb aiocb1, aiocb2;
struct aiocb *list[MAX_LIST]; ... /* Prepare the first aiocb */
aiocb1.aio_fildes = fd;
aiocb1.aio_buf = malloc( BUFSIZE+1 );
aiocb1.aio_nbytes = BUFSIZE;
aiocb1.aio_offset = next_offset;
aiocb1.aio_lio_opcode = LIO_READ; ... bzero( (char *)list, sizeof(list) );
list[0] = &aiocb1;
list[1] = &aiocb2; ret = lio_listio( LIO_WAIT, list, MAX_LIST, NULL );

AIO例程

现在我们已经了解了有关的 AIO 函数。

接下来,我们将通过信号(signal)函数回调(callback)来探索异步函数的通知机制。

使用信号进行异步通知

使用信号进行进程间通信(IPC)是 UNIX 中的一种传统机制,AIO 也可以支持这种机制。在这种范例中, 应用程序需要定义信号处理程序,在产生指定的信号时就会调用这个处理程序。应用程序然后配置一个异步请求将在请求完成时产生一个信号。作为信号上下文的一部分,特定的aiocb 请求被提供用来记录多个可能会出现的请求。

/*
我们在 aio_completion_handler 函数中设置信号处理程序来捕获 SIGIO 信号。 然后
- 初始化 aio_sigevent 结构产生 SIGIO 信号来进行通知,
- 指定aio_sigevent.sigev_notify使用信号方式,
- 指定aio_sigevent.sigev_signo使用的信号。 当读操作完成时,信号处理程序就从该信号的 si_value 结构中提取出 aiocb,并检查错误状态和返回状态来确定 I/O 操作是否完成。 对于性能来说,这个处理程序也是通过请求下一次异步传输而继续进行 I/O 操作的理想地方。采用这种方式,在一次数据传输完成时,我们就可以立即开始下一次数据传输操作。
*/
void setup_io( ... )
{
int fd;
struct sigaction sig_act;
struct aiocb my_aiocb; ... /* Set up the signal handler */
sigemptyset(&sig_act.sa_mask);
sig_act.sa_flags = SA_SIGINFO;
sig_act.sa_sigaction = aio_completion_handler; /* Set up the AIO request */
bzero( (char *)&my_aiocb, sizeof(struct aiocb) );
my_aiocb.aio_fildes = fd;
my_aiocb.aio_buf = malloc(BUF_SIZE+1);
my_aiocb.aio_nbytes = BUF_SIZE;
my_aiocb.aio_offset = next_offset; /* Link the AIO request with the Signal Handler */
my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
my_aiocb.aio_sigevent.sigev_signo = SIGIO;
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; /* Map the Signal to the Signal Handler */
ret = sigaction( SIGIO, &sig_act, NULL ); ... ret = aio_read( &my_aiocb ); } void aio_completion_handler( int signo, siginfo_t *info, void *context )
{
struct aiocb *req; /* Ensure it's our signal */
if (info->si_signo == SIGIO) { req = (struct aiocb *)info->si_value.sival_ptr; /* Did the request complete? */
if (aio_error( req ) == 0) { /* Request completed successfully, get the return status */
ret = aio_return( req ); } } return;
}

使用回调函数进行异步通知

另外一种通知方式是系统回调函数。这种机制不会为通知而产生一个信号,而是会调用用户空间的一个函数

来实现通知功能。我们在 sigevent结构中设置了对 aiocb 的引用,从而可以惟一标识正在完成的特定请求。

/*
在创建自己的 aiocb 请求之后,我们使用 SIGEV_THREAD 请求了一个线程回调函数来作为通知方法(aio_sigevent.sigev_notify指定)。 然后我们将指定特定的通知处理程序,并将要传输的上下文加载到处理程序中(aio_sigevent.notify_function指定,在这种情况中,是个对 aiocb 请求自己的引用)。 在这个处理程序中,我们简单地引用到达的 sigval 指针并使用 AIO 函数来验证请求已经完成。
*/
void setup_io( ... )
{
int fd;
struct aiocb my_aiocb; ... /* Set up the AIO request */
bzero( (char *)&my_aiocb, sizeof(struct aiocb) );
my_aiocb.aio_fildes = fd;
my_aiocb.aio_buf = malloc(BUF_SIZE+1);
my_aiocb.aio_nbytes = BUF_SIZE;
my_aiocb.aio_offset = next_offset; /* Link the AIO request with a thread callback */
my_aiocb.aio_sigevent.sigev_notify = SIGEV_THREAD;
my_aiocb.aio_sigevent.notify_function = aio_completion_handler;
my_aiocb.aio_sigevent.notify_attributes = NULL;
my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb; ... ret = aio_read( &my_aiocb ); } void aio_completion_handler( sigval_t sigval )
{
struct aiocb *req; req = (struct aiocb *)sigval.sival_ptr; /* Did the request complete? */
if (aio_error( req ) == 0) { /* Request completed successfully, get the return status */
ret = aio_return( req ); } return;
}
#include <stdio.h>
#include <stdlib.h>
#include <string.h> #include <aio.h>
#include <unistd.h>
#include <signal.h>
#include <sys/stat.h>
#include <fcntl.h> static char *memBuffer;
static int sFileDesc;
static struct sigaction sOldSigAction; static void MySigQuitHandler(int sig)
{
printf("Signal Quit! The number is: %d\n", sig);
} static void MyFileReadCompleteProcedure(int sig, siginfo_t *si, void *ucontext)
{
printf("The file length is: %zu, and the content is: %s\n", strlen(memBuffer), memBuffer);
int status = close(sFileDesc);
if(status == 0)
puts("File closed successfully!");
else
printf("The error code is: %d\n", status); free(memBuffer); // 还原原来的SIGUSR1信号行为
if(sigaction(SIGUSR1, &sOldSigAction, NULL) == -1)
puts("SIGUSR1 signal restore failed!");
} int main(void)
{
struct sigaction sigAction = { .sa_flags = SA_RESTART, .sa_handler = &MySigQuitHandler }; sigemptyset(&sigAction.sa_mask); if (sigaction(SIGQUIT, &sigAction, NULL) == -1)
{
puts("Signal failed!");
return -1;
} sigAction.sa_sigaction = &MyFileReadCompleteProcedure;
if(sigaction(SIGUSR1, &sigAction, &sOldSigAction) == -1)
{
puts("Signal failed!");
return -1;
} const char *filePath = "myfile.txt"; const mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
sFileDesc = open(filePath, O_RDONLY, mode);
if(sFileDesc == -1)
{
printf("The file: %s cannot be opened!\n", filePath);
return -1;
} const long fileLength = lseek(sFileDesc, 0, SEEK_END);
lseek(sFileDesc, 0, SEEK_SET); memBuffer = malloc(fileLength + 1);
memBuffer[fileLength] = '\0'; struct aiocb aioBuffer;
aioBuffer.aio_fildes = sFileDesc;
aioBuffer.aio_offset = 0;
aioBuffer.aio_buf = memBuffer;
aioBuffer.aio_nbytes = fileLength;
aioBuffer.aio_reqprio = 0;
aioBuffer.aio_sigevent = (struct sigevent){.sigev_notify = SIGEV_SIGNAL, .sigev_signo = SIGUSR1, .sigev_value.sival_ptr = memBuffer }; aio_read(&aioBuffer); getchar(); return 0;
}

附录 :对 AIO 进行系统优化

proc 文件系统包含了两个虚拟文件,它们可以用来对异步 I/O 的性能进行优化( 这对于大部分应用程序来说都已经足够了):

  • /proc/sys/fs/aio-nr 文件提供了系统范围异步 I/O 请求现在的数目。
  • /proc/sys/fs/aio-max-nr 文件是所允许的并发请求的最大个数。最大个数通常是 64KB

参考资料

最新文章

  1. Android之垂直显示TextView
  2. 通过GPS数据反向地理信息编码, 得到当前位置信息
  3. http协议简述
  4. 【转】 memwatch使用说明书
  5. Spring Autowiring by AutoDetect
  6. Linux编程简介
  7. Altium Designer6打印PCB 设置
  8. php对mongo操作问题
  9. STM32学习笔记——新建工程模板步骤(向原子哥学习)
  10. hdu1114(完全背包)
  11. 不要错过iost币的免费派发机会
  12. 奥酷HTML5视频直播系统AMS6.0
  13. 王之泰201771010131《面向对象程序设计(java)》第十二周学习总结
  14. CMOS Sensor的调试经验分享【转】
  15. JS数字指定长度不足前补零的实现
  16. TC-572-D1L2 (双向搜索+记忆化)
  17. iOS开发transform的使用
  18. 9.1C#中类的定义
  19. java.lang.IndexOutOfBoundsException: setSpan (35 ... 35) ends beyond length 28
  20. fluentd 推送 mariadb audit log

热门文章

  1. CUP的MESI协议
  2. jdbc原理与步骤
  3. Linux常用命令代码大全
  4. matlab中sum
  5. P4454 [CQOI2018]破解D-H协议
  6. Java安全之Commons Collections1分析前置知识
  7. vue 异步提交php 两种方式传值
  8. 增强for循环的用法
  9. 以太坊blockchain源码分析
  10. docker启动服务---------------kafka+zookeeper