整个状态机的基本流程如下图所示,后续分析将按该流程来进行。

接上节分解,主线程将接收的连接socket分发给了某工作线程,然后工作线程从任务队列中取出该连接socket的CQ_ITEM,开始处理该连接的所有业务逻辑。这个过程也就是上图中的第一个状态conn_listening。 而工作线程首先进入的状态就是conn_new_cmd,即为这个新的连接做一些准备工作,如清理该连接conn结构的读缓冲区等。

准备状态conn_new_cmd具体分析如下:

{
  <span style="font-size:18px;">case conn_new_cmd://为新连接准备:各种清理重置工作
            /* Only process nreqs at a time to avoid starving other
              connections */
            --nreqs;//记录每个libevent实例处理的最大事件数,通过初始启动参数配置
            if (nreqs >= 0) {//还可以处理请求
                reset_cmd_handler(c);//缩小缓冲区,转为解析读缓冲区数据的状态,然后转为等待读取网络数据包状态
            } else {//拒绝请求
                pthread_mutex_lock(&c->thread->stats.mutex);
                c->thread->stats.conn_yields++;
                pthread_mutex_unlock(&c->thread->stats.mutex);
                if (c->rbytes > 0) {//读缓冲区中有数据了,即表明已经读入了数据,因此不再通知读事件
                    /* We have already read in data into the input buffer,
                      so libevent will most likely not signal read events
                      on the socket (unless more data is available. As a
                      hack we should just put in a request to write data,
                      because that should be possible ;-)
                    */
                    if (!update_event(c, EV_WRITE | EV_PERSIST)) {//更新event为写事件,并重新注册到event_bvase
                        if (settings.verbose > 0)
                            fprintf(stderr, "Couldn't update event\n");
                        conn_set_state(c, conn_closing);//关闭连接
                        break;
                    }
                }
                stop = true;
            }
            break;</span>

}

其中整理缓冲区函数reset_cmd_handler函数:首先调用函数conn_shrink缩小该conn的各种缓冲区,然后进入解析状态,解析读缓冲区中未解析的字节,进而转为等待读数据状态(当读缓冲区中没有数据处理时,即进入等待状态)。

具体分析如下:

static void reset_cmd_handler(conn *c) {
    c->cmd = -1;
    c->substate = bin_no_state;
    if(c->item != NULL) {
        item_remove(c->item);//删除item
        c->item = NULL;
    }
    conn_shrink(c);//整理缓冲区
    if (c->rbytes > 0) {//缓冲区还有字节未解析
        conn_set_state(c, conn_parse_cmd);//转换为解析状态
    } else {//缓冲区没有数据
        conn_set_state(c, conn_waiting);//转为 等待读取一个数据包 状态 ,状态机没有数据要处理,就进入等待状态  
    }
}

其中调用函数conn_shrink,来缩小各缓冲区:

static void conn_shrink(conn *c) {
    assert(c != NULL);

if (IS_UDP(c->transport))//如果是UDP协议,不牵涉缓冲区管理
        return;
 //读缓冲区空间大小>READ_BUFFER_HIGHWAT && 还没解析的数据小于 DATA_BUFFER_SIZE 
    if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
        char *newbuf;

if (c->rcurr != c->rbuf)//如果已经解析了部分数据
            memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);//把读缓冲区中的未解析数据向前移动,已覆盖掉已解析的内容

//重新分配DATA_BUFFER_SIZE大小的空间作为读缓冲区
        newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);

if (newbuf) {
            c->rbuf = newbuf;
            c->rsize = DATA_BUFFER_SIZE;
        }
        /* TODO check other branch... */
        c->rcurr = c->rbuf;//以解析数据被覆盖,因此剩下的全部未解析
    }

