C# Redis分布式锁(RedLock) - 多节点
Redis单节点的分布式锁只需要注意三点就可以了:
1.加锁并设置锁的过期时间必须是原子操作;
2.锁的value值必须要有唯一性;
3.释放锁的时候要验证其value值,不是自己加的锁不能释放.
但是单节点分布式锁最大的缺点就是,它只作用在一个Redis节点上,如果该节点挂了,那就挂了.
那可不可以通过哨兵机制来保证高可用呢?
答案是不行.
因为Redis在进行主从复制的时候是异步的.
假设 clientA 拿到锁后,在 master 还没同步到 slave 时,master 发生了故障,这时候 salve 升级为 master,导致锁丢失.
RedLock 的思想是:假设有5个Redis节点.这些节点完全相互独立,不存在主从或者集群机制,都是 master.并且这5个Redis实例运行在5台机器上,这样保证他们不会同时宕掉.
客户端应该按照以下操作来获取锁:
1.获取当前时间戳,假设是T1.
2.依次尝试从这5个Redis实例获取锁.当客户端向Redis请求获取锁时,客户端应该设置超时时间,并且这个超时时间应该小于锁的失效时间.比如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间.这样可以避免Redis已经挂掉的情况下,客户端还在等待响应结果.如果Redis没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁.
3.请求完所有的Redis节点后,只有满足如下两点,才算真正的获取到锁:
1)当前时间 - T1 的时间差小于锁的过期时间.比如T1=00:00:00,然后从5个Redis节点都拿到了锁,当前时间是 00:00:05,也就是说获取锁一共用了5秒钟.假设锁的过期时间是3秒,那么这次获取锁的操作就算失败了.
2)从(N/2+1)个Redis节点都获取到锁.这个很好理解,5个节点,你拿2个,我拿2个,到底算谁的?
总结一句话就是:从开始获取锁计时,只要在锁的过期时间内成功获取到一半以上的锁便算成功,否则算失败.
4.当客户端获取到了锁,锁的真正有效时间 = 锁的过期时间 - 获取锁所使用的时间(也就是第3步计算出来的时间).
5.如果客户端由于某些原因(比如获取锁的实例个数小于N/2+1,或者已经超过了有效时间),没有获取到锁,客户端便会在所有的Redis实例上进行解锁(即使某些Redis实例根本就没有加锁成功),因为可能已经获取了小于 N/2+1个锁,必须释放掉,否则会影响其他客户端获取锁.
关于是否启动AOF永久存储,需要有所取舍.
1.永久启动,由于Redis的过期机制是按照unix时间戳走的,所以当我们重启Redis后,依然会按照规定的时间过期.但是永久启动对性能有一定影响;
2.采用默认的1秒1次.如果在1秒内断电,会导致数据丢失,这时候如果立刻重启会导致锁的互斥性实效.
所以有效的解决方案是,采用AOF,1秒1次,不管什么原因宕机后,等待一定时间再重启.这个时间就是锁的过期时间.
Demo:
安装官方提供的 RedLock.net
Startup:
public class Startup
{
private RedLockFactory _redLockFactory; public void ConfigureServices(IServiceCollection services)
{
services.AddControllers(); var endPoints = new List<RedLockEndPoint>
{
new DnsEndPoint("127.0.0.1", 6379),
new DnsEndPoint("127.0.0.1", 6380),
new DnsEndPoint("127.0.0.1", 6381)
};
_redLockFactory = RedLockFactory.Create(endPoints);
services.AddSingleton(typeof(IDistributedLockFactory), _redLockFactory);
} public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime applicationLifetime)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
} //应用程序结束时释放,因为不是容器创建的对象
applicationLifetime.ApplicationStopping.Register(() =>
{
_redLockFactory.Dispose();
}); app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapControllers();
});
}
}
测试api:
[ApiController]
public class ValuesController : ControllerBase
{
private static int _stock = 10; private readonly IDistributedLockFactory _distributedLockFactory; public ValuesController(IDistributedLockFactory distributedLockFactory)
{
_distributedLockFactory = distributedLockFactory;
} [Route("lockTest")]
[HttpGet]
public async Task<int> DistributedLockTest()
{
// resource 锁定的资源
var resource = "the-thing-we-are-locking-on"; // expiryTime 锁的过期时间
var expiry = TimeSpan.FromSeconds(5); // waitTime 等待时间
var wait = TimeSpan.FromSeconds(1); // retryTime 等待时间内,多久重试一次
var retry = TimeSpan.FromMilliseconds(250); using (var redLock = await _distributedLockFactory.CreateLockAsync(resource, expiry, wait, retry))
{
if (redLock.IsAcquired)
{
// 模拟执行业务逻辑
await Task.Delay(new Random().Next(100, 500));
if (stock > 0)
{
stock--;
return stock;
}
return stock;
}
Console.WriteLine($"{DateTime.Now} : 获取锁失败");
}
return -99;
}
}
测试控制台:
static void Main(string[] args)
{
HttpClient client = new HttpClient();
var result = Parallel.For(0, 20, (i) =>
{
var stopwatch = new Stopwatch();
stopwatch.Start();
var response = client.GetAsync($"http://localhost:5000/locktest").Result;
stopwatch.Stop();
var data = response.Content.ReadAsStringAsync().Result;
Console.WriteLine($"ThreadId:{Thread.CurrentThread.ManagedThreadId}, Result:{data}, Time:{stopwatch.ElapsedMilliseconds}");
});
client.Dispose();
Console.ReadKey();
}
测试结果:
最新文章
- 深入理解SQL注入绕过WAF和过滤机制
- Sql Server中常见的数据类型
- iframe框架嵌套技巧(全屏,去双滚动条)
- python内置函数 1
- Ruby调用Excel相关的函数
- 初试Celery
- Python 的property的实现 .
- poi操作oracle数据库导出excel文件2
- bash 正则表达式匹配,一行文本中 “包含 ABC” 并且 “不包含 XYZ”
- Qt中截图功能的实现
- linq 分组求和
- 浅谈lvs和nginx的一些优点和缺点
- redis 系列18 事件
- [转]在ASP.NET Core中使用百度在线编辑器UEditor
- VS2015环境下的提示语法错误:编号的预期结尾后有多余文本(extra text after expected end of number)
- 【CTF杂项】常见文件文件头文件尾格式总结及各类文件头
- Python3基础-代码阅读系列—优惠码生成
- mysql再探
- Apache 配置Https 转发Tomcat Http
- alter system set events相关知识