目录

1、Redis基本概念

2、Redis的5种基本类型

3、Jedis整合redis操作

4、Springboot整合redis

5、Redis主从复制

5.1、概念

5.2、原理

6、开启主从复制

6.1、一主多仆

6.2、多主多仆分级备份

6.3、哨兵模式

6.3.1、哨兵模式实现

6.4、Redis集群

6.4.1、集群的概念

2.4.2、搭建一个集群

6.4.3、故障恢复

6.4.4、使用jedis操作集群

6.5、Redis应用问题

6.5.1、缓存穿透

6.5.2、缓存击穿

6.5.3、缓存雪崩

6.6、分布式锁

6.6.1、概述

6.6.2、redis实现分布式锁

7、redis6新特性

7.1、在redis中添加新用户并设置操作权限

8、总结

9、参考资料


1、Redis基本概念

Redis是一个开源的key-value存储系统。Redis的应用场景为配合关系型数据库做高速缓存,降低数据库IO。Redis的五种基本类型:String(字符串),list(链表),set(集合),zset(有序集合),hash,stream(Redis5.0后的新数据结构)需要注意的是,Redis是单线程的,如果一次批量处理命令过多,会造成Redis阻塞或网络拥塞(传输数据量大)。

在redis中默认是有16个数据库,可以在redis配置文件中看到。可以通过修改redis的配置文件来修改redis中数据库的数量。redis中数据库都是通过序号进行标识的。

默认是使用0号数据库,可以通过select “编号” 切换redis数据库。

CONFIG GET databases  #获取当前redis中一共有多少个数据库

select xx  #切换数据库

2、Redis的5种基本类型

1、String

string 是Redis的最基本的数据类型,可以理解为与 Memcached 一模一样的类型,一个key 对应一个 value。string 类型是二进制安全的,意思是 Redis 的 string 可以包含任何数据,比如图片或者序列化的对象,一个 redis 中字符串 value 最多可以是 512M。

Incr:可以给key中的value值加1,如果key不存在则先初始化为0后再加1

Decr:可以给key中的value值减1,如果key不存在则先初始化为0后再减1

Incrby: 可以给key中的value值加指定的值,如果key不存在则先初始化为0后再加指定的值

典型使用场景

  一、计数

  由于Redis单线程的特点,我们不用考虑并发造成计数不准的问题,通过 incrby 命令,我们可以正确的得到我们想要的结果。

  二、限制次数

  比如登录次数校验,错误超过三次5分钟内就不让登录了,使用setex每次登录设置key自增一次,并设置该key的过期时间为5分钟后,每次登录检查一下该key的值来进行限制登录。

2、hash数据类型

hash 是一个键值对集合,是一个 string 类型的 key和 value 的映射表,key 还是key,但是value是一个键值对(key-value)。类比于 Java里面的 Map<String,Map<String,Object>> 集合。

用法:

典型使用场景

  查询的时间复杂度是O(1),用于缓存一些信息。

3、list数据类型

list 列表,它是简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表的头部(左边)或者尾部(右边),它的底层实际上是个链表。

列表有两个特点:1、有序。2、可重复

用法:

 

典型使用场景

  一、栈:通过命令 lpush+lpop

  二、队列:命令 lpush+rpop

  三、有限集合:命令 lpush+ltrim

  四、消息队列:命令 lpush+brpop

4、set数据类型

Redis 的 set 是 string 类型的无序集合。

相对于列表,集合也有两个特点:1、无序  2、不可重复

用法:

 

典型使用场景

利用集合的交并集特性,比如在社交领域,我们可以很方便的求出多个用户的共同好友,共同感兴趣的领域等。

5、zset数据类型

zset(sorted set 有序集合),和上面的set 数据类型一样,也是 string 类型元素的集合,但是它是有序的。

用法:

典型使用场景

  和set数据结构一样,zset也可以用于社交领域的相关业务,并且还可以利用zset 的有序特性,还可以做类似排行榜的业务。

6、stream(Redis5.0新数据结构)

