在工作中,使用uwsgi部署项目,其中uwsgi设置为多进程,并且python中使用了kafka-python模块作为生产者不断产生数据,但上线不久后几乎所有的生产者消息都报:KafkaTimeoutError这个错误,并且在kafka服务器中并没有发现收到任何消息。

于是看了看kafka-python源码,发现在执行send方法后,消息并没有立即发送,而是放到本地的缓存中,在生成KafkaProducer实例时,有个选项buffer_memory设置了缓存的大小,默认为32M,然后如果这个buffer满了就会报KafkaTimeoutError,所以初步判断两个原因:

  1 生产者消息并没有发送出去,

  2 或者消息发送相对于消息生成来说过于缓慢导致

同时又因为看到kafka服务器中并没有接收到任何消息,遂排除第二个原因。也就是说生产者消息没有发送出去。于是采用同样的配置用写了一个脚本发现kafka服务器可以接收到消息,鉴定是我的生产者有问题,遂谷歌解决问题,找到该帖子:https://github.com/dpkp/kafka-python/issues/721。发布人情况和我差不多,作者回复到:

You cannot share producer instances across processes, only threads. I expect that is why the master process pattern is failing.

Second, producer.send() is async but is not guaranteed to deliver if you close the producer abruptly. In your final example I suspect that your producer instances are so short-lived that they are being reaped before flushing all pending messages. To guarantee delivery (or exception) call producer.send().get(timeout) or producer.flush() . otherwise you'll need to figure out how to get a producer instance per-uwsgi-thread and have it shared across requests (you would still want to flush before thread shutdown to guarantee no messages are dropped)

大体上说明了两点:

  1 多进程共享同一个生产者实例有问题

  2 send方法是异步的,当执行完send后立即关闭生产者实例的话可能会导致发送失败。

第二点错误我没有犯,沾沾自喜,继续看评论:

Aha, thanks! After looking more closely at uWSGI options I discovered the lazy-apps option, which causes each worker to load the entire app itself. This seems to have resolved my issue.

提问者说他解决了该问题,于是查一查uwsgi中的lazy-apps,发现改文章:https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/TheArtOfGracefulReloading.html#preforking-vs-lazy-apps-vs-lazy,其中说到:

默认情况下,uWSGI在第一个进程中加载整个应用,然后在加载完应用之后,会多次 fork() 自己。

我看看了我自己的代码我确实是在app生成之前生成了生产者实例,这就导致该实例被父进程与其子进程共享。问题终于明白,开始解决:

  1 使用lazy-apps,这样就可以了。

  2 不使用lazy-apps,在代码层面解决问题: 

# producer.py文件
import json
from kafka import KafkaProducer class Single(object):
"""单例模式"""
def __new__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
cls._instance = super().__new__(cls)
if hasattr(cls, "initialize"):
cls._instance.initialize(*args, **kwargs)
return cls._instance class MsgQueue(Single):
"""
这个整成单例模式是因为:uwsgi配合kafka-python在多进程下会有问题,这里希望每个进程单独享有一个kafka producer实例,
也就是说当初始化app对象后,并不会生成producer实例,而是在运行时再生成,
具体参考:https://github.com/dpkp/kafka-python/issues/721
"""
app = None def initialize(self):
self.producer = KafkaProducer(bootstrap_servers=self.app.config["MQ_URI"],
api_version=self.app.config["KAFKA_API_VERSION"]) @classmethod
def init_app(cls, app):
cls.app = app def send(self, topic, data):
"""
:param topic:
:param data:
:return:
"""
data = json.dumps(data, ensure_ascii=True)
self.producer.send(topic, data.encode()) # app.py文件
from producer import MsgQueue
...
MsgQueue.init_app(app) # 业务逻辑中用到生产者的文件
from producer import MsgQueue
...
MsgQueue().send(msg)

最新文章

  1. webpack 使用教程--实时刷新测试
  2. Jmeter学习(一)
  3. VUE 入门基础(4)
  4. JSP开发模式2_JSP/Servlet/JavaBean(简单注册功能)
  5. 【BZOJ】1100: [POI2007]对称轴osi
  6. 剑指offer系列60---第一个只出现一次的字符
  7. [Everyday Mathematics]20150121
  8. [转载]C#实现POST提交方式
  9. 模拟接听电话的方法,兼容华为android5.0以上设备
  10. boost::xml——基本操作以及中文乱码解决方案 (续)
  11. Request.QueryString 不能像使用方法那样使用不可调用
  12. java实现的Trie树数据结构
  13. Codeforces 442C Artem and Array(stack+贪婪)
  14. Angular React 和 Vue的比较
  15. python爬虫从入门到放弃前奏之学习方法
  16. ASP.NET MVC5+EF6+EasyUI 后台管理系统(85)-Quartz 作业调度用法详解二
  17. LeetCode算法题-Fibonacci Number(Java实现)
  18. [物理学与PDEs]第2章习题3 Laplace 方程的 Neumann 问题
  19. Elasticsearch .net client NEST 空字符/null值查询
  20. Confluence 6 配置 Windows 服务

热门文章

  1. “Linux内核分析”第五周报告
  2. 我的github地址 https://github.com/1010de/Test.git
  3. pandas修改全列的时间格式 无需使用apply
  4. iOS 模拟器“安装”app
  5. PRML读书笔记_绪论
  6. memcache 分布式缓存
  7. 暂时刷完leetcode的一点小体会
  8. 软件开的目录规范+sys,os,time模块
  9. jmeter 创建web测试计划
  10. 2.18比赛(T2,T3留坑)