redis4.0的文件事件与客户端

简介

文件事件的流程大概如下:

  1. 在服务器初始化时生成aeEventLoop并赋值给server,接着创建监听TCP连接事件。
  2. 处理TCP连接时会创建client类型的对象,将其绑定在accept函数返回的文件描述符fd上,并对fd注册一个可读事件,当客户端数据来临时,readQueryFromClient会对数据进行处理。
  3. redis处理完数据后,会调用write函数将数据返回给客户端(但不是在一个循环里)。如果函数返回的值小于写入的值,说明系统缓存区空间不够,或者文件描述符在中途被占用,那么redis会注册一个可写事件,当可写事件触发时,sendReplyToClient函数会写入剩余的数据。
  4. 当客户端断开连接,服务器会释放client相关的资源,随之删除对应的文件事件。

正文

准备阶段

在初始化服务器时,server函数创建clientsclients_pending_write等字段,并通过aeCreateEventLoop创建一个aeEventLoop对象。

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i; eventLoop = zmalloc(sizeof(*eventLoop));
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
eventLoop->setsize = setsize;
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set.*/
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
}

此处传入参数setsize的值为maxClients+128,maxClients默认值为10,000。events用于存放注册的文件事件,而fired则在事件触发时,存放被触发的事件。两者的长度都为setsize大小。

紧接着便会注册第一个文件事件。

aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)

这里我们更进一步看下aeCreateFileEvent的代码, fd文件描述符 被用于偏移来获取对应的文件事件结构,因此fd的值必须小于之前注册的事件大小的值。第一个被用于注册文件事件的fd用于监听TCP连接,由于进程启动时会打开一些其他的文件,因此eventLoop->events的空间并没有并完全利用。此处还通过mask来注册对应的事件触发后的处理函数。如果是监听可读事件,那么rfileProc处理函数会被赋值。可写事件同理。此时并没有传入clientData,我们会在下文再回到这个函数。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

接受客户端连接

当接受来自客户端连接时,便会调用acceptTcpHandler函数,该函数会接受所有客户端的请求,但一次最多接受MAX_ACCEPTS_PER_CALL1000个客户端,并且如果在轮询中发现没有客户端请求,就会立刻返回。接受了一个客户端连接请求后,便会进入处理函数acceptCommonHandler,它会创建一个client的对象,如果连接的数量大于设置的值,则会断开连接。如果redis跑在保护模式,则可能返回错误信息。

If no pending connections are present on the queue, and the socket is
not marked as nonblocking, accept() blocks the caller until a
connection is present. If the socket is marked nonblocking and no
pending connections are present on the queue, accept() fails with the
error EAGAIN or EWOULDBLOCK.

最主要的代码位于createClient,它会注册客户端可读事件,关联readQueryFromClient函数,并且初始化client的一些属性。

client *createClient(int fd) {
client *c = zmalloc(sizeof(client)); if (fd != -1) {
anetNonBlock(NULL,fd);
anetEnableTcpNoDelay(NULL,fd);
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
} selectDb(c,0);
uint64_t client_id; client_id = server.next_client_id;
server.next_client_id += 1; c->id = client_id;
c->fd = fd;
c->name = NULL;
c->bufpos = 0; //下一个返回数据存入位置
// c->buf 数组存储返回给客户端的数据
c->querybuf = sdsempty(); //查询缓存
c->reqtype = 0; //查询类型 一般为multi
c->argc = 0; //参数个数 由querybuf解析而得
c->argv = NULL;//参数值 由querybuf解析而得
c->cmd = c->lastcmd = NULL;
c->multibulklen = 0; //查询数据的行数
c->bulklen = -1;//一行查询数据的长度
c->sentlen = 0;//已经发送的数据长度
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
c->reply = listCreate(); //如果buf 数组溢出,则使用reply链表
c->reply_bytes = 0; //reply链表中对象总共的字节数
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
if (fd != -1) linkClient(c);
return c;
}

处理数据

redis通过aeProcessEvents函数处理各种事件,首先它会调用aeApiPoll函数通过多路复用函数来检查已经触发的事件,并将已经触发事件的文件描述符,事件类型赋值给eventLoop->fired。然后根据事件触发的类型,调用之前注册的函数。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents; int j;
struct timeval tv, *tvp; tvp = NULL; /* wait forever */ numevents = aeApiPoll(eventLoop, tvp); for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd; if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
} if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
} processed++;
} return processed; /* return the number of processed file/time events */
}

如果此时有来自客户端的数据,那么将会触发AE_READABLE事件,调用readQueryFromClient函数。默认情况一次读取16KB,除非上次已经读取过数据,并且数据量较大,一行长度超过32KB。(超过32KB则会对其优化,避免了字符串的拷贝,代价是多了几次read调用)。

