转自:http://blog.csdn.net/feitianxuxue/article/details/9386843

处理大并发之五 使用libevent利器bufferevent

首先来翻译一段文章

你可能注意到随着我们代码变得越来越高效,程序也变得更加复杂。当我们产生一个进程的时候,我们没有必要为每一个链接管理一个buffer,我们只需要每个处理独立栈分配缓冲区就可以了。在读和写的时候,我们不必明确的跟踪每一个socket,这在我们的代码里是一个暗示,我们没有必要定义一个结构体去跟踪每一个操作什么时候完成,我们只需要使用循环栈变量就可以了。

此外,如果你在windows网络编程方面有着丰富的经验,当你在使用上一篇博客中的例子时,你可能认识到libevent可能达不到最理想的性能。在windows上,你做的快速异步IO不是用的select,它使用的IOCP API。和其他的快速网络API不同,当你的程序执行完成,sock准备完成,IOCP不会通知你的程序,取而代之的是,程序告诉windows网络栈开启一个网络操作,并且当操作执行完成时,IOCP会告诉程序。

幸运的是,libevent2 的bufferevents接口解决了上面的这些冲突,它使得程序更加容易写,并且为windows和unix提供了有效的接口。

分析:

libevent的bufferevent在event的基础上自己维护了一个buffer,这样的话,就不需要再自己管理一个buffer了,上一篇博客是自己维护一个buffer,维护过程复杂,且过程难以理解,既然libevent自己提供了bufferevent这个神器,且有API,何必自己维护呢?

先看看struct bufferevent这个结构体

  1. struct bufferevent {
  2. struct event_base *ev_base;
  3. const struct bufferevent_ops *be_ops;
  4. struct event ev_read;
  5. struct event ev_write;
  6. struct evbuffer *input;
  7. struct evbuffer *output;
  8. ……
  9. bufferevent_data_cb readcb;
  10. bufferevent_data_cb writecb;
  11. bufferevent_event_cb errorcb;
  12. ……
  13. }

可以看出struct bufferevent内置了两个event(读/写)和对应的缓冲区。当有数据被读入(input)的时候,readcb被调用,当output被输出完成的时候,writecb被调用,当网络I/O出现错误,如链接中断,超时或其他错误时,errorcb被调用。

使用bufferevent的过程:

1. 设置sock为非阻塞的

  1. eg:  evutil_make_socket_nonblocking(fd);

2. 使用bufferevent_socket_new创建一个structbufferevent *bev,关联该sockfd,托管给event_base

函数原型为:

  1. struct bufferevent * bufferevent_socket_new(struct event_base *base, evutil_socket_t fd,  int options)
  2. eg:  struct bufferevent *bev;
  3. bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);

3. 设置读写对应的回调函数

函数原型为:

  1. void bufferevent_setcb(struct bufferevent *bufev,
  2. bufferevent_data_cb readcb, bufferevent_data_cb writecb,
  3. bufferevent_event_cb eventcb, void *cbarg)
  4. eg.  bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);

4. 启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,bufferevent是不会读写数据的

函数原型:

  1. int bufferevent_enable(struct bufferevent *bufev, short event)
  2. eg.  bufferevent_enable(bev, EV_READ|EV_WRITE);

5. 进入bufferevent_setcb回调函数:

在readcb里面从input中读取数据,处理完毕后填充到output中;

writecb对于服务端程序,只需要readcb就可以了,可以置为NULL;

errorcb用于处理一些错误信息。

针对这些使用过程进入源码进行分析:

1. bufferevent_socket_new

(1)在bufferevent_init_common中调用evbuffer_new()初始化input和output

(2)在event_assign中初始化bufferevent中的ev_read和ev_write事件。

(3)在evbuffer_add_cb中给output添加了一个callback bufferevent_socket_outbuf_cb

2. bufferevent_setcb

该函数的作用主要是赋值,把该函数后面的参数,赋值给第一个参数struct bufferevent *bufev定义的变量

3. bufferevent_enable

调用event_add将读写事件加入到事件监听队列中。

对bufferevent常用的几个函数进行分析:

  1. char *evbuffer_readln(struct evbuffer*buffer, size_t *n_read_out,enum evbuffer_eol_style eol_style);

含义:Read a single line from an evbuffer.

返回值:读到的一行内容

  1. int evbuffer_add(struct evbuffer *buf,const void *data, size_t datlen);

含义:将数据添加到evbuffer的结尾

返回值:成功返回0,失败返回-1

  1. int evbuffer_remove(struct evbuffer*buf, void *data, size_t datlen);

