C#高性能Socket服务器的实现(IOCP)

https://www.jianshu.com/p/c65c0eb59f22

引言

我一直在探寻一个高性能的Socket客户端代码。以前,我使用Socket类写了一些基于传统异步编程模型的代码(BeginSend、BeginReceive,等等)也看过很多博客的知识,在linux中有poll和epoll来实现,在windows下面

微软MSDN中也提供了SocketAsyncEventArgs这个类来实现IOCP 地址:https://msdn.microsoft.com/zh-cn/library/system.net.sockets.socketasynceventargs.aspx

NET Framework中的APM也称为Begin/End模式。这是因为会调用Begin方法来启动异步操作,然后返回一个IAsyncResult 对象。可以选择将一个代理作为参数提供给Begin方法,异步操作完成时会调用该方法。或者,一个线程可以等待 IAsyncResult.AsyncWaitHandle。当回调被调用或发出等待信号时,就会调用End方法来获取异步操作的结果。这种模式很灵活,使用相对简单,在 .NET Framework 中非常常见。

但是,您必须注意,如果进行大量异步套接字操作,是要付出代价的。针对每次操作,都必须创建一个IAsyncResult对象,而且该对象不能被重复使用。由于大量使用对象分配和垃圾收集,这会影响性能。为了解决这个问题,新版本提供了另一个使用套接字上执行异步I/O的方法模式。这种新模式并不要求为每个套接字操作分配操作上下文对象。

代码下载:http://download.csdn.net/detail/zhujunxxxxx/8431289这里的代码优化了的

目标

在上面微软提供的例子我觉得不是很完整,没有具体一个流程,只是受到客户端消息后发送相同内容给客户端,初学者不容易看懂流程,因为我花了一天的时间来实现一个功能齐全的IOCP服务器,

效果如下



代码

首先是ICOPServer.cs 这个类是IOCP服务器的核心类,目前这个类是网络上比较全的代码,MSDN上面的例子都没有我的全

using System;  

using System.Collections.Generic;  

using System.Linq;  

using System.Text;  

using System.Net.Sockets;  

using System.Net;  

using System.Threading;  

namespace ServerTest  

{  

///   

/// IOCP SOCKET服务器  

///   

public class IOCPServer : IDisposable  

    {  

const int opsToPreAlloc = 2;  

        #region Fields  

///   

/// 服务器程序允许的最大客户端连接数  

///   

private int _maxClient;  

///   

/// 监听Socket,用于接受客户端的连接请求  

///   

private Socket _serverSock;  

///   

/// 当前的连接的客户端数  

///   

private int _clientCount;  

///   

/// 用于每个I/O Socket操作的缓冲区大小  

///   

private int _bufferSize = 1024;  

///   

/// 信号量  

///   

        Semaphore _maxAcceptedClients;  

///   

/// 缓冲区管理  

///   

        BufferManager _bufferManager;  

///   

/// 对象池  

///   

        SocketAsyncEventArgsPool _objectPool;  

private bool disposed = false;  

        #endregion  

        #region Properties  

///   

/// 服务器是否正在运行  

///   

public bool IsRunning { get; private set; }  

///   

/// 监听的IP地址  

///   

public IPAddress Address { get; private set; }  

///   

/// 监听的端口  

///   

public int Port { get; private set; }  

///   

/// 通信使用的编码  

///   

public Encoding Encoding { get; set; }  

        #endregion  

        #region Ctors  

///   

/// 异步IOCP SOCKET服务器  

///   

/// 监听的端口  

/// 最大的客户端数量  

public IOCPServer(int listenPort,int maxClient)  

:this(IPAddress.Any, listenPort, maxClient)  

        {  

        }  

///   

/// 异步Socket TCP服务器  

///   

/// 监听的终结点  

/// 最大客户端数量  

public IOCPServer(IPEndPoint localEP, int maxClient)  

:this(localEP.Address, localEP.Port,maxClient)  

        {  

        }  

///   

/// 异步Socket TCP服务器  

///   

/// 监听的IP地址  

/// 监听的端口  

/// 最大客户端数量  

public IOCPServer(IPAddress localIPAddress, int listenPort, int maxClient)  