redis stream主要用于消息队列(MQ,Message Queue),Redis本身是有一个Redis发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis宕机等,消息就会被丢弃。

3、Jedis整合redis操作

创建一个maven项目,在pom.xml中引入Jedis依赖

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
public class Jedis_RedisTest {
public static void main(String[] args) {
Jedis jedis = new Jedis("192.168.1.2", 6379);
//jedis.auth("123456"); //redis账户密码(默认账户默认是没有密码的
String rs = jedis.ping();
System.out.println("连接成功:" + rs);
jedis.close();
}

Jedis对redis常用数据类型的一些操作(Jedis对于redis中数据类型操作方法同直接在redis中对数据类型操作的方法是一样的)

//操作key String
@Test
public void demo1() {
Jedis jedis = new Jedis("192.168.1.2", 6379); //设置单个key-value
jedis.set("name", "张三");
String name = jedis.get("name");
System.out.println(name); //设置多个key-value
jedis.mset("k1", "v1", "k2", "v2", "k3", "v3");
List<String> mgetlist = jedis.mget("k1", "k2", "k3");
for (String ss : mgetlist) {
System.out.println(ss);
}
Set<String> keys = jedis.keys("*");//获取redis中所有的set
for (String s : keys) {
System.out.println(s);
}
jedis.del("name"); //删除key
jedis.setex("name",10,"李四");
} //操作list
@Test
public void demo2() {
Jedis jedis = new Jedis("192.168.1.2", 6379);
jedis.lpush("team1", "张三", "李四", "王五");
jedis.lpush("team2", "赵六", "刘七", "关八");
List<String> values = jedis.lrange("team1", 0, -1);
System.out.println(values.toString());
}

4、Springboot整合redis

代码下载地址:https://gitee.com/wulinchun/redis-learning.git     (dev分支)

创建一个maven项目,在pom.xml中引入redis相关依赖

 <!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency> <!-- spring2.X集成redis所需common-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>

application.propertiesp配置

#Redis服务器地址
spring.redis.host=192.168.1.2
#Redis服务器连接端口
spring.redis.port=6379
#Redis数据库索引(默认为0)
spring.redis.database=0
#连接超时时间(毫秒)
spring.redis.timeout=1800000
#连接池最大连接数(使用负值表示没有限制)
spring.redis.lettuce.pool.max-active=20
#最大阻塞等待时间(负数表示没限制)
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接
spring.redis.lettuce.pool.max-idle=5
#连接池中的最小空闲连接
spring.redis.lettuce.pool.min-idle=0
# 关闭超时时间
#pring.redis.lettuce.shutdown-timeout=100
#配置spring启动端口号
server.port=8080

yml配置

server:
port: 8080 redis:
port: 6379
database: 0
host: 192.168.1.2
password:
lettuce:
pool:
#连接池最大连接数(使用负值表示没有限制)
max-active: 300
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1
# 连接池中的最大空闲连接
max-idle: 8
# 连接池中的最小空闲连接
min-idle: 0

配置类

package com.springboot_redis.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer; import java.time.Duration; /**
* @author WuL2
* @create 2021-05-25 17:12
* @desc
**/
@EnableCaching
@Configuration
public class RedisConfig extends CachingConfigurerSupport { @Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setConnectionFactory(factory);
template.setKeySerializer(redisSerializer); //key序列化方式
template.setValueSerializer(jackson2JsonRedisSerializer); //value序列化
template.setHashValueSerializer(jackson2JsonRedisSerializer); //value hashmap序列化
return template;
} @Bean(name = "cacheManager")
public CacheManager cacheManager(RedisConnectionFactory factory) {
RedisSerializer<String> redisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
//解决查询缓存转换异常的问题
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置序列化(解决乱码的问题),过期时间600秒
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
.disableCachingNullValues();
RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
.cacheDefaults(config)
.build();
return cacheManager;
}
}

测试类

