原文:C#高性能大容量SOCKET并发(二):SocketAsyncEventArgs封装

1、SocketAsyncEventArgs介绍

SocketAsyncEventArgs是微软提供的高性能异步Socket实现类,主要为高性能网络服务器应用程序而设计,主要是为了避免在在异步套接字 I/O 量非常大时发生重复的对象分配和同步。使用此类执行异步套接字操作的模式包含以下步骤:

1.分配一个新的 SocketAsyncEventArgs 上下文对象,或者从应用程序池中获取一个空闲的此类对象。

2.将该上下文对象的属性设置为要执行的操作(例如,完成回调方法、数据缓冲区、缓冲区偏移量以及要传输的最大数据量)。

3.调用适当的套接字方法 (xxxAsync) 以启动异步操作。

4.如果异步套接字方法 (xxxAsync) 返回 true,则在回调中查询上下文属性来获取完成状态。

5.如果异步套接字方法 (xxxAsync) 返回 false,则说明操作是同步完成的。 可以查询上下文属性来获取操作结果。

6.将该上下文重用于另一个操作,将它放回到应用程序池中,或者将它丢弃。

2、SocketAsyncEventArgs封装

使用SocketAsyncEventArgs之前需要先建立一个Socket监听对象,使用如下代码:

        public void Start(IPEndPoint localEndPoint)
{
listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
listenSocket.Bind(localEndPoint);
listenSocket.Listen(m_numConnections);
Program.Logger.InfoFormat("Start listen socket {0} success", localEndPoint.ToString());
//for (int i = 0; i < 64; i++) //不能循环投递多次AcceptAsync,会造成只接收8000连接后不接收连接了
StartAccept(null);
m_daemonThread = new DaemonThread(this);
}

然后开始接受连接,SocketAsyncEventArgs有连接时会通过Completed事件通知外面,所以接受连接的代码如下:

        public void StartAccept(SocketAsyncEventArgs acceptEventArgs)
{
if (acceptEventArgs == null)
{
acceptEventArgs = new SocketAsyncEventArgs();
acceptEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEventArg_Completed);
}
else
{
acceptEventArgs.AcceptSocket = null; //释放上次绑定的Socket,等待下一个Socket连接
} m_maxNumberAcceptedClients.WaitOne(); //获取信号量
bool willRaiseEvent = listenSocket.AcceptAsync(acceptEventArgs);
if (!willRaiseEvent)
{
ProcessAccept(acceptEventArgs);
}
}

接受连接响应事件代码:

        void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs acceptEventArgs)
{
try
{
ProcessAccept(acceptEventArgs);
}
catch (Exception E)
{
Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", acceptEventArgs.AcceptSocket, E.Message);
Program.Logger.Error(E.StackTrace);
}
}
        private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
{
Program.Logger.InfoFormat("Client connection accepted. Local Address: {0}, Remote Address: {1}",
acceptEventArgs.AcceptSocket.LocalEndPoint, acceptEventArgs.AcceptSocket.RemoteEndPoint); AsyncSocketUserToken userToken = m_asyncSocketUserTokenPool.Pop();
m_asyncSocketUserTokenList.Add(userToken); //添加到正在连接列表
userToken.ConnectSocket = acceptEventArgs.AcceptSocket;
userToken.ConnectDateTime = DateTime.Now; try
{
bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
if (!willRaiseEvent)
{
lock (userToken)
{
ProcessReceive(userToken.ReceiveEventArgs);
}
}
}
catch (Exception E)
{
Program.Logger.ErrorFormat("Accept client {0} error, message: {1}", userToken.ConnectSocket, E.Message);
Program.Logger.Error(E.StackTrace);
} StartAccept(acceptEventArgs); //把当前异步事件释放,等待下次连接
}

接受连接后,从当前Socket缓冲池AsyncSocketUserTokenPool中获取一个用户对象AsyncSocketUserToken,AsyncSocketUserToken包含一个接收异步事件m_receiveEventArgs,一个发送异步事件m_sendEventArgs,接收数据缓冲区m_receiveBuffer,发送数据缓冲区m_sendBuffer,协议逻辑调用对象m_asyncSocketInvokeElement,建立服务对象后,需要实现接收和发送的事件响应函数:

        void IO_Completed(object sender, SocketAsyncEventArgs asyncEventArgs)
{
AsyncSocketUserToken userToken = asyncEventArgs.UserToken as AsyncSocketUserToken;
userToken.ActiveDateTime = DateTime.Now;
try
{
lock (userToken)
{
if (asyncEventArgs.LastOperation == SocketAsyncOperation.Receive)
ProcessReceive(asyncEventArgs);
else if (asyncEventArgs.LastOperation == SocketAsyncOperation.Send)
ProcessSend(asyncEventArgs);
else
throw new ArgumentException("The last operation completed on the socket was not a receive or send");
}
}
catch (Exception E)
{
Program.Logger.ErrorFormat("IO_Completed {0} error, message: {1}", userToken.ConnectSocket, E.Message);
Program.Logger.Error(E.StackTrace);
}
}

