Client端代码:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import uuid
import time class FibonacciRpcClient(object):
def __init__(self):
#生成socket
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#生成管道
self.channel = self.connection.channel()
#声明一个随机queue,exclusive=True会在此queue的消费者断开后,自动将queue删除
result = self.channel.queue_declare(exclusive=True)
#获取随机queue名
self.callback_queue = result.method.queue
#定义收到消息后的动作
self.channel.basic_consume(self.on_response, #回调函数on_response
no_ack=True,
queue=self.callback_queue) #获取随机queue名 def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id: #判断uuid是否是否一致
self.response = body #队列返回 def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4()) #生成uuid,等会发送给服务端
#发送消息给服务端
self.channel.basic_publish(exchange='',
routing_key='rpc_queue', #路由键
properties=pika.BasicProperties(reply_to=self.callback_queue, #告诉服务端将返回发到哪个队列
correlation_id=self.corr_id),
body=str(n)) #发送的消息
while self.response is None:
self.connection.process_data_events() #非阻塞版的start_consuming(),如果收到消息就执行on_response回调函数
print("no msg....")
time.sleep(0.5) #这里可以执行其他命令
return int(self.response) #返回结果 #生成实例
fibonacci_rpc = FibonacciRpcClient() print("[x] Requesting fib(30)") #调用call函数
response = fibonacci_rpc.call(30) print("[x] got %r " % response)

server端代码:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
import time #生成socket
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#生成管道
channel = connection.channel()
#声明一个queue防止启动报错
channel.queue_declare(queue='rpc_queue') def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2) def on_request(ch, method, props, body):
n = int(body) print("[.] fib(%s)" % n)
response = fib(n) ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response)) ch.basic_ack(delivery_tag=method.delivery_tag) #回复确认消息 #处理完这条再发下一条
channel.basic_qos(prefetch_count=1)
#定义收到消息动作
channel.basic_consume(on_request,queue='rpc_queue') channel.start_consuming()

最新文章

  1. java 文件按行读写
  2. Doxygen给C程序生成注释文档
  3. Linux 下解压大全
  4. [Android Pro] DES加密 version1
  5. Yii源码阅读笔记(三)
  6. ruby 编写迭代器
  7. 怎样区分JQuery对象和Dom对象 常用的写法
  8. 恢复误删的procedure
  9. Android学习笔记(五)Fragment简介
  10. 面向对象程序设计-C++ Default constructor & Copy constructor& Destructor & Operator Overloading【第九次上课笔记】
  11. Vue keep-alive实践总结
  12. java小入门的感觉
  13. WPF项目学习.二
  14. redis3.0.7集群部署手册
  15. HDFS基本原理及数据存取实战
  16. Flutter 读写本地文件
  17. 解决 Visual Studio 点击添加引用无反应的问题
  18. 从零开始学 Web 系列教程
  19. 自动生成实体类和xml
  20. LINQ中in的实现方法-LINQ To Entities如何实现查询 select * from tableA where id in (1,2,3,4)

热门文章

  1. py---------网络编程
  2. dpkg dependency problems prevent configuration
  3. Abbreviation ---- hackerrank
  4. 高可用数据同步方案-SqlServer迁移Mysql实战
  5. 行高:line-height图文解析
  6. 【Windows】命令行查询占用端口信息
  7. PHPCMS的自增长标签
  8. 第6章 传输层(详解TCP的三次握手与四次挥手)
  9. Visual Studio 2010 vs2010 英文版 使用 已有的中文版 MSDN 帮助文档
  10. 飞塔NGFW-FortiGate-5.2(BYOL)