package com.springboot_redis;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.test.context.junit4.SpringRunner; import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit; /**
* @author: wu linchun
* @time: 2021/6/28 20:43
* @description:
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyTest { @Resource
//因为StringRedisTemplate类是继承RedisTemplate类的,所以StringRedisTemplate和RedisTemplate其实是同一类型的
//如果不加@Qualifier指定具体注入哪个类对象的话,@Autowired是按类型注入,因此系统会不知道注入StringRedisTemplate类还是RedisTemplate类的实例
private RedisTemplate redisTemplate; /**
* @author: Wu Linchun
* @date: 2021/6/28 21:22
* @description: RedisTemplate操作String
*/ @Test //opsForValue操作字符串
public void testString1() {
//设置值到redis
redisTemplate.opsForValue().set("name", "lucy");
//从redis获取值
String name = (String) redisTemplate.opsForValue().get("name");
//从redis删除值
redisTemplate.delete("name");
//判断redis中是否含有key “name”
boolean flag = redisTemplate.hasKey("name");
//给key设置过期时间为60秒,20秒后自动删除
redisTemplate.opsForValue().set("key:sec", "helloworld", 20, TimeUnit.SECONDS);
System.out.println(name);
System.out.println(flag);
} public void testString2() {
//给key设置过期时间为60秒,60秒后自动删除
redisTemplate.boundValueOps("testValue1").set("value1", 60, TimeUnit.SECONDS);
//设置key -value
redisTemplate.boundValueOps("testValue2").set("value2");
//从redis获取值
Object o1 = redisTemplate.boundValueOps("testValue1").get();
Object o2 = redisTemplate.boundValueOps("testValue2").get();
System.out.println(o1 + " " + o2);
} /**
* @author: Wu Linchun
* @date: 2021/6/28 21:22
* @description: RedisTemplate操作Hash
*/
@Test //opsForHash操作hash
public void testHash1() {
redisTemplate.opsForHash().put("hash", "h1", "v1");
redisTemplate.opsForHash().put("hash", "h2", "v2");
redisTemplate.opsForHash().put("hash", "h3", "v3"); Object hashsK = redisTemplate.opsForHash().keys("hash");
Object hashsV = redisTemplate.opsForHash().values("hash");
System.out.println(hashsK);
System.out.println(hashsV);
} @Test //boundHashOps操作hash
public void testHash2() {
redisTemplate.boundHashOps("hash1").put("1", "a");
redisTemplate.boundHashOps("hash1").put("2", "b");
redisTemplate.boundHashOps("hash1").put("3", "c"); Object hashsK = redisTemplate.boundHashOps("hash1").keys();
Object hashsV = redisTemplate.boundHashOps("hash1").values();
System.out.println(hashsK);
System.out.println(hashsV);
} /**
* @author: Wu Linchun
* @date: 2021/6/28 21:24
* @description: RedisTemplate操作list
*/
@Test //opsForList操作list
public void testList1() {
redisTemplate.opsForList().leftPush("list", "v1");
redisTemplate.opsForList().leftPushAll("list", "v2", "xxx", "list", "v3");
redisTemplate.opsForList().rightPush("list", "v1");
redisTemplate.opsForList().rightPushAll("list", "v2", "list", "v3");
// -1表示获取所有
List<String> llist = redisTemplate.opsForList().range("list", 0, -1);
//删除1个v1
redisTemplate.opsForList().remove("list", 1, "v1");
System.out.println(llist);
} @Test //boundListOps操作list
public void testList2() {
redisTemplate.boundListOps("list").leftPush("aaa");
redisTemplate.boundListOps("list").leftPush("aaa");
redisTemplate.boundListOps("list").leftPush("aaa");
redisTemplate.boundListOps("list").leftPush("bbb");
redisTemplate.boundListOps("list").rightPush("ccc");
redisTemplate.boundListOps("list").rightPush("ddd"); //查询,range(0,10)会查询出0-10的元素
List list = redisTemplate.boundListOps("list").range(0, 10);
System.out.println(list);
//获取key值
System.out.println(redisTemplate.boundValueOps("list").getKey());
//删除两个个aaa
redisTemplate.boundListOps("list").remove(2, "aaa"); //查询,range(0,10)会查询出0-10的元素
List list1 = redisTemplate.boundListOps("list").range(0, 10);
System.out.println(list1);
//设置60秒后过期
redisTemplate.boundListOps("list").expire(60, TimeUnit.SECONDS); //根据索引获取值
System.out.println(redisTemplate.boundListOps("list").index(1));
//打印左边起第一个元素值
System.out.println(redisTemplate.boundListOps("list").leftPop());
} /**
* @author: Wu Linchun
* @date: 2021/6/28 21:43
* @description: RedisTemplate操作Set
*/ @Test //opsForSet操作set
public void testSet1() {
//给set添加元素
redisTemplate.opsForSet().add("set", "aaa");
redisTemplate.opsForSet().add("set", "bbb");
redisTemplate.opsForSet().add("set", "ccc");
//删除set中ccc
redisTemplate.opsForSet().remove("set", "ccc");
//获取set所有元素
Set<String> set = redisTemplate.opsForSet().members("set");
System.out.println(set);
} @Test //boundSetOps操作set
public void testSet2() {
//给set添加元素
redisTemplate.boundSetOps("set").add("aaa");
redisTemplate.boundSetOps("set").add("bbb");
redisTemplate.boundSetOps("set").add("ccc");
redisTemplate.boundSetOps("set").add("ddd");
Set set = redisTemplate.boundSetOps("set").members();
System.out.println(set);
//删除aaa这个元素
redisTemplate.boundSetOps("set").remove("aaa");
//删除整个集合
redisTemplate.delete("set");
} /**
* @author: Wu Linchun
* @date: 2021/6/28 22:02
* @description: RedisTemplate操作ZSet
*/
@Test //opsForZSet操作zset
public void testZSet1() {
//给zset添加元素
redisTemplate.opsForZSet().add("zset", "aaa", 1);
redisTemplate.opsForZSet().add("zset", "bbb", 2);
redisTemplate.opsForZSet().add("zset", "ccc", 3);
//删除zset中ccc
redisTemplate.opsForZSet().remove("zset", "ccc");
//删除0-1的元素
redisTemplate.opsForZSet().removeRange("zset", 0, 1);
//获取zset所有元素
Set<String> zset = redisTemplate.opsForZSet().range("zset", 0, -1);
System.out.println(zset);
} @Test //boundZSetOps操作zset
public void testZSet2() {
//给zset添加元素
redisTemplate.boundZSetOps("zset").add("aaa", 1);
redisTemplate.boundZSetOps("zset").add("bbb", 2);
redisTemplate.boundZSetOps("zset").add("ccc", 3); //获取下标0-10的元素
Set zset = redisTemplate.boundZSetOps("zset").range(0, 10);
System.out.println(zset);
//删除0-2的元素,共三个
redisTemplate.boundZSetOps("zset").removeRange(0, 2);
Set zset1 = redisTemplate.boundZSetOps("zset").range(0, 10);
System.out.println(zset1); } }

