redis发布和订阅

发布者一旦发送消息,那么所有订阅者都会收到。

RedisHelper


#!/usr/bin/env python
#-*- coding:utf-8 -*-
import redis
class redishelper:
def __init__(self):
self.__conn = redis.Redis(host='192.168.11.87')
def public(self, msg, chan):
self.__conn.publish(chan, msg)
return msg,chan
def subscribe(self, chan):
pub = self.__conn.pubsub()
pub.subscribe(chan)
pub.parse_response()
return pub

发布者
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import b1
obj = b1.redishelper() #实例化方法
obj.public('aaaaaa','fm111.7') #执行发布
订阅者
obj = b1.redishelper()           #实例化方法
data = obj.subscribe('fm111.7') #调用订阅方法
while True:
msg = data.parse_response() #接收发布消息
print(msg)

RabbitMQ

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

安装


#安装配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
#安装erlang
$ yum -y install erlang
#安装RabbitMQ
$ yum -y install rabbitmq-server
#启动、关闭服务
service rabbitmq-server start/stop
API
python -m pip install pika 1.基于RabbitMQ实现生产者消费者模型。对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者代码

#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika # ######################### 发消息 #########################
#连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.158'))
#创建频道
channel = connection.channel() #如果没有这个队列会创建一个
channel.queue_declare(queue='hello') #向队列插入数值 routing_key是队列名 body是要插入的内容
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!') print(" [x] Sent 'Hello World!'") #关闭连接
connection.close()

消费者代码


#!/usr/bin/env python
#-*- coding:utf-8 -*-
import pika # ##########################取消息 ########################## #连接rabbitmq服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.158'))
#创建频道
channel = connection.channel() #如果生产者没有运行创建队列,那么消费者创建队列
channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body) channel.basic_consume(callback,
queue='hello',
no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

当生产者生成一条数据,被消费者接收,消费者中断后如果不超过10秒,连接的时候数据还在。当超过10秒之后,重新链接,数据将消失。消费者等待链接。

2.消息不丢失(数据持久化)

1.当把no_ack=false时,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,RabbitMQ会重新将该任务添加到队列中。

2.durable

生产者代码

#!/usr/bin/env python
import pika
#链接rabbit服务器
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#创建频道
channel = connection.channel()
#创建队列,使用durable方法
channel.queue_declare(queue='hello', durable=True)
#如果想让队列实现持久化那么加上durable=True
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode=2,
#标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2
#这样必须设置,让消息实现持久化
))
#这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的队列中。
print(" [x] 开始队列'")
connection.close()

消费者代码


#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#创建频道
channel = connection.channel()
#创建队列,使用durable方法
channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print('ok')
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] 等待队列. To exit press CTRL+C')
channel.start_consuming()

注:标记消息为持久化的并不能完全保证消息不会丢失,尽管告诉RabbitMQ保存消息到磁盘,当RabbitMQ接收到消息还没有保存的时候仍然有一个 短暂的时间窗口. RabbitMQ不会对每个消息都执行同步fsync(2) --- 可能只是保存到缓存cache还没有写入到磁盘中,这个持久化保证不是很强,但这比我们简单的任务queue要好很多,如果你想很强的保证你可以使用 publisher confirms

3.消息获取顺序

默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。

channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列

消费者代码

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel() # make message persistent
channel.queue_declare(queue='hello') def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
import time
time.sleep(10)
print 'ok'
ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) #表示谁来谁取,不再按照奇偶排列 channel.basic_consume(callback,
queue='hello',
no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

发布与订阅

发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

发布者发送的消息实际是先发送到exchange,然后由exchange发送到相对应的队列。

exchange类型可用: direct , topic , headers 和 fanout 。

exchange type = fanout

发布者

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
routing_key='',
body=message)
print(" [x] Sent %r" % message)
connection.close()

订阅者

#!/usr/bin/env python
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='logs',
type='fanout') result = channel.queue_declare(exclusive=True) #队列断开后自动删除临时队列
queue_name = result.method.queue # 队列名采用服务端分配的临时队列 channel.queue_bind(exchange='logs',
queue=queue_name) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r" % body) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

关键字发送( exchange type = direct)

之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

生产者

#!/usr/bin/env python
#-*- coding:utf-8 -*-
##########发送消息 import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') severity = 'info'
message = 'Hello World!'
channel.basic_publish(exchange='direct_logs',
routing_key=severity,
body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
消费者
#!/usr/bin/env python
#-*-coding=utf-8-*-
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.11.87'))
channel = connection.channel() channel.exchange_declare(exchange='direct_logs',
type='direct') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue severities = ['error','warning','info']
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1) for severity in severities:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()
模糊匹配( exchange type = topic)

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