在Completed事件中需要处理发送和接收的具体逻辑代码,其中接收的逻辑实现如下:

        private void ProcessReceive(SocketAsyncEventArgs receiveEventArgs)
{
AsyncSocketUserToken userToken = receiveEventArgs.UserToken as AsyncSocketUserToken;
if (userToken.ConnectSocket == null)
return;
userToken.ActiveDateTime = DateTime.Now;
if (userToken.ReceiveEventArgs.BytesTransferred > 0 && userToken.ReceiveEventArgs.SocketError == SocketError.Success)
{
int offset = userToken.ReceiveEventArgs.Offset;
int count = userToken.ReceiveEventArgs.BytesTransferred;
if ((userToken.AsyncSocketInvokeElement == null) & (userToken.ConnectSocket != null)) //存在Socket对象,并且没有绑定协议对象,则进行协议对象绑定
{
BuildingSocketInvokeElement(userToken);
offset = offset + 1;
count = count - 1;
}
if (userToken.AsyncSocketInvokeElement == null) //如果没有解析对象,提示非法连接并关闭连接
{
Program.Logger.WarnFormat("Illegal client connection. Local Address: {0}, Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,
userToken.ConnectSocket.RemoteEndPoint);
CloseClientSocket(userToken);
}
else
{
if (count > 0) //处理接收数据
{
if (!userToken.AsyncSocketInvokeElement.ProcessReceive(userToken.ReceiveEventArgs.Buffer, offset, count))
{ //如果处理数据返回失败,则断开连接
CloseClientSocket(userToken);
}
else //否则投递下次介绍数据请求
{
bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
if (!willRaiseEvent)
ProcessReceive(userToken.ReceiveEventArgs);
}
}
else
{
bool willRaiseEvent = userToken.ConnectSocket.ReceiveAsync(userToken.ReceiveEventArgs); //投递接收请求
if (!willRaiseEvent)
ProcessReceive(userToken.ReceiveEventArgs);
}
}
}
else
{
CloseClientSocket(userToken);
}
}

由于我们制定的协议第一个字节是协议标识,因此在接收到第一个字节的时候需要绑定协议解析对象,具体代码实现如下:

        private void BuildingSocketInvokeElement(AsyncSocketUserToken userToken)
{
byte flag = userToken.ReceiveEventArgs.Buffer[userToken.ReceiveEventArgs.Offset];
if (flag == (byte)SocketFlag.Upload)
userToken.AsyncSocketInvokeElement = new UploadSocketProtocol(this, userToken);
else if (flag == (byte)SocketFlag.Download)
userToken.AsyncSocketInvokeElement = new DownloadSocketProtocol(this, userToken);
else if (flag == (byte)SocketFlag.RemoteStream)
userToken.AsyncSocketInvokeElement = new RemoteStreamSocketProtocol(this, userToken);
else if (flag == (byte)SocketFlag.Throughput)
userToken.AsyncSocketInvokeElement = new ThroughputSocketProtocol(this, userToken);
else if (flag == (byte)SocketFlag.Control)
userToken.AsyncSocketInvokeElement = new ControlSocketProtocol(this, userToken);
else if (flag == (byte)SocketFlag.LogOutput)
userToken.AsyncSocketInvokeElement = new LogOutputSocketProtocol(this, userToken);
if (userToken.AsyncSocketInvokeElement != null)
{
Program.Logger.InfoFormat("Building socket invoke element {0}.Local Address: {1}, Remote Address: {2}",
userToken.AsyncSocketInvokeElement, userToken.ConnectSocket.LocalEndPoint, userToken.ConnectSocket.RemoteEndPoint);
}
}

发送响应函数实现需要注意,我们是把发送数据放到一个列表中,当上一个发送事件完成响应Completed事件,这时我们需要检测发送队列中是否存在未发送的数据,如果存在则继续发送。

        private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
{
AsyncSocketUserToken userToken = sendEventArgs.UserToken as AsyncSocketUserToken;
if (userToken.AsyncSocketInvokeElement == null)
return false;
userToken.ActiveDateTime = DateTime.Now;
if (sendEventArgs.SocketError == SocketError.Success)
return userToken.AsyncSocketInvokeElement.SendCompleted(); //调用子类回调函数
else
{
CloseClientSocket(userToken);
return false;
}
}