        {  

this.Address = localIPAddress;  

this.Port = listenPort;  

this.Encoding = Encoding.Default;  

            _maxClient = maxClient;  

_serverSock =new Socket(localIPAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  

_bufferManager =new BufferManager(_bufferSize * _maxClient * opsToPreAlloc,_bufferSize);  

_objectPool =new SocketAsyncEventArgsPool(_maxClient);  

_maxAcceptedClients =new Semaphore(_maxClient, _maxClient);   

        }  

        #endregion  

        #region 初始化  

///   

/// 初始化函数  

///   

public void Init()  

        {  

// Allocates one large byte buffer which all I/O operations use a piece of.  This gaurds   

// against memory fragmentation  

            _bufferManager.InitBuffer();  

// preallocate pool of SocketAsyncEventArgs objects  

            SocketAsyncEventArgs readWriteEventArg;  

for (int i = 0; i < _maxClient; i++)  

            {  

//Pre-allocate a set of reusable SocketAsyncEventArgs  

readWriteEventArg =new SocketAsyncEventArgs();  

readWriteEventArg.Completed +=new EventHandler(OnIOCompleted);  

readWriteEventArg.UserToken =null;  

// assign a byte buffer from the buffer pool to the SocketAsyncEventArg object  

                _bufferManager.SetBuffer(readWriteEventArg);  

// add SocketAsyncEventArg to the pool  

                _objectPool.Push(readWriteEventArg);  

            }  

        }  

        #endregion  

        #region Start  

///   

/// 启动  

///   

public void Start()  

        {  

if (!IsRunning)  

            {  

                Init();  

IsRunning =true;  

IPEndPoint localEndPoint =new IPEndPoint(Address, Port);  

// 创建监听socket  

_serverSock =new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);  

//_serverSock.ReceiveBufferSize = _bufferSize;  

//_serverSock.SendBufferSize = _bufferSize;  

if (localEndPoint.AddressFamily == AddressFamily.InterNetworkV6)  

                {  

// 配置监听socket为 dual-mode (IPv4 & IPv6)   

// 27 is equivalent to IPV6_V6ONLY socket option in the winsock snippet below,  

_serverSock.SetSocketOption(SocketOptionLevel.IPv6, (SocketOptionName)27,false);  

_serverSock.Bind(new IPEndPoint(IPAddress.IPv6Any, localEndPoint.Port));  

                }  

else  

                {  

                    _serverSock.Bind(localEndPoint);  

                }  

// 开始监听  

_serverSock.Listen(this._maxClient);  

// 在监听Socket上投递一个接受请求。  

StartAccept(null);  

            }  

        }  

        #endregion  

        #region Stop  

///   

/// 停止服务  

///   

public void Stop()  

        {  

if (IsRunning)  

            {  

IsRunning =false;  

                _serverSock.Close();  

//TODO 关闭对所有客户端的连接  

            }  

        }  

        #endregion  

        #region Accept  

///   

/// 从客户端开始接受一个连接操作  

///   

private void StartAccept(SocketAsyncEventArgs asyniar)  

        {  

if (asyniar == null)  

            {  

asyniar =new SocketAsyncEventArgs();  

asyniar.Completed +=new EventHandler(OnAcceptCompleted);  

            }  

else  

            {  

//socket must be cleared since the context object is being reused  

asyniar.AcceptSocket =null;  

            }  

            _maxAcceptedClients.WaitOne();  

if (!_serverSock.AcceptAsync(asyniar))  

            {  

                ProcessAccept(asyniar);  

//如果I/O挂起等待异步则触发AcceptAsyn_Asyn_Completed事件  

//此时I/O操作同步完成,不会触发Asyn_Completed事件,所以指定BeginAccept()方法  

            }  

        }  

///   

/// accept 操作完成时回调函数  

///   

/// Object who raised the event.  

/// SocketAsyncEventArg associated with the completed accept operation.  

private void OnAcceptCompleted(object sender, SocketAsyncEventArgs e)  

        {  

            ProcessAccept(e);  

        }  

///   

/// 监听Socket接受处理  

///   

/// SocketAsyncEventArg associated with the completed accept operation.  

private void ProcessAccept(SocketAsyncEventArgs e)  