5、Redis主从复制

5.1、概念

主从复制是主机(master以写为主),会自动同步备份数据到从机(slaver以读为主)的机制。

主从复制读写分离,易于快速扩展。并且具有防灾减损的能力。

如果从机数量过多的化会出现复制延迟,这是由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

5.2、原理

  1. 当从服务器连接上主服务器后,从服务器向主服务器发送数据同步消息。
  2. 主服务器接收到从服务器发送过来的同步消息后会把主服务器数据持久化为rdb文件。把rdb文件发送到从服务器,从服务器拿到rdb文件进行读取,从而完成数据的备份。
  3. 每次主服务器进行写操作之后,会和从服务器进行数据同步。

6、开启主从复制

6.1、一主多仆

在linux系统中新建一个文件夹redis_conf用来存放redis的配置文件。将redis.conf和dump.rdb文件拷贝进去并把名称都改为xxx6379,xxx6380,xxx6391的形式。

redis.conf是redis的配置文件。

在redis6379.conf中进行如下配置

port 6379
dbfilename dump6379.rdb
daemonize yes #daemonize值yes为要用守护线程的方式启动,让redis在后台运行,即使用redis-server启动时不会出现redis的启动画面了

同理在redis6380.conf和redis6381.conf中也这样配置

port 6380
dbfilename dump6380.rdb
daemonize yes
port 6381
dbfilename dump6381.rdb
daemonize yes