////需要写出(发往客户端)的item的数量
    if (c->isize > ITEM_LIST_HIGHWAT) {
        item **newbuf = (item**) realloc((void *)c->ilist, ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
        if (newbuf) {
            c->ilist = newbuf;
            c->isize = ITEM_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }
 //待发送的消息个数,memcached发送消息是通过sendmsg批量发送
    if (c->msgsize > MSG_LIST_HIGHWAT) {
        struct msghdr *newbuf = (struct msghdr *) realloc((void *)c->msglist, MSG_LIST_INITIAL * sizeof(c->msglist[0]));
        if (newbuf) {
            c->msglist = newbuf;
            c->msgsize = MSG_LIST_INITIAL;
        }
    /* TODO check error condition? */
    }
 //一次性顺序写多个item??
    if (c->iovsize > IOV_LIST_HIGHWAT) {
        struct iovec *newbuf = (struct iovec *) realloc((void *)c->iov, IOV_LIST_INITIAL * sizeof(c->iov[0]));
        if (newbuf) {
            c->iov = newbuf;
            c->iovsize = IOV_LIST_INITIAL;
        }
    /* TODO check return value */
    }
}

由准备状态conn_new_cmd,如果读缓冲区还有未解析数据,则进入解析状态conn_parse_cmd,按协议解析读取到的网络数据。如果没有待处理数据,则进入等待状态conn_waiting。

memcached采用二进制协议和文本协议两种网络协议,解析时,根据具体的协议解析,然后进入具体命令状态,执行相应具体的操作如:SET GET等待。

解析状态conn_parse_cmd:

//解析读缓冲区中的数据
        case conn_parse_cmd :
   //如果缓冲区中有完整的命令行,则读取之,否则继续转为等待状态
            if (try_read_command(c) == 0) {//缓冲区中没有一条完成的命令行,则需要更多的数据,因此继续等待客户端发来数据
                /* wee need more data! */
                conn_set_state(c, conn_waiting);//继续进入等待状态
            }

break;

解析缓冲区中的一条完整命令:

//解析缓冲区数据
static int try_read_command(conn *c) {
    assert(c != NULL);
    assert(c->rcurr <= (c->rbuf + c->rsize));
    assert(c->rbytes > 0);

if (c->protocol == negotiating_prot || c->transport == udp_transport)  {
        if ((unsigned char)c->rbuf[0] == (unsigned char)PROTOCOL_BINARY_REQ) {
            c->protocol = binary_prot;//二进制协议
        } else {
            c->protocol = ascii_prot;//文本协议
        }

if (settings.verbose > 1) {
            fprintf(stderr, "%d: Client using the %s protocol\n", c->sfd,
                    prot_text(c->protocol));
        }
    }
 //采用二进制协议
    if (c->protocol == binary_prot) {
      //如果二进制协议读取的数据小于二进制协议头部长度,则需要继续读取数据
        if (c->rbytes < sizeof(c->binary_header)) {
            /* need more data! */
            return 0;
        } else {
#ifdef NEED_ALIGN
   //则按8字节对齐,提高CPU读取的效率
            if (((long)(c->rcurr)) % 8 != 0) {
              //调整缓冲区
                memmove(c->rbuf, c->rcurr, c->rbytes);
                c->rcurr = c->rbuf;
                if (settings.verbose > 1) {
                    fprintf(stderr, "%d: Realign input buffer\n", c->sfd);
                }
            }
#endif
            protocol_binary_request_header* req;//二进制协议头部
            req = (protocol_binary_request_header*)c->rcurr;

//...
  //....
  
        //解析二进制协议数据,根据解析结果进行具体操作,如GET SET等
            dispatch_bin_command(c);

c->rbytes -= sizeof(c->binary_header);//更新已读取到的字节数
            c->rcurr += sizeof(c->binary_header);//更新缓冲区的路标信息
        }
    } else {//文本协议
 
        //...
 //....
  
  //根据文本协议解析结果,执行具体操作如SET GET。
        process_command(c, c->rcurr);

}

return 1;
}

二进制协议处理函数:dispatch_bin_command。根据二进制协议解析的结果,处理具体的(比如get,set等)操作(进入相应的操作命令状态)。文本协议操作类似。 
具体的命令如SET,GET,DELETE等操作放到后面讲解。

/根据二进制协议解析的结果,处理具体的(比如get,set等)操作
static void dispatch_bin_command(conn *c) {
  //...
  //...
  
    switch (c->cmd) {
    case PROTOCOL_BINARY_CMD_SETQ: //SET命令
        c->cmd = PROTOCOL_BINARY_CMD_SET;
        break;
    case PROTOCOL_BINARY_CMD_ADDQ: //ADD命令
        c->cmd = PROTOCOL_BINARY_CMD_ADD;
        break;
    //...
    case PROTOCOL_BINARY_CMD_DELETEQ:  //DELETE命令
        c->cmd = PROTOCOL_BINARY_CMD_DELETE;
        break;
  //...
  //...
    }

switch (c->cmd) {
      
    //...
    //....
      case PROTOCOL_BINARY_CMD_GETQ:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GET:  /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETKQ: /* FALLTHROUGH */
        case PROTOCOL_BINARY_CMD_GETK:
            if (extlen == 0 && bodylen == keylen && keylen > 0) {
                bin_read_key(c, bin_reading_get_key, 0);  //在该函数中: conn_set_state(c, conn_nread),进入读状态,读取指定数目的数据
            } else {
                protocol_error = 1;
            }
            break;
      
    if (protocol_error)
        handle_binary_protocol_error(c);
 }
}

解析完完成后,就该进入具体的命令操作了,如SET  GET  等待。具体就后续分解

case bin_read_set_value:  
        complete_update_bin(c);//执行Update操作  
        break;  
case bin_reading_get_key:  
        process_bin_get(c);//执行get操作  
        break;

当缓冲区中没有可解析的数据时,则进入等待状态。

等待状态conn_waiting:

{
//进入等待状态
        case conn_waiting:
   //更新libevent中对该连接socket注册的事件为读事件,再重新注册。以等待客户端发数据到读缓冲区
            if (!update_event(c, EV_READ | EV_PERSIST)) {//注册为永久事件,直到下次更新该event事件
                if (settings.verbose > 0)
                    fprintf(stderr, "Couldn't update event\n");
                conn_set_state(c, conn_closing);
                break;
            }

conn_set_state(c, conn_read);//转为读状态
            stop = true;
            break;
}

其中函数update_event:注意,每次转为读状态,或写状态时,都要更新该连接socket在该工作线程libevent实例中注册的事件event,然后再从新注册回libevent。

且每次都注册为EV_PERSIST持久事件,直到下次更新该event。

具体更新过程如下:

//更新event,再重新注册到event_base中
static bool update_event(conn *c, const int new_flags) {
    assert(c != NULL);

struct event_base *base = c->event.ev_base;
    if (c->ev_flags == new_flags)
        return true;
    if (event_del(&c->event) == -1) return false;
    event_set(&c->event, c->sfd, new_flags, event_handler, (void *)c);
    event_base_set(base, &c->event);
    c->ev_flags = new_flags;
    if (event_add(&c->event, 0) == -1) return false;
    return true;
}

切换conn状态的函数:

//切换状态:将conn的状态设为state
static void conn_set_state(conn *c, enum conn_states state) {
    assert(c != NULL);
    assert(state >= conn_listening && state < conn_max_state);//检验状态合法性

if (state != c->state) {
        if (settings.verbose > 2) {
            fprintf(stderr, "%d: going from %s to %s\n",
                    c->sfd, state_text(c->state),
                    state_text(state));
        }

if (state == conn_write || state == conn_mwrite) {
            MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->wbuf, c->wbytes);
        }
        c->state = state;//设置为新的状态
    }
}