如果超过32KB,并且剩余长度小于16KB,那么一次读取剩余该行长度的值。这是因为TCP接受的数据不一定是完整的数据,如果是PROTO_REQ_MULTIBULK多行请求,并且数据量过大,在redis开始处理请求前需要接收全部的数据,等待的时间过长,并且解析完毕之后,执行命令的时间和下发数据的长度也会影响性能。建议一次请求不超过16KB,但这16KB中还包含着*/r/n等格式符号,因此请求的数据量还要再小一些,才能保证服务端尽可能在一次接收数据的过程中完成命令的解析。

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen; readlen = PROTO_IOBUF_LEN;//1024*16 bytes if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); //如果超过**32KB**,并且剩余长度小于**16KB**,那么一次读取剩余该行长度的值。
// 如果触发,则在processMultibulkBuffer可以直接使用现有的字符串避免了字符串的复制,代价是多调用了几次 read(2)函数。
if (remaining < readlen) readlen = remaining;
} qblen = sdslen(c->querybuf); c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
} sdsIncrLen(c->querybuf,nread); processInputBuffer(c); }

接着就会进入processInputBuffer函数,此时数据可能全部抵达,也可能部分抵达。processInputBuffer函数的主要功能是将,client->querybuff里面的数据解析,并转化为client->argcclient->argv的数据。如果数据全部抵达,那么接着会进入到processComand函数,查找命令表,执行命令并返回数据给客户的。如果数据部分抵达,但是一行的数据内容抵达,那么该行数据会被解析到client->argcclient->argv中去。

返回数据结果

在这里我们假设客户端输入的字符串是quitprocessComand函数会调用addReply函数将当前的client加入到clients_pending_write链表中。

再将存储OK字符串的对象添加到缓冲区,服务端返回给客户端的编码类型只可能是字符型或者是INT型。首先redis会尝试将结果添加到缓冲区,缓冲区的大小默认16KB,并且不能通过配置更改。如果缓冲区会溢出,那么redis会将数据添加到client->reply链表中。

void addReply(client *client, robj *obj) {
if (prepareClientToWrite(client) != C_OK) return; if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(client,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(client,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
...
} else {
// serverPanic("Wrong obj->encoding in addReply()");
serverLog(LL_WARNING, "Wron obj->encoding in addReply()");
}
}

此时数据还没有返回给客户端,在redis进入下一次循环的时候,会调用beforeSleep函数将数据返回给客户端。

为什么redis不直接将数据返回给客户端呢?

源码的注释给出了答案:为了实现fsync=always的效果,将返回数据放在beforeSleep中,可以通过AOF持久后,再返回给客户端结果。

 /* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */

beforeLoop会接着调用handleClientsWithPendingWrites函数来处理有缓存数据的clientwriteToClient函数会将buf中和reply链表中的数据全部发送给客户端,如果实际发送的数据小于应当发送的数据,则表示系统缓存区空间不够,或者文件描述符在中途被占用,那么redis会创建一个事件,当监听到文件描述符可读时,再将剩余数据写入。

int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write); listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln); /* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue; /* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}

在写完数据后,发现客户端有被标记CLIENT_CLOSE_AFTER_REPLY,那么将会释放客户端的资源。

if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClient(c);
return C_ERR;
}

参考文献

accept函数

《Redis设计与实现》

最新文章

  1. 从网易与淘宝的font-size思考前端设计稿与工作流
  2. jquery 基础教程[温故而知新二]
  3. Beyond Compare for mac 无限试用方法
  4. junit基础篇、中级篇-实例代码
  5. subprocess使用
  6. 关于display的那些事儿!
  7. MyEclipse开发JAX-RS架构WebServices收发JSON数据格式
  8. Oracle 数据乱码
  9. SRM 502 DIV1 500pt(DP)
  10. c/c++编译原理
  11. BT5之网络配置
  12. access数据库管理软件收集下载
  13. 利用flask 实现简单模版站
  14. C++ 前期准备
  15. MySQL中的用户与授权
  16. 搭建JMETER+ANT自动化接口测试环境步骤(一)
  17. 初识hibernate——环境搭建
  18. Java并发编程的艺术(一)
  19. idea 在tomcat启动的时候发现控制台输出的是乱码
  20. Docker+Nginx部署Angular

热门文章

  1. 【网鼎杯2020朱雀组】Web WriteUp
  2. javascript布局转换
  3. 通过Folx的排序功能来设置下载任务的优先级
  4. 怎么理解虚拟 DOM?
  5. python字节自适应转化单位KB、MB、GB
  6. Centos7安装Nginx详细步骤
  7. 在VMware下创建windows server 2008虚拟机
  8. Codeforces Round #677 (Div. 3)
  9. C语言项目——工程化编程的案例分析
  10. String.Split()函数 非原创