启动三台redis服务器

将redis6379设置为主机,redis6380和redis6381设置为redis6379的从机

在6380和6381的客户端分别输入如下命令:

slaveof 127.0.0.1 6379  #成为本机(127.0.0.1)6379端口的从机

在6379客户端输入info replication打印主从复制信息

info replication   #打印主从复制信息

测试一下:

主机(端口6379可以写也可以读)而从机(端口6380和6381不可以写,只能读)

主机上写入的数据,主机从机都可以读取。

如果从机挂掉了,那么再次启动从机,则从机又单独变为了主机。把它作为从机重新加到原来的主机后,它会将主机的数据全部复制过来(包括在它挂掉的时候主机新增的数据)主从同步

当然如果主机挂掉了,从机的主机还是原来的那个,是不会变的。从机也不会自动升级为主机。除非设置了哨兵模式。

如果想要把从机变为主机,可以使用slaveof no one命令。

slaveof no one

以上这种方式叫做一主多仆,一个主机会把写入它的数据备份到多个从机中去。但是如果从机数量太多的话,会给主机备份数据到从机带来很大压力。

6.2、多主多仆分级备份

一主多仆的方式会给主机带来很大的压力,因此在集群中会采用多主多仆分级备份的方式,即依次会在主机的从机中选部分从机作为其他从机的主机。主机把数据备份到部分升级为“主机”的“从机”,这样那些升级为“主机”的“从机”会继续完成数据备份到其他从机的工作。给主机减轻压力,并起到去中心化降低风险的作用。

实现方法为用在从机中用slaveof ,将从机作为其他从机的从机。

6.3、哨兵模式

哨兵模式会自动监测主机是否故障,如果主机故障,则会根据首先根据优先级,如果优先级相同则根据偏移量或者runid选择相应的从机作为主机。

6.3.1、哨兵模式实现

新建一个sentinel.conf的配置文件,在配置文件中添加:

sentinel monitor mymaster 127.0.0.1 6379 1
daemonize no

启动哨兵模式

redis-sentinel sentinel.conf

注意!!!如果在启动过程中出现如下报错,表示端口被占用,需杀掉占用该端口的相应进程。

哨兵模式启动成功

如果主机6379挂掉了

注:如果此时6379又重新启动了,在哨兵模式下6379还是会作为6380的从机,除非6380挂掉了,6379才有可能竞争成为主机!

当主机挂掉后,哨兵会在其余从机中根据优先级选择新的主机,优先级的配置在redis.conf中。

6.4、Redis集群

6.4.1、集群的概念

Redis 集群实现了对Redis的水平扩容,即启动N个redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

2.4.2、搭建一个集群

这里搭建6个集群,共用一个服务器,但是是不同的端口:6379、6380、6381、6389、6390、6391。

增加相关配置文件(以redis.conf为模板)

在配置文件中分别添加如下配置:

cluster-enabled yes  #打开集群模式
cluster-config-file "nodes-6379.conf" #设定节点配置文件名(启动后会自动生成节点配置文件)
cluster-node-timeout 15000 #设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换

其余的6380、6381、6389、6390、6391也照此配置,只是把6379分别替换成6380、6381、6389、6390、6391。

依次启动6个redis服务器实例

redis-server rediscluster_conf/redis6379.conf
redis-server rediscluster_conf/redis6380.conf
redis-server rediscluster_conf/redis6381.conf
redis-server rediscluster_conf/redis6389.conf
redis-server rediscluster_conf/redis6390.conf
redis-server rediscluster_conf/redis6391.conf

进入src目录

将6个节点合成一个集群