当连接socket读事件就绪时,就进入读状态,读取网络数据,存入读缓冲区。 
读状态conn_read:

case conn_read:  
        res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);//判断采用UDP协议还是TCP协议  
  
        switch (res)  
        {  
        case READ_NO_DATA_RECEIVED://未读取到数据  
            conn_set_state(c, conn_waiting);//继续等待  
            break;  
        case READ_DATA_RECEIVED://读取数据  
            conn_set_state(c, conn_parse_cmd);//开始解析数据  
            break;  
        case READ_ERROR://读取发生错误  
            conn_set_state(c, conn_closing);//关闭连接  
            break;  
        case READ_MEMORY_ERROR: //申请内存空间错误,继续尝试  
            break;  
        }  
        break;

若采用TCP协议,从网络读取数据,其中调用函数read():

static enum try_read_result try_read_network(conn *c) {
    enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
    int res;
    int num_allocs = 0;//记录从新分配缓冲区空间的次数,每次空间增倍
    assert(c != NULL);

//如果原缓冲区中有部分数据已解析,则用未解析数据覆盖以解析部分
    if (c->rcurr != c->rbuf) {
        if (c->rbytes != 0) /* otherwise there's nothing to copy */
            memmove(c->rbuf, c->rcurr, c->rbytes);
        c->rcurr = c->rbuf;
    }

while (1) {//循环读取数据
        if (c->rbytes >= c->rsize) {
            if (num_allocs == 4) {//如果分配了四次,缓冲区空间还是不够,则返回
                return gotdata;
            }
            ++num_allocs;
            char *new_rbuf = realloc(c->rbuf, c->rsize * 2);//重分配2倍空间
            if (!new_rbuf) {//分配空间失败,则进入关闭连接状态
                STATS_LOCK();
                stats.malloc_fails++;//全局状态
                STATS_UNLOCK();
                if (settings.verbose > 0) {
                    fprintf(stderr, "Couldn't realloc input buffer\n");
                }
                c->rbytes = 0; /* ignore what we read */
                out_of_memory(c, "SERVER_ERROR out of memory reading request");
                c->write_and_go = conn_closing;
                return READ_MEMORY_ERROR;
            }
            c->rcurr = c->rbuf = new_rbuf;
            c->rsize *= 2;
        }

int avail = c->rsize - c->rbytes;//可用空间大小=总空间- 未解析空间
  //执行网络读取,这个是非阻塞的读
        res = read(c->sfd, c->rbuf + c->rbytes, avail);//从套接字中读取数据,存入读缓冲区中,存放在原来未解析数据的后面
        if (res > 0) {
            pthread_mutex_lock(&c->thread->stats.mutex);
            c->thread->stats.bytes_read += res;//更新线程状态
            pthread_mutex_unlock(&c->thread->stats.mutex);
            gotdata = READ_DATA_RECEIVED; //已读取到数据
            c->rbytes += res;
            if (res == avail) {//最多读取到avail个,如果已经读到了,则可以尝试继续读取 
                continue;
            } else {
                break;
            }
        }
        if (res == 0) {//表示已经断开网络连接了  
            return READ_ERROR;
        }
        if (res == -1) {//因为是非阻塞的,所以会返回下面的两个错误码
            if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break;
            }//如果返回为负数,且不是上面两个数,则表示发生了其他错误,返回READ_ERROR  
            return READ_ERROR;
        }
    }
    return gotdata;//返回读取结果
}