        {  

if (e.SocketError == SocketError.Success)  

            {  

Socket s = e.AcceptSocket;//和客户端关联的socket  

if (s.Connected)  

                {  

try  

                    {  

Interlocked.Increment(ref _clientCount);//原子操作加1  

                        SocketAsyncEventArgs asyniar = _objectPool.Pop();  

                        asyniar.UserToken = s;  

Log4Debug(String.Format("客户 {0} 连入, 共有 {1} 个连接。", s.RemoteEndPoint.ToString(), _clientCount));  

if (!s.ReceiveAsync(asyniar))//投递接收请求  

                        {  

                            ProcessReceive(asyniar);  

                        }  

                    }  

catch (SocketException ex)  

                    {  

Log4Debug(String.Format("接收客户 {0} 数据出错, 异常信息: {1} 。", s.RemoteEndPoint, ex.ToString()));  

//TODO 异常处理  

                    }  

//投递下一个接受请求  

                    StartAccept(e);  

                }  

            }  

        }  

        #endregion  

        #region 发送数据  

///   

/// 异步的发送数据  

///   

///   

///   

public void Send(SocketAsyncEventArgs e, byte[] data)  

        {  

if (e.SocketError == SocketError.Success)  

            {  

Socket s = e.AcceptSocket;//和客户端关联的socket  

if (s.Connected)  

                {  

Array.Copy(data, 0, e.Buffer, 0, data.Length);//设置发送数据  

//e.SetBuffer(data, 0, data.Length); //设置发送数据  

if (!s.SendAsync(e))//投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  

                    {  

// 同步发送时处理发送完成事件  

                        ProcessSend(e);  

                    }  

else  

                    {  

                        CloseClientSocket(e);  

                    }  

                }  

            }  

        }  

///   

/// 同步的使用socket发送数据  

///   

///   

///   

///   

///   

///   

public void Send(Socket socket, byte[] buffer, int offset, int size, int timeout)  

        {  

            socket.SendTimeout = 0;  

int startTickCount = Environment.TickCount;  

int sent = 0; // how many bytes is already sent  

do  

            {  

if (Environment.TickCount > startTickCount + timeout)  

                {  

//throw new Exception("Timeout.");  

                }  

try  

                {  

                    sent += socket.Send(buffer, offset + sent, size - sent, SocketFlags.None);  

                }  

catch (SocketException ex)  

                {  

if (ex.SocketErrorCode == SocketError.WouldBlock ||  

                    ex.SocketErrorCode == SocketError.IOPending ||  

                    ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)  

                    {  

// socket buffer is probably full, wait and try again  

                        Thread.Sleep(30);  

                    }  

else  

                    {  

throw ex; // any serious error occurr  

                    }  

                }  

}while (sent < size);  

        }  

///   

/// 发送完成时处理函数  

///   

/// 与发送完成操作相关联的SocketAsyncEventArg对象  

private void ProcessSend(SocketAsyncEventArgs e)  

        {  

if (e.SocketError == SocketError.Success)  

            {  

                Socket s = (Socket)e.UserToken;  

//TODO  

            }  

else  

            {  

                CloseClientSocket(e);  

            }  

        }  

        #endregion  

        #region 接收数据  

///   

///接收完成时处理函数  

///   

/// 与接收完成操作相关联的SocketAsyncEventArg对象  

private void ProcessReceive(SocketAsyncEventArgs e)  

        {  

if (e.SocketError == SocketError.Success)//if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)  

            {  

// 检查远程主机是否关闭连接  

if (e.BytesTransferred > 0)  

                {  

                    Socket s = (Socket)e.UserToken;  

//判断所有需接收的数据是否已经完成  

if (s.Available == 0)  

                    {  

//从侦听者获取接收到的消息。   

//String received = Encoding.ASCII.GetString(e.Buffer, e.Offset, e.BytesTransferred);  

//echo the data received back to the client  

//e.SetBuffer(e.Offset, e.BytesTransferred);  

byte[] data = new byte[e.BytesTransferred];  

Array.Copy(e.Buffer, e.Offset, data, 0, data.Length);//从e.Buffer块中复制数据出来,保证它可重用  

string info=Encoding.Default.GetString(data);  

Log4Debug(String.Format("收到 {0} 数据为 {1}",s.RemoteEndPoint.ToString(),info));  

//TODO 处理数据  

//增加服务器接收的总字节数。  

                    }  

if (!s.ReceiveAsync(e))//为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发SocketAsyncEventArgs.Completed事件  

                    {  

//同步接收时处理接收完成事件  

                        ProcessReceive(e);  

                    }  

                }  

            }  

else  

            {  

                CloseClientSocket(e);  

            }  

        }  