#表示可以匹配0个或多个单词
*表示只能匹配一个单词 发送者路由值 队列中 old.boy.python old.* -- 不匹配
old.boy.python old.# -- 匹配

生产者

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

消费者

#!/usr/bin/env python
import pika
import sys connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel() channel.exchange_declare(exchange='topic_logs',
type='topic') result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1) for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback,
queue=queue_name,
no_ack=True) channel.start_consuming()

SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

schema/type:定义的一种映射格式,把表映射成类。

sql expression language: 封装了增删改查的sql语句

engine:  引擎

connection pooling: 连接池

dialect:  用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作

MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

示例,中间状态 演示一个过程

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine #初始化数据库连接
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) # 执行SQL
# cur = engine.execute(
# "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)"
# ) # 新插入行自增ID
# cur.lastrowid # 执行SQL
# cur = engine.execute(
# "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),]
# ) # 执行SQL
# cur = engine.execute(
# "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
# host='1.1.1.99', color_id=3
# ) # 执行SQL
# cur = engine.execute('select * from hosts')
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

增删改查

#!/usr/bin/env python
# -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
) color = Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name)
conn.execute(user.insert(),{'id':7,'name':'seven'})
conn.close() # sql = user.insert().values(id=123, name='wu')
# conn.execute(sql)
# conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name)
# sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ])
# sql = select([user.c.id, ])
# sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
# sql = select([user.c.name]).order_by(user.c.name)
# sql = select([user]).group_by(user.c.name) # result = conn.execute(sql)
# print result.fetchall()
# conn.close()

完整示例

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import sessionmaker Base = declarative_base() #生成一个SqlORM 基类 engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False) class Host(Base):
__tablename__ = 'hosts'
id = Column(Integer,primary_key=True,autoincrement=True)
hostname = Column(String(64),unique=True,nullable=False)
ip_addr = Column(String(128),unique=True,nullable=False)
port = Column(Integer,default=22) Base.metadata.create_all(engine) #创建所有表结构 if __name__ == '__main__':
SessionCls = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例
session = SessionCls()
#h1 = Host(hostname='localhost',ip_addr='127.0.0.1')
#h2 = Host(hostname='ubuntu',ip_addr='192.168.2.243',port=20000)
#h3 = Host(hostname='ubuntu2',ip_addr='192.168.2.244',port=20000)
#session.add(h3)
#session.add_all( [h1,h2])
#h2.hostname = 'ubuntu_test' #只要没提交,此时修改也没问题
#session.rollback()
#session.commit() #提交
res = session.query(Host).filter(Host.hostname.in_(['ubuntu2','localhost'])).all()
print(res)
 

最新文章

  1. HTML5移动Web开发(六)——定义一个内容策略
  2. 关于/etc/hosts文件
  3. LCD1602写自定义字符的Verilog源码
  4. 转 修改oracle用户密码永不过期
  5. HDU1711-----Number Sequence-----裸的KMP
  6. 浅谈负载均衡之【tomcat分布式session共享】
  7. java.util.MissingResourceException解决策
  8. [LeetCode]题解(python):027-Remove Element
  9. python 批量删除mysql前缀相同的表
  10. 打开eclipse &quot;Initializing Java Tooling&quot;错误
  11. Django框架详细介绍---Form表单
  12. NE76003单片机调试DS18B20 步骤
  13. js——字符串处理
  14. PHP远程下载图片,微信头像存到本地,本地图片转base64
  15. Glide4 用法总结 MD
  16. 深度学习中交叉熵和KL散度和最大似然估计之间的关系
  17. java集合类图
  18. php交叉合并数组
  19. C++ pair(对组)用法
  20. Docker命令之 run

热门文章

  1. JDK8--02:为什么要使用lambda
  2. jvm字节码和类加载机制
  3. iview表单验证--数字必填+校验
  4. rhel7 rpmbuild 制作二进制程序安装包(.rpm) 简单示例
  5. Python-读取文件的大小
  6. URL编码转换函数:escape()、encodeURI()、encodeURIComponent()讲解
  7. C#几种单例模式
  8. 安装更强大更美观的zsh,配置oh my zsh及插件
  9. Typora上传图片设置
  10. python 追踪函数调用