python基础篇 26-redis操作
2024-10-22 04:59:29
redis的基本操作:
redis_conf ={
'host':'192.168.64.128',
'password':'Aa123456',
'db':'0',
'port':6379,
'decode_responses':True # 自动将bytes转换为 string 类型
}
r = redis.Redis(**redis_conf) # string 入参只能为字符串
# key-value形式 增加key 修改
r.set('矿泉水','{"price":111,"count":11}') # 根据key查询
data1 = r.get('矿泉水')
print(type(data1),data1) # 根据key删除
r.delete('矿泉水') # 删除key-value # 设置失效时间
expire_time = 5
r.set('tl_session','asdfdfd',expire_time) # hash 类型
# {"kqs":{xxx},"笔":{xxx}}
# 新增 修改
r.hset('students','fd','{"money":19999}')
r.hset('students','ds','{"money":19111999}')
r.hset('students','lhy','{"money":19111999}')
r.hset('students','ljj','{"money":19111999}') # 查询
print(r.hget('students','ds'))
d = r.hgetall('students')
print(d)
# 如果没有decode_responses=True 则 需要 每次decode
print({k.decode():v.decode() for k,v in d.items()}) # 删除
r.hdel('students','ds') # 删除某个小key
r.delete('students') # 删除大key # 过期时间设置
r.expire('students',10)#指定某个key的过期时间 只能对大key设置 小key不能设置 # 从redis获取数据 需要知道key是什么 和 key的类型是什么 # 获取当前数据库中 的 所有keys
print(r.keys()) # 获取当前数据库中 s开头的keys
print(r.keys('s*')) # 查看key的类型
print(r.type('stu'))
print(r.type('students')) #清空当前数据库里面所有的key
print(r.flushdb()) #清空所有数据库里面所有的key
print(r.flushall()) # 管道,批量操作
p = r.pipeline()#建立管道
p.set('xiaoming','shabi')
p.get('xiaoming')
p.execute() # 批量执行
p.exists('students123') # 判断key是否存在
s = p.execute()
print(s)
print(r.exists('students')) # 直接返回结果 p.hgetall('students')
p.hset('students','fd','{"money":19999}')
p.hset('students','ds','{"money":19111999}')
p.hset('students','lhy','{"money":19111999}')
p.hset('students','ljj','{"money":19111999}')
s = p.execute() #执行,返回一个list,这个list里面是每个命令执行的结果
print(s) # 管道批量执行 和 单个执行的速度快慢比较
start_time = time.time()
for i in range(100):
r.set(f"key{i}",f"{i}")
print('不用管道的时间',time.time() - start_time) start_time = time.time()
p = r.pipeline()
for i in range(100):
p.set(f"pipline_key{i}", f"{i}")
p.execute()
print('用管道的时间',time.time() - start_time) # string类型 展示带文件夹的
r.set('product:kqs','{"count":1,"price":5555}')
r.set('product:apple','{"count":1,"price":5555}')
r.set('product:banana','{"count":1,"price":5555}') #a服务器 -》迁移
#b服务器,redis r = redis.Redis(host='192.168.64.128',
password='Aa123456',
port=6379,
db=0,
decode_responses=True
)
r2 = redis.Redis(host='192.168.64.128',
password='Aa123456',
port=6379,
db=15,
decode_responses=True
) p = r2.pipeline() for k in r.keys():
key_type = r.type(k)
if key_type == 'string':
value = r.get(k)
p.set(k,value)
elif key_type=='hash':
hash_data = r.hgetall(k) #{'xx':xxx}
for field,data in hash_data.items():
p.hset(k,field,data) p.execute() #bytes #字节类型 b'{"price":111,"count":11}'
data = r.get('矿泉水').decode() #bytes类型 转换为 string类型
s = '你好啊'
print(s.encode()) #string类型 转换为 bytes类型
封装的基本工具:
import hashlib import pymysql,traceback,xlwt
import redis redis_conf ={
'host':'192.168.64.128',
'password':'Aa123456',
'db':'0',
'port':6379,
'decode_responses':True # 自动将bytes转换为 string 类型
}
MYSQL_INFO = {
'host':'192.168.64.128',
'user':'root',
'password':'Aa123456@',
'db':'besttest_db',
'port':3306,
'charset':'utf8',
'autocommit':True
}
def execute_sql(sql,only=False):
conn = pymysql.connect(**MYSQL_INFO)
cur = conn.cursor(pymysql.cursors.DictCursor)
try:
cur.execute(sql)
except:
print("sql不正确:",traceback.format_exc())
else:
if only:
return cur.fetchone()
return cur.fetchall()
finally:
cur.close()
conn.close()
def write_excel(name,data):
book = xlwt.Workbook()
sheet = book.add_sheet('sheet1')
for index, key in enumerate(data[0]): # 写表头
sheet.write(0, index, key)
for row, item in enumerate(data, 1): # 写数据
for col, value in enumerate(item.values()):
sheet.write(row, col, value) book.save(f"{name}.xls") def my_md5(s):
m = hashlib.md5(str(s).encode())
return m.hexdigest() class Ope_Redis:
def __init__(self,redis_conf):
self.r = redis.Redis(**redis_conf)
self.p = self.r.pipeline() # 建立管道
def set_key(self,key,value,ttl):
self.p.set(key,value,ttl)
def get_key(self,key):
return self.r.get(key)
def set_hash_key(self,field,key,value):
self.p.hset(field,key,value)
def get_hash_key(self,field=None,key=None):
if key is not None and field is not None:
return self.r.hget(field,key)
elif key is None and field is not None:
return self.r.hgetall(field)
def p_execute(self):
self.p.execute() def redis_str(key,value=False,expire_time=None):
r = redis.Redis(**redis_conf)
if value:
r.set(key,value,expire_time)
else:
return r.get(key) def redis_hash():
pass def check_session(session_id):
result = redis_str(session_id)
return result if result else False
最新文章
- android之ViewPager的使用
- jQuery之Deferred对象的使用
- Dynamics AX 4.0 多表looup
- 查看linux命令类型
- 论前端css初始化的重要性
- Tcl语言笔记之一
- Array of Objects
- Contest - 多校训练(985专场) Problem C: 985的方格难题
- [iOS11] contentInsetAdjustmentBehavior 问题, push back时, 界面会上下移动.
- java多线程系列12 ConcurrentHashMap CopyOnWriteArrayList 简介
- Libgdx学习记录28——创建Desktop程序
- vue之导入Bootstrap以及jQuery的两种方式
- libcurl 错误码总结
- Spring boot整合Mongodb
- 23、redis如何实现主从复制?以及数据同步机制?
- 左连接、右连接、内连接和where
- 使用mysqldump迁移数据
- angular2.x 多选框事件
- 【Python】urlopen小结
- Java使用imageio、awt生成图片验证码
热门文章
- ASP.NET中maxRequestLength和maxAllowedContentLength的区别;上传大文件设置IIS7文件上传的最大大小
- 浏览器是如何区分http和https协议的
- linux环境";ModuleNotFoundError: No module named 'Cryptodome'";
- ElasticSearch入门学习笔记
- 查找大文件-清理linux磁盘
- Winfrom ComboBox中的性能探索
- uniapp开发的app打开微信小程序
- 关于新手在使用git过程中的基本问题--前端开发篇
- 三本书带您快速深入掌握Spring Boot应用开发《Spring Boot从零开始学(视频教学版)》
- maven加载本地的jar包