含义:从evbuffer读取数据到data

返回值:成功返回0,失败返回-1

  1. size_t evbuffer_get_length(const structevbuffer *buf);

含义:返回evbuffer中存储的字节长度

暂时先分析到这里,下面是代码,客户端发送消息:HTTP/1.0, Client 0 send Message:

Request: Hello Server! over,服务端一条消息收完成后,会回复:Response ok! Hello Client!

服务端从bufferevent中取出消息是按行取的。代码可能有不完善的地方,由于才疏学浅,研究时间短(3天),希望高手提出宝贵意见。

libevent_eventbuffer_server.c

  1. #include <netinet/in.h>
  2. #include <sys/socket.h>
  3. #include <fcntl.h>
  4. #include <event2/event.h>
  5. #include <event2/buffer.h>
  6. #include <event2/bufferevent.h>
  7. #include <assert.h>
  8. #include <unistd.h>
  9. #include <string.h>
  10. #include <stdlib.h>
  11. #include <stdio.h>
  12. #include <errno.h>
  13. void do_read(evutil_socket_t fd, short events, void *arg);
  14. //struct bufferevent内建了两个event(read/write)和对应的缓冲区(struct evbuffer *input, *output),并提供相应的函数用来操作>
  15. 缓冲区(或者直接操作bufferevent)
  16. //接收到数据后,判断是不一样一条消息的结束,结束标志为"over"字符串
  17. void readcb(struct bufferevent *bev, void *ctx)
  18. {
  19. printf("called readcb!\n");
  20. struct evbuffer *input, *output;
  21. char *request_line;
  22. size_t len;
  23. input = bufferevent_get_input(bev);//其实就是取出bufferevent中的input
  24. output = bufferevent_get_output(bev);//其实就是取出bufferevent中的output
  25. size_t input_len = evbuffer_get_length(input);
  26. printf("input_len: %d\n", input_len);
  27. size_t output_len = evbuffer_get_length(output);
  28. printf("output_len: %d\n", output_len);
  29. while(1)
  30. {
  31. request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);//从evbuffer前面取出一行,用一个新分配的空字符结束
  32. 的字符串返回这一行,EVBUFFER_EOL_CRLF表示行尾是一个可选的回车,后随一个换行符
  33. if(NULL == request_line)
  34. {
  35. printf("The first line has not arrived yet.\n");
  36. free(request_line);//之所以要进行free是因为 line = mm_malloc(n_to_copy+1)),在这里进行了malloc
  37. break;
  38. }
  39. else
  40. <span style="white-space: pre;">    </span>{
  41. printf("Get one line date: %s\n", request_line);
  42. if(strstr(request_line, "over") != NULL)//用于判断是不是一条消息的结束
  43. {
  44. char *response = "Response ok! Hello Client!\r\n";
  45. evbuffer_add(output, response, strlen(response));//Adds data to an event buffer
  46. printf("服务端接收一条数据完成,回复客户端一条消息: %s\n", response);
  47. free(request_line);
  48. break;
  49. }
  50. }
  51. free(request_line);
  52. }
  53. size_t input_len1 = evbuffer_get_length(input);
  54. printf("input_len1: %d\n", input_len1);
  55. size_t output_len1 = evbuffer_get_length(output);
  56. printf("output_len1: %d\n\n", output_len1);
  57. }
  58. void errorcb(struct bufferevent *bev, short error, void *ctx)
  59. {
  60. if (error & BEV_EVENT_EOF)
  61. {
  62. /* connection has been closed, do any clean up here */
  63. printf("connection closed\n");
  64. }
  65. else if (error & BEV_EVENT_ERROR)
  66. {
  67. /* check errno to see what error occurred */
  68. printf("some other error\n");
  69. }
  70. else if (error & BEV_EVENT_TIMEOUT)
  71. {
  72. /* must be a timeout event handle, handle it */
  73. printf("Timed out\n");
  74. }
  75. bufferevent_free(bev);
  76. }
  77. void do_accept(evutil_socket_t listener, short event, void *arg)
  78. {
  79. struct event_base *base = arg;
  80. struct sockaddr_storage ss;
  81. socklen_t slen = sizeof(ss);
  82. int fd = accept(listener, (struct sockaddr*)&ss, &slen);
  83. if (fd < 0)
  84. {
  85. perror("accept");
  86. }
  87. else if (fd > FD_SETSIZE)
  88. {
  89. close(fd);
  90. }
  91. else
  92. {
  93. struct bufferevent *bev;
  94. evutil_make_socket_nonblocking(fd);
  95. //使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base
  96. ////BEV_OPT_CLOSE_ON_FREE表示释放bufferevent时关闭底层传输端口。这将关闭底层套接字,释放底层bufferevent等。
  97. bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
  98. //设置读写对应的回调函数
  99. bufferevent_setcb(bev, readcb, NULL, errorcb, NULL);
  100. //      bufferevent_setwatermark(bev, EV_READ, 0, MAX_LINE);
  101. //启用读写事件,其实是调用了event_add将相应读写事件加入事件监听队列poll。正如文档所说,如果相应事件不置为true,buf
  102. ferevent是不会读写数据的
  103. bufferevent_enable(bev, EV_READ|EV_WRITE);
  104. }
  105. }
  106. void run(void)
  107. {
  108. evutil_socket_t listener;
  109. struct sockaddr_in sin;
  110. struct event_base *base;
  111. struct event *listener_event;
  112. base = event_base_new();
  113. if (!base)
  114. return; /*XXXerr*/
  115. sin.sin_family = AF_INET;
  116. sin.sin_addr.s_addr = 0;
  117. sin.sin_port = htons(8000);
  118. listener = socket(AF_INET, SOCK_STREAM, 0);
  119. evutil_make_socket_nonblocking(listener);
  120. #ifndef WIN32
  121. {
  122. int one = 1;
  123. setsockopt(listener, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
  124. }
  125. #endif
  126. if (bind(listener, (struct sockaddr*)&sin, sizeof(sin)) < 0)
  127. {
  128. perror("bind");
  129. return;
  130. }
  131. if (listen(listener, 16)<0)
  132. {
  133. perror("listen");
  134. return;
  135. }
  136. listener_event = event_new(base, listener, EV_READ|EV_PERSIST, do_accept, (void*)base);
  137. /*XXX check it */
  138. event_add(listener_event, NULL);
  139. event_base_dispatch(base);
  140. }
  141. int main(int argc, char **argv)
  142. {
  143. setvbuf(stdout, NULL, _IONBF, 0);
  144. run();
  145. return 0;
  146. }

编译:gcc -I/usr/include-o test libevent_eventbuffer_server.c -L/usr/local/lib –levent

运行:

服务端:

客户端:

今晚博客暂时写完了,时间比较仓促,错误估计不会少,关于bufferevent很多API都还不是很熟悉,还有libevent 添加事件event_add是非线程安全的,如果使用多线程,需要保证event_add不能出现在多个线程中,以后有时间慢慢研究。

体会:关于源码,还需要好好研究,其实今晚挺郁闷的,弄了半天,没有什么进展,现在自己还有很多疑问,主要是自己太急了,不过3天时间做到基本了解,自己还算满意,下一步有时间多研究下吧。晚安,北京

如是转载,请指明原出处:http://blog.csdn.net/feitianxuxue,谢谢合作!

最新文章

  1. Javascript实现HashTable类
  2. C#设计模式-代理模式
  3. JavaScript知识总结&lt;一&gt;
  4. 怎样调整CODESOFT中条码线的宽度
  5. [转]谈谈C++中的swap函数
  6. POJ 3273
  7. 认识JavaScript的原型
  8. webstorm安装express报错
  9. Spring初识(通过小实例清晰认识Spring)
  10. LeetCode 380. Insert Delete GetRandom O(1) (插入删除和获得随机数 常数时间)
  11. Volley 图片加载相关源码解析
  12. iterm2 快捷键(转载)
  13. 阿里云 oss 上传文件,js直传,.net 签名,回调
  14. Jmeter连接数据库方式
  15. 日常训练 dfs 之 拓扑排序
  16. 【linux】硬盘原理简介和分区
  17. Browsersync结合gulp和nodemon实现express全栈自动刷新
  18. browerify初步了解
  19. iOS开发-UINavigationController简单介绍
  20. stout代码分析之零

热门文章

  1. ACM_1001_Exponentiation 详解
  2. CxImage图像库的使用 .
  3. FTP : mput with no confirmation
  4. Intellj IDEA Java随笔
  5. Javascript 事件对象进阶(一)拖拽的原理
  6. C++操作MySQL大量数据插入效率低下的解决方法
  7. MySQL数据库权限操作指南
  8. Xamarin Android.Views.WindowManagerBadTokenException: Unable to add window -- token android.os.BinderProxy
  9. Spike Notes on Theory of (Software) Transactional Memory[Doing]
  10. Github账户注册的过程