redis-cli --cluster create --cluster-replicas 1 192.168.43.2:6379 192.168.43.2:6380 192.168.43.2:6381 192.168.43.2:6389 192.168.43.2:6390 192.168.43.2:6391

注意:在合成前一定要确保6个启动的redis服务器实例都有生成node.conf配置文件。

集群创建成功

采用集群方式登录redis

redis-cli -c -p 6379

查看集群信息

cluster nodes

redis中集群的分配为尽量保证每个主服务器处于不同的ip上,而每个主服务器所对应的从服务器则和主服务器不在同一个ip上,这样是为了保证如果某台主服务器挂了,那么其从服务器不受其影响,可以接管主服务器的工作。

redis集群的一些操作

1、在redis集群中录入一个值(每次录入都会为不同的key分配一个不同的插槽)

set k1 v1
get k1

2、在redis集群中以组的形式录入(如果是同一组的key-value则共享同一个插槽)

mset sex{student} male class{student} 2016222
get sex{student}

3、统计一个插槽里存了多少key

cluster countkeysinslot 5249

4、获取某个插槽中的指定数量的key

cluster getkeysinslot 5249 10

5、查询某个key对应的插槽值

cluster keyslot student

6.4.3、故障恢复

当主节点挂掉后,其从节点会自动升级为主节点。

redis.conf中的相关配置

cluster-require-full-coverage yes #如果某一段插槽的主从都挂掉,那么整个集群都挂掉。

cluster-require-full-coverage no  #如果某一段插槽的主从都挂掉,那么该插槽数据全都不能使用,也无法存储。

6.4.4、使用jedis操作集群

<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.2.0</version>
</dependency>
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster; import java.util.HashSet;
import java.util.Set; /**
* @author: wu linchun
* @time: 2021/6/24 14:45
* @description: jedis操作集群
*/ public class Jedis_RedisClusterTest {
public static void main(String[] args) {
Set<HostAndPort> set = new HashSet<HostAndPort>();
set.add(new HostAndPort("192.168.43.2", 6379));
JedisCluster jedisCluster = new JedisCluster(set);
jedisCluster.set("k1", "v1");
jedisCluster.mset("name{teacher}", "wangwu", "age{teacher}", "20");
System.out.println(jedisCluster.get("k1"));
System.out.println(jedisCluster.get("name{teacher}"));
System.out.println(jedisCluster.get("age{teacher}"));
}
}

6.5、Redis应用问题

6.5.1、缓存穿透

缓存穿透就是指在短时间大量的请求服务器,但所请求的key在redis中不存在查不到,同时redis的命中率降低导致一些在redis中存在的也都查不到了。既然在redis中查不到就会去查数据库了。短时间大量访问数据库使数据库压力过大有可能造成数据库奔溃。

解决方法

  1. 对空值缓存:如果一个查询返回的数据为空(不管数据是真的没有还是说只是没有命中到),我们都把这个空结果null进行缓存。以便下次查询时可以在redis中查到而不用去直接访问数据库了。
  2. 设置白名单:使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。
  3. 采用布隆过滤器:(布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。)将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。
  4. 进行实时监控:当发现Redis的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务。

redis命中率查看方式:打开redis-cli客户端,执行info命令。



缓存命中率 = keyspace_hits / (keyspace_hits + keyspace_misses)

6.5.2、缓存击穿

缓存击穿是指key-value存在,但是已经过期了。此时若有大量并发请求过来,发现redis中key已经过期了。那么就会绕过redis直接去查数据库了并且把查到的数据再加载回缓存。这时大量的并发请求可能瞬间把数据库压垮。

解决方法

  1. 预先设置热门数据:某些key在某一时间点可能会被超高并发的访问,这些key是“热点数据”。因此在redis高峰访问之前,把一些热门数据提前存入到redis里面,加大这些热门数据key的时长。
  2. 实时调整:现场监控哪些数据热门,实时调整key的过期时长。
  3. 使用锁:
  • 就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db。
  • 先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX)去set一个mutex key。
  • 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key。
  • 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法。

6.5.3、缓存雪崩

