EventManager.py

 # -*- encoding: UTF-8 -*-

 # 系统模块
from queue import Queue, Empty
from threading import * class EventManager:
def __init__(self):
"""初始化事件管理器"""
# 事件对象列表
self.__eventQueue = Queue()
# 事件管理器开关
self.__active = False
# 事件处理线程
self.__thread = Thread(target = self.__Run) # 这里的__handlers是一个字典,用来保存对应的事件的响应函数
# 其中每个键对应的值是一个列表,列表中保存了对该事件监听的响应函数,一对多
self.__handlers = {} # {事件类型:[处理事件的方法]} def __Run(self):
"""引擎运行"""
while self.__active == True:
try:
# 获取事件的阻塞时间设为1秒
event = self.__eventQueue.get(block = True, timeout = 1)
self.__EventProcess(event)
except Empty:
pass def __EventProcess(self, event):
"""处理事件"""
# 检查是否存在对该事件进行监听的处理函数
if event.type_ in self.__handlers:
# 若存在,则按顺序将事件传递给处理函数执行
for handler in self.__handlers[event.type_]:
handler(event) def Start(self):
"""启动"""
# 将事件管理器设为启动
self.__active = True
# 启动事件处理线程
self.__thread.start() def Stop(self):
"""停止"""
# 将事件管理器设为停止
self.__active = False
# 等待事件处理线程退出
self.__thread.join() def AddEventListener(self, type_, handler):
"""绑定事件和监听器处理函数"""
# 尝试获取该事件类型对应的处理函数列表,若无则创建
try:
handlerList = self.__handlers[type_]
except KeyError:
handlerList = [] self.__handlers[type_] = handlerList
# 若要注册的处理器不在该事件的处理器列表中,则注册该事件
if handler not in handlerList:
handlerList.append(handler) def RemoveEventListener(self, type_, handler):
"""移除监听器的处理函数"""
#读者自己试着实现 def SendEvent(self, event):
"""发送事件,向事件队列中存入事件"""
self.__eventQueue.put(event) """事件对象"""
class Event:
def __init__(self, type_=None):
self.type_ = type_ # 事件类型
self.dict = {} # 字典用于保存具体的事件数据

test.py

 # -*- encoding: UTF-8 -*-

 from threading import *
from EventManager import *
import time #事件名称 新文章
EVENT_ARTICAL = "Event_Artical" #事件源 公众号
class PublicAccounts:
def __init__(self,eventManager):
self.__eventManager = eventManager def WriteNewArtical(self):
#事件对象,写了新文章
event = Event(type_=EVENT_ARTICAL)
event.dict["artical"] = u'如何写出更优雅的代码\n'
#发送事件
self.__eventManager.SendEvent(event)
print(u'公众号发送新文章') #监听器 订阅者
class Listener:
def __init__(self,username):
self.__username = username #监听器的处理函数 读文章
def ReadArtical(self,event):
print(u'%s 收到新文章' % self.__username)
print(u'正在阅读新文章内容:%s' % event.dict["artical"]) """测试函数"""
def test():
listner1 = Listener("thinkroom") #订阅者1
listner2 = Listener("steve")#订阅者2 eventManager = EventManager() #绑定事件和监听器响应函数(新文章)
eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
eventManager.Start() publicAcc = PublicAccounts(eventManager)
while True:
publicAcc.WriteNewArtical()
time.sleep(2) if __name__ == '__main__':
test()

最新文章

  1. javascript中的原型和继承
  2. 简单的 MessageBox
  3. 【温故而知新-Javascript】使用拖放
  4. asp.net MVC 源码分析
  5. 手动设定实例变量的KVO实现监听
  6. Sql server中左连接语句
  7. StringBuilder与StringBuffer的区别(转)
  8. Libpcap
  9. visual studio 配置OpenGL环境
  10. unity 调用android函数
  11. redis34--string 操作
  12. jetty服务器原理与maven集成
  13. 【转载】网站遭遇DDoS攻击怎么办
  14. Dubbo -- 系统学习 笔记 -- 示例 -- 参数验证
  15. Laravel渴求式加载(比较容易理解理解load与with关系)
  16. Intervals POJ - 3680 (MCMF)
  17. 用Python读写Excel文件的方式比较
  18. redis知识积累
  19. Xamarin iOS教程之页面控件
  20. 【Hibernate】Hibernate3.x独立执行时的Failed to load class "org.slf4j.impl.StaticLoggerBinder"错误

热门文章

  1. Java for LeetCode 095 Unique Binary Search Trees II
  2. Java中String的设计
  3. Zookeeper四字命令
  4. Swift - 修改导航栏的样式(文字颜色,背景颜色,背景图片)
  5. 用ant编译打包时 警告:编码 GBK 的不可映射字符
  6. SpringBoot2.0之整合Dubbo
  7. python绘制圆和椭圆
  8. CI框架上传csv文件
  9. physoft.net网站暂停 www.physoft.cn 正式开通 (菲烁科技, physoft)
  10. RabbitMQ消息队列随笔