

  1 生产者消息并没有发送出去,

  2 或者消息发送相对于消息生成来说过于缓慢导致


You cannot share producer instances across processes, only threads. I expect that is why the master process pattern is failing.

Second, producer.send() is async but is not guaranteed to deliver if you close the producer abruptly. In your final example I suspect that your producer instances are so short-lived that they are being reaped before flushing all pending messages. To guarantee delivery (or exception) call producer.send().get(timeout) or producer.flush() . otherwise you'll need to figure out how to get a producer instance per-uwsgi-thread and have it shared across requests (you would still want to flush before thread shutdown to guarantee no messages are dropped)


  1 多进程共享同一个生产者实例有问题

  2 send方法是异步的,当执行完send后立即关闭生产者实例的话可能会导致发送失败。


Aha, thanks! After looking more closely at uWSGI options I discovered the lazy-apps option, which causes each worker to load the entire app itself. This seems to have resolved my issue.


默认情况下,uWSGI在第一个进程中加载整个应用,然后在加载完应用之后,会多次 fork() 自己。


  1 使用lazy-apps,这样就可以了。

  2 不使用lazy-apps,在代码层面解决问题: 

# producer.py文件
import json
from kafka import KafkaProducer class Single(object):
def __new__(cls, *args, **kwargs):
if not hasattr(cls, "_instance"):
cls._instance = super().__new__(cls)
if hasattr(cls, "initialize"):
cls._instance.initialize(*args, **kwargs)
return cls._instance class MsgQueue(Single):
这个整成单例模式是因为:uwsgi配合kafka-python在多进程下会有问题,这里希望每个进程单独享有一个kafka producer实例,
app = None def initialize(self):
self.producer = KafkaProducer(bootstrap_servers=self.app.config["MQ_URI"],
api_version=self.app.config["KAFKA_API_VERSION"]) @classmethod
def init_app(cls, app):
cls.app = app def send(self, topic, data):
:param topic:
:param data:
data = json.dumps(data, ensure_ascii=True)
self.producer.send(topic, data.encode()) # app.py文件
from producer import MsgQueue
MsgQueue.init_app(app) # 业务逻辑中用到生产者的文件
from producer import MsgQueue