缓存雪崩是指短时间内有大量的key过期,不同于缓存击穿的只是某些“热点key”过期。若此时有大量并发请求过来发现redis中大量key过期,那么就会直接从数据库里面拿数据,造成数据库负荷过大崩溃。

注意:缓存失效时的雪崩效应对底层系统的冲击非常可怕!!!

解决方法

  1. 构建多级缓存架构:nginx缓存 + redis缓存 +其他缓存(ehcache等)。
  2. 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况。
  3. 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。
  4. 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

6.6、分布式锁

6.6.1、概述

分布式锁是为了应对多线程操作所造成的数据不一致性。通过对key进行加锁,确保对key的操作是原子性的。

分布式锁主流实现方案:

  1. 基于数据库实现分布式锁。
  2. 基于缓存(Redis等)。
  3. 基于Zookeeper。

基于redis的性能最高,基于Zookeeper的可靠性最强。

6.6.2、redis实现分布式锁

setnx key:lock test1  #创建key并对其进行分布式加锁

expire key:lock 20  #设置锁的过期时间为20秒

ttl key:lock  #查看锁的剩余时间

del key:lock  #删除key

set key:lock testSSS nx ex 30  #上锁的同时设置过期时间

setnx key:lock testABC会对key进行加锁,此时再对其使用setnx将该key设置为其他值是不被允许的。

expire key:lock 20会设置该key上分布式锁的过期时间,在没有过期时是无法使用setnx修改该key的值的。

7、redis6新特性

7.1、在redis中添加新用户并设置操作权限

acl list  #显示所有用户及其相关权限

acl cat  #查看权限指令类别

acl cat string  #查看string都有哪些操作

acl whoami  #查看当前用户

acl setuser Atlantide on >123456 ~* +@all  #创建新用户,权限为所有key的所有操作

auth Atlantide 123456  #切换用户

8、总结

这是我观看b站尚硅谷的redis的课程做了一些整理和笔记。目前在工作中对redis的应用十分有限,主要就是Springboot整合redis这块,常用的就是使用redisTemplate在redis中设置key-value缓存对几种基本类型的操作,设置过期时间这些操作。对于redis更深层次的应用,如集群,主从复制,分布式锁暂时还都没有涉及。

9、参考资料

最新文章

  1. C#读写config配置文件
  2. 股票交易(洛谷U6084)
  3. sql server 2008 登录 4064 错误解决办法
  4. lighttpd mod_status模块
  5. 把消息送到默认窗口函数里,并非一点用都没有,可能会产生新的消息(以WM_WINDOWPOSCHANGED为例)
  6. Rotate List —— LeetCode
  7. acm - cry for no one
  8. asp.net mvc4中自定义404
  9. Jpa中设置OneToMany插入报异常解决办法
  10. Python中的 sort 和 sorted
  11. Java开发笔记(六十六)映射:HashMap和TreeMap
  12. Linux根目录下各个目录的用途及含义
  13. V先生:信息流广告标题党必备的500个热词
  14. B - Frogger 最短路变形('最长路'求'最短路','最短路'求'最长路')
  15. 关于JAVA 中的Configuration类
  16. react-router 4.3 js实现跳转
  17. 内存不够怎么办? 1.5.1 关于隔离 1.5.2 分段(Segmention) 1.5.3 分页(Paging)
  18. C++中的结构体的认识
  19. linux系统编程之信号(八):三种时间结构及定时器setitimer()详解
  20. 【ERROR】Oracle11g两个监听同名进程的故障

热门文章

  1. POJ1741 tree (点分治模板)
  2. Jedis测试redis。(redis在linux虚拟机中)
  3. go-zero docker-compose搭建课件服务(四):生成Dockerfile
  4. DevOps | 如何快速提升团队软件开发成熟度,快速提升研发效能?
  5. C#结构体大小问题
  6. .NET实现堆排序
  7. golang 简书
  8. 一行代码实现shell if else逻辑
  9. 本地JS文件批量压缩
  10. MyEclipse 中自动安插作者、注释日期等快捷键方法