采用UDP是数据报的形式时,每次读取到的都是一个完整的数据报形式。

函数try_read_udp:

//UDP读取网络数据  
static enum try_read_result try_read_udp(conn *c)  
{  
int res;  
  
assert(c != NULL);  
  
c->request_addr_size = sizeof(c->request_addr);  
res = recvfrom(c->sfd, c->rbuf, c->rsize, 0, &c->request_addr,  
        &c->request_addr_size);//执行UDP的网络读取  
if (res > 8)//UDP数据包大小大于8,已经有可能是业务数据包  
{  
    unsigned char *buf = (unsigned char *) c->rbuf;  
    pthread_mutex_lock(&c->thread->stats.mutex);  
    c->thread->stats.bytes_read += res;//更新每个线程的统计数据  
    pthread_mutex_unlock(&c->thread->stats.mutex);  
  
    /* Beginning of UDP packet is the request ID; save it. */  
    c->request_id = buf[0] * 256 + buf[1];//UDP为了防止丢包,增加了确认字段  
  
    /* If this is a multi-packet request, drop it. */  
    if (buf[4] != 0 || buf[5] != 1)//一些业务的特征信息判断  
    {  
        out_string(c, "SERVER_ERROR multi-packet request not supported");  
        return READ_NO_DATA_RECEIVED;  
    }  
  
    /* Don't care about any of the rest of the header. */  
    res -= 8;  
    memmove(c->rbuf, c->rbuf + 8, res);//调整缓冲区  
  
    c->rbytes = res;//更新信息  
    c->rcurr = c->rbuf;  
    return READ_DATA_RECEIVED;  
}  
return READ_NO_DATA_RECEIVED;  
}

到此状态机中的主要状态就分析得差不多了,剩下的其他状态主要是一系列具体命令操作,如SET 、GET、  DELETE等,这些正是根据对客户端数据解析的结果所进入的状态,后面将继续分析这些命令的执行过程。

最新文章

  1. POJ2743Mobile Computing[DFS 状态压缩]
  2. Xcode环境下OpenGL C++ GLFW开发环境搭建
  3. 图文详解远程部署ASP.NET MVC 5项目
  4. Ubuntu搜狗输入法的使用
  5. MySQL --log-slave-updates
  6. POJ2031Building a Space Station
  7. 转载 基于Selenium WebDriver的Web应用自动化测试
  8. ng表单验证,提交以后才显示错误
  9. 微信支付bug
  10. Android中的测试类配置AndroidManifest.xml
  11. JAVA混型和潜在类型机制
  12. oracle的to_char中的fm
  13. python——面向对象进阶
  14. idhttp提交post带参数并带上cookie
  15. macOS 下 PHPStorm + Xdebug 调试 Docker 环境中的代码
  16. layui---form表单模块
  17. js EL 正则表达式
  18. linux系统lnmp环境包搬家教程
  19. webpack window 安装loader
  20. 移动端视频h5表现问题汇总

热门文章

  1. Shtter抓图时,包含光标的解决方案
  2. 内存中加载DLL DELPHI版
  3. VIM操作基础命令
  4. SpringBoot实现文件上传功能
  5. Http请求原理与相关知识
  6. pyCharm上解决安装不上pandas库问题
  7. MySQL忘记密码解决方案
  8. 3个IO口8个按键
  9. c/s和b/s的区别
  10. JMX心得 -- Server端