        #endregion  

        #region 回调函数  

///   

/// 当Socket上的发送或接收请求被完成时,调用此函数  

///   

/// 激发事件的对象  

/// 与发送或接收完成操作相关联的SocketAsyncEventArg对象  

private void OnIOCompleted(object sender, SocketAsyncEventArgs e)  

        {  

// Determine which type of operation just completed and call the associated handler.  

switch (e.LastOperation)  

            {  

case SocketAsyncOperation.Accept:  

                    ProcessAccept(e);  

break;  

case SocketAsyncOperation.Receive:  

                    ProcessReceive(e);  

break;  

default:  

throw new ArgumentException("The last operation completed on the socket was not a receive or send");  

            }  

        }  

        #endregion  

        #region Close  

///   

/// 关闭socket连接  

///   

/// SocketAsyncEventArg associated with the completed send/receive operation.  

private void CloseClientSocket(SocketAsyncEventArgs e)  

        {  

Log4Debug(String.Format("客户 {0} 断开连接!",((Socket)e.UserToken).RemoteEndPoint.ToString()));  

Socket s = e.UserTokenas Socket;  

            CloseClientSocket(s, e);  

        }  

///   

/// 关闭socket连接  

///   

///   

///   

private void CloseClientSocket(Socket s, SocketAsyncEventArgs e)  

        {  

try  

            {  

                s.Shutdown(SocketShutdown.Send);  

            }  

catch (Exception)  

            {  

// Throw if client has closed, so it is not necessary to catch.  

            }  

finally  

            {  

                s.Close();  

            }  

Interlocked.Decrement(ref _clientCount);  

            _maxAcceptedClients.Release();  

_objectPool.Push(e);//SocketAsyncEventArg 对象被释放,压入可重用队列。  

        }  

        #endregion  

        #region Dispose  

///   

/// Performs application-defined tasks associated with freeing,   

/// releasing, or resetting unmanaged resources.  

///   

public void Dispose()  

        {  

Dispose(true);  

GC.SuppressFinalize(this);  

        }  

///   

/// Releases unmanaged and - optionally - managed resources  

///   

/// true to release   

/// both managed and unmanaged resources; false   

/// to release only unmanaged resources.  

protected virtual void Dispose(bool disposing)  

        {  

if (!this.disposed)  

            {  

if (disposing)  

                {  

try  

                    {  

                        Stop();  

if (_serverSock != null)  

                        {  

_serverSock =null;  

                        }  

                    }  

catch (SocketException ex)  

                    {  

//TODO 事件  

                    }  

                }  

disposed =true;  

            }  

        }  

        #endregion  

public void Log4Debug(string msg)  

        {  

Console.WriteLine("notice:"+msg);  

        }  

    }  

}

最新文章

  1. 如何消除移动端a标签点击时的蓝色底色以及a标签link、visited、hover、active的顺序
  2. 自制Unity小游戏TankHero-2D(5)声音+爆炸+场景切换+武器弹药
  3. 框架SpringMVC笔记系列 二 传值
  4. c#实现无标题栏窗口的拖动
  5. asp.net中json格式化及在js中解析json
  6. Mysql 主从复制,读写分离设置
  7. 通过Performance Log确定磁盘有性能问题?
  8. 《java入门第一季》之面向对象(修饰符的概念和总结)
  9. 集合之LinkedList(含JDK1.8源码分析)
  10. Dubbo 在maven项目中的应用
  11. javascript选项卡切换样式
  12. 微信小程序开发(1) 天气预报
  13. 如何在TextView类中创建超链接 Linkify
  14. SqlBulkCopy类(将一个表插入到数据库)
  15. 6月13 ThinkPHP框架基础
  16. hadoop完全分布式的安装
  17. 关于OpenVR
  18. 文档主题生成模型(LDA)
  19. Julia - 函数返回值
  20. rest_framework之访问频率控制

热门文章

  1. php入门(三)
  2. Python基础笔记系列八:字符串的运算和相关函数
  3. no crontab for root 解决方案
  4. [mybatis]Mapper XML 文件——statementType
  5. PIL中文文档
  6. 1-22-shell脚本的基础
  7. Vue实例的生命周期created和mounted的区别
  8. react 子组件改变父组件状态
  9. Project facet Java version 1.8 is not supported.
  10. iOS安全系列之 HTTPS