using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace AA
{
    public class AsynQueue<T>
    {
        //队列是否正在处理数据
        private int isProcessing;
        //有线程正在处理数据
        private const int Processing = 1;
        //没有线程处理数据
        private const int UnProcessing = 0;
        //队列是否可用
        private volatile bool enabled = true;
        private Task currentTask;
        public event Action<T> ProcessItemFunction;
        public event EventHandler<EventArgs<Exception>> ProcessException;
        private ConcurrentQueue<T> queue;

public AsynQueue()
        {
            queue = new ConcurrentQueue<T>();
            Start();
        }

public int Count
        {
            get
            {
                return queue.Count;
            }
        }

private void Start()
        {
            Thread process_Thread = new Thread(PorcessItem);
            process_Thread.IsBackground = true;
            process_Thread.Start();
        }

public void Enqueue(T items)
        {
            if (items == null)
            {
                throw new ArgumentException("items");
            }

queue.Enqueue(items);
            DataAdded();
        }

//数据添加完成后通知消费者线程处理
        private void DataAdded()
        {
            if (enabled)
            {
                if (!IsProcessingItem())
                {
                    currentTask = Task.Factory.StartNew(ProcessItemLoop);
                }
            }
        }

//判断是否队列有线程正在处理
        private bool IsProcessingItem()
        {
            return !(Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0);
        }

private void ProcessItemLoop()
        {

if (!enabled && queue.IsEmpty)
            {
                Interlocked.Exchange(ref isProcessing, 0);
                return;
            }
            T publishFrame;

if (queue.TryDequeue(out publishFrame))
            {

try
                {
                    ProcessItemFunction(publishFrame);
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }

if (enabled && !queue.IsEmpty)
            {
                currentTask = Task.Factory.StartNew(ProcessItemLoop);
            }
            else
            {
                Interlocked.Exchange(ref isProcessing, UnProcessing);
            }
        }

/// <summary>
        ///定时处理线程调用函数 
        ///主要是监视入队的时候线程 没有来的及处理的情况
        /// </summary>
        private void PorcessItem(object state)
        {
            int sleepCount = 0;
            int sleepTime = 1000;
            while (enabled)
            {
                //如果队列为空则根据循环的次数确定睡眠的时间
                if (queue.IsEmpty)
                {
                    if (sleepCount == 0)
                    {
                        sleepTime = 1000;
                    }
                    else if (sleepCount <= 3)
                    {
                        sleepTime = 1000 * 3;
                    }
                    else
                    {
                        sleepTime = 1000 * 50;
                    }
                    sleepCount++;
                    Thread.Sleep(sleepTime);
                }
                else
                {
                    //判断是否队列有线程正在处理
                    if (enabled && Interlocked.CompareExchange(ref isProcessing, Processing, UnProcessing) == 0)
                    {
                        if (!queue.IsEmpty)
                        {
                            currentTask = Task.Factory.StartNew(ProcessItemLoop);
                        }
                        else
                        {
                            Interlocked.Exchange(ref isProcessing, 0);
                        }
                        sleepCount = 0;
                        sleepTime = 1000;
                    }
                }
            }
        }

public void Flsuh()
        {
            Stop();

if (currentTask != null)
            {
                currentTask.Wait();
            }

while (!queue.IsEmpty)
            {
                try
                {
                    T publishFrame;
                    if (queue.TryDequeue(out publishFrame))
                    {
                        ProcessItemFunction(publishFrame);
                    }
                }
                catch (Exception ex)
                {
                    OnProcessException(ex);
                }
            }
            currentTask = null;
        }

public void Stop()
        {
            this.enabled = false;
        }

private void OnProcessException(System.Exception ex)
        {
            var tempException = ProcessException;
            Interlocked.CompareExchange(ref ProcessException, null, null);

if (tempException != null)
            {
                ProcessException(ex, new EventArgs<Exception>(ex));
            }
        }

[Serializable]
        public class EventArgs<T> : System.EventArgs
        {

public T Argument;

public EventArgs() : this(default(T))
            {
            }

public EventArgs(T argument)
            {
                Argument = argument;
            }
        }
    }
}

最新文章

  1. Model-View-ViewModel for iOS [译]
  2. eclipse下项目死活不编译
  3. sql语句格式化数字(前面补0)、替换字符串
  4. 错误描述:请求“System.Data.SqlClient.SqlClientPermission, System.Data, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089”类型的权限已失败
  5. 每天一个linux命令--退出&lt;符号
  6. 1920.154s 0.309s 30817
  7. ECSHOP 用户中心 我的订单前台显视订单每张商品图片及收货人
  8. ios中autolayout
  9. PHP 设计模式 笔记与总结(9)数据对象映射模式
  10. WPF学习笔记-如何按ESC关闭窗口
  11. C语言的本质(37)——makefile之隐含规则和模式规则
  12. sphinx分域搜索【不】需要在conf文件中使用sql_field_string
  13. greedy算法(python版)
  14. NOIP2000普及组 T1计算器的改良
  15. 关于Android中使用BottomNavigationView切换横屏导致返回主页的问题
  16. struts2之配置文件struts.xml详解
  17. 小程序 切换到tabBar页面不刷新问题
  18. Tomcat7注册为Linux服务
  19. BZOJ.1758.[WC2010]重建计划(分数规划 点分治 单调队列/长链剖分 线段树)
  20. ios多播委托

热门文章

  1. C#解决关闭多线程的form主窗体时抛出ObjectDisposedException 异常
  2. 【hdu 3478】Catch
  3. 【JavaScript】--JavaScript总结一览无余
  4. Android网络框架OkHttp之get请求(源码初识)
  5. C# List 复制克隆副本
  6. Altium Designer如何改两个原件之间的安全距离
  7. 编程——C语言的问题,堆栈
  8. (转)bat批处理的注释语句
  9. ThreadLocal深入理解与内存泄露分析
  10. 对Linux下常用头文件总结