SendCompleted用于回调下次需要发送的数据,具体实现过程如下:

        public virtual bool SendCompleted()
{
m_activeDT = DateTime.UtcNow;
m_sendAsync = false;
AsyncSendBufferManager asyncSendBufferManager = m_asyncSocketUserToken.SendBuffer;
asyncSendBufferManager.ClearFirstPacket(); //清除已发送的包
int offset = 0;
int count = 0;
if (asyncSendBufferManager.GetFirstPacket(ref offset, ref count))
{
m_sendAsync = true;
return m_asyncSocketServer.SendAsyncEvent(m_asyncSocketUserToken.ConnectSocket, m_asyncSocketUserToken.SendEventArgs,
asyncSendBufferManager.DynamicBufferManager.Buffer, offset, count);
}
else
return SendCallback();
} //发送回调函数,用于连续下发数据
public virtual bool SendCallback()
{
return true;
}

当一个SocketAsyncEventArgs断开后,我们需要断开对应的Socket连接,并释放对应资源,具体实现函数如下:

        public void CloseClientSocket(AsyncSocketUserToken userToken)
{
if (userToken.ConnectSocket == null)
return;
string socketInfo = string.Format("Local Address: {0} Remote Address: {1}", userToken.ConnectSocket.LocalEndPoint,
userToken.ConnectSocket.RemoteEndPoint);
Program.Logger.InfoFormat("Client connection disconnected. {0}", socketInfo);
try
{
userToken.ConnectSocket.Shutdown(SocketShutdown.Both);
}
catch (Exception E)
{
Program.Logger.ErrorFormat("CloseClientSocket Disconnect client {0} error, message: {1}", socketInfo, E.Message);
}
userToken.ConnectSocket.Close();
userToken.ConnectSocket = null; //释放引用,并清理缓存,包括释放协议对象等资源 m_maxNumberAcceptedClients.Release();
m_asyncSocketUserTokenPool.Push(userToken);
m_asyncSocketUserTokenList.Remove(userToken);
}

3、SocketAsyncEventArgs封装和MSDN的不同点

MSDN在http://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx实现了示例代码,并实现了初步的池化处理,我们是在它的基础上扩展实现了接收数据缓冲,发送数据队列,并把发送SocketAsyncEventArgs和接收SocketAsyncEventArgs分开,并实现了协议解析单元,这样做的好处是方便后续逻辑实现文件的上传,下载和日志输出。

DEMO下载地址:http://download.csdn.net/detail/sqldebug_fan/7467745

免责声明:此代码只是为了演示C#完成端口编程,仅用于学习和研究,切勿用于商业用途。水平有限,C#也属于初学,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

最新文章

  1. SVN分支和映射总结和数据库初步使用
  2. understand equal and gethashcode
  3. Insertion Sort List
  4. JS 学习笔记--JS中的事件对象基础
  5. 配置hive元数据库mysql时候出现 Unable to find the JDBC database jar on host : master
  6. java里面的equals和hashcode的总结
  7. linux下制作共享库.a和 .so
  8. Java面试题之Struts优缺点
  9. Trufun云端建模平台之云端UML工具发布
  10. sql2005数据库置疑修复断电崩溃索引损坏 数据库索引错误修复/数据库表损坏/索引损坏/系统表混乱等问题修复
  11. C#从基于FTPS的FTP server下载数据 (FtpWebRequest 的使用)SSL 加密
  12. 利用grep-console插件使Intellij idea显示多颜色调试日志
  13. Lintcode389 Valid Sudoku solution 题解
  14. Spring Cloud @HystrixCommand和@CacheResult注解使用,参数配置
  15. semantic ui框架学习笔记二
  16. 2018.10.24 NOIP模拟 小 C 的序列(链表+数论)
  17. Qwidget+opencv显示图像
  18. Unity3d 常用的方法
  19. 微信小程序中的bindTap事件(微信小程序开发QQ群:604788754)
  20. IPv6 Scapy Samples

热门文章

  1. LUOGU 1525 关押罪犯 - 并查集拆点(对立点) / 二分+二分图染色
  2. Log4erl
  3. 检索 04 --Stack栈 Queue队列 Hashtable哈希表
  4. 在C++ Builder6上使用Boost正则表达式库
  5. 前端常见算法JS实现
  6. base64编码转图片
  7. OpenCL基本概念
  8. DirectUI 2D/3D 界面库集合 分析之总结
  9. Array.prototype.forEach()&amp;&amp;Array.prototype.map()
  10. C++使用Windows API CreateMutex函数多线程编程