基于Python实现环形队列高效定时器
2024-09-03 09:39:48
定时器Python实现代码
import time
import redis
import multiprocessing class Base: """
redis配置
"""
redis_conf = {} """
环形队列使用redis进行存储
"""
_ri = None """
定时器轮盘大小
"""
slot_num = 15 """
存储环形队列使用的redis缓存key
"""
cache_key = 'wheel:slot_' def __init__(self, **kwargs):
for k in kwargs:
if hasattr(self, k):
setattr(self, k, kwargs[k]) self._ri = redis.Redis(**self.redis_conf) class Timer(Base):
"""
当前slot的下标
"""
_current = 0 """
事件处理
"""
event_handler = None def worker(self):
"""
# TODO 测试每个卡槽有1W事件ID的处理效率
独立进程,分发事件id给事件处理器
:return:
"""
key = self.cache_key + str(self._current) # 获取当前卡槽中需要触发的事件ID
event_ids = self._ri.zrangebyscore(key, 0, 0) # 删除当前卡槽中需要触发的事件ID
self._ri.zremrangebyscore(key, 0, 0) # 把当前卡槽剩下的事件ID全部遍历出来,减少一次剩余循环次数
surplus_event_ids = self._ri.zrange(key, 0, -1) for mid in surplus_event_ids:
self._ri.zincrby(key, mid, -1) # 把事件ID转交给handler处理
for mid in event_ids:
self.event_handler(eid=mid) exit(0) def run(self):
"""
启动进程
:return:
"""
while True:
p = multiprocessing.Process(target=self.worker)
p.start() time.sleep(1) self._current = int(time.time()) % self.slot_num class TimerEvent(Base): def add(self, event_id, emit_time):
"""
添加事件ID到定时器
:param event_id: 事件ID
:param emit_time: 触发时间
:return:
"""
current_time = int(time.time())
diff = emit_time - current_time if diff > 0:
# 计算循环次数
cycle = int(diff / self.slot_num)
# 计算要存入的slot的索引
index = (diff % self.slot_num + current_time % self.slot_num) % self.slot_num res = self._ri.zadd(self.cache_key + str(index), str(event_id), cycle)
return True if res else False return False # TODO 批量添加同一时间,不同事件ID # TODO 批量添加不同时间,不同事件ID
通过环形队列实现高效任务触发的设计说明
- redis集合【slot】
- 以redis多个有规律的键名的有序集合组成环形数组
key_1
key_2
....
key_n
- 有序集合
命令
ZADD key score member
有序集合中包含两部分, 一个是score, 一个是member score作为剩余循环次数
meber作为事件ID
- python多进程
计算当前时间应该处理的卡槽
当前slot索引 = (当前时间 % 卡槽总数 + 当前时间戳 % 卡槽总数) % 卡槽总数
"%"为取余数操作
创建独立子进程处理
当前子进程需要快速读取的剩余循环次数为0事件ID
删除当前slot已取出的事件ID
开始把事件ID依次转交给事件handler处理
应用说明
- 启动定时器
import Timer
import time def event_handler(eid):
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), eid) t = Timer(redis_conf={
'host': '127.0.0.1',
'port': 6379,
'password': '123456',
'db': 0
}, event_handler=event_handler) times = int(time.time()) print('Current Time is ' + time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times))) t.run()
- 添加需要延时触发事件ID
import TimerEvent
import time te = TimerEvent(redis_conf={
'host': '127.0.0.1',
'port': 6379,
'password': '123456',
'db': 0
}) times = int(time.time()) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(times))) after_seconds_alert = 20 for x in range(100):
te.add(x, times + after_seconds_alert + x) print('Firs Emit will happened at ' + time.strftime(
'Start:%Y-%m-%d %H:%M:%S',
time.localtime(times + after_seconds_alert))
)
本文的文字及图片来源于网络,仅供学习、交流使用,不具有任何商业用途,如有问题请及时联系我们以作处理
想要获取更多Python学习资料可以加
QQ:2955637827私聊
或加Q群630390733
大家一起来学习讨论吧!
最新文章
- 标准C++中的string类的用法总结
- FastDFS分布式文件系统安装与使用(单节点)
- Android——进度对话框
- 编码,加解密,签名,Hash
- jQuery封装的表单验证,模仿网易或者腾讯登录的风格
- input解决浏览器记住密码问题
- MySql获取所有表名
- BZOJ 2969: 矩形粉刷(期望)
- 巨高兴,偶的文章 “如何在服务器上配置ODBC来访问本机DB2for Windows服务器”被推荐至CSDN博客首页
- Lambda Expression
- MongoDB:索引操作
- 三:使用docker-machine安装虚拟机上的docker
- 关于怎么解决java.lang.NoClassDefFoundError错误
- js 控制不同客户端 访问不同CSS js
- FastDFS图片服务器
- 编写高质量代码改善C#程序的157个建议——建议107:区分静态类和单例
- 模仿jq里的选择器和color样式
- JDK1.7配置及测试
- 入职第一个项目bug总结-2018/07/28
- react-slick无法显示预期效果问题