Redis分布式锁实现高并发控制实践
|Word count:2.9k|Reading time:10min|Post View:
Redis分布式锁实现高并发控制实践
分布式锁
分布式锁是控制分布式系统之间同步访问共享资源的一种方式。在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,在这种情况下,便需要使用到分布式锁。
分布式锁基本功能
1.同一时刻只能存在一个锁
2.需要解决意外死锁问题,也就是锁能超时自动释放
3.支持主动释放锁
分布式锁解决什么问题
多进程并发执行任务时,需要保证任务的有序性或者唯一性
分布式锁适用场景
场景一:从前端界面发起一笔支付请求,如果前端没有做防重处理,那么可能在某一个时刻会有二笔一样的单子同时到达系统后台。
场景二:在App中下订单的时候,点击确认之后,没反应,就又点击了几次。在这种情况下,如果无法保证该接口的幂等性,那么将会出现重复下单问题。
在接收消息的时候,消息推送重复。如果处理消息的接口无法保证幂等,那么重复消费消息产生的影响可能会非常大
场景三:秒杀场景,数据库里有一张表,column分别是商品ID,和商品ID对应的库存量,秒杀成功就将此商品库存量-1。现在假设有1000个线程来秒杀商品,我们来根据这个简单的业务场景来解释一下分布式锁。
代码解释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
@Component @Slf4j public class RedisLock {
@Autowired private StringRedisTemplate stringRedisTemplate;
public boolean lock(String key, String value) { if (stringRedisTemplate.opsForValue().setIfAbsent(key, value)) { return true; } String curValue = stringRedisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(curValue) && Long.parseLong(curValue) < System.currentTimeMillis()) { String oldValue = stringRedisTemplate.opsForValue().getAndSet(key, value); if (!StringUtils.isEmpty(oldValue) && oldValue.equals(curValue)) { return true; } } return false; }
public void unlock(String key, String value) { try { String curValue = stringRedisTemplate.opsForValue().get(key); if (!StringUtils.isEmpty(curValue) && curValue.equals(value)) { stringRedisTemplate.opsForValue().getOperations().delete(key); } } catch (Exception e) { log.error("[Redis分布式锁] 解锁出现异常了,{}", e); } } }
|
Redis的SETNX命令:
将key
设置值为value
,如果key
不存在,这种情况下等同SET命令。 当key
存在时,什么也不做。SETNX
是”SET if Not eXists”的简写。
返回值
Integer reply, 特定值:
##例子
1 2 3 4 5 6 7
| redis> SETNX mykey "Hello" (integer) 1 redis> SETNX mykey "World" (integer) 0 redis> GET mykey "Hello" redis>
|
Design pattern: Locking with !SETNX
设计模式:使用!SETNX
加锁
Please note that:
请注意:
- 不鼓励以下模式来实现the Redlock algorithm ,该算法实现起来有一些复杂,但是提供了更好的保证并且具有容错性。
- 无论如何,我们保留旧的模式,因为肯定存在一些已实现的方法链接到该页面作为引用。而且,这是一个有趣的例子说明Redis命令能够被用来作为编程原语的。
- 无论如何,即使假设一个单例的加锁原语,但是从 2.6.12 开始,可以创建一个更加简单的加锁原语,相当于使用
SET
命令来获取锁,并且用一个简单的 Lua 脚本来释放锁。该模式被记录在SET
命令的页面中。
也就是说,SETNX
能够被使用并且以前也在被使用去作为一个加锁原语。例如,获取键为foo
的锁,客户端可以尝试一下操作:
1
| SETNX lock.foo <current Unix time + lock timeout + 1>
|
如果客户端获得锁,SETNX
返回1
,那么将lock.foo
键的Unix时间设置为不在被认为有效的时间。客户端随后会使用DEL lock.foo
去释放该锁。
如果SETNX
返回0
,那么该键已经被其他的客户端锁定。如果这是一个非阻塞的锁,才能立刻返回给调用者,或者尝试重新获取该锁,直到成功或者过期超时。
处理死锁
以上加锁算法存在一个问题:如果客户端出现故障,崩溃或者其他情况无法释放该锁会发生什么情况?这是能够检测到这种情况,因为该锁包含一个Unix时间戳,如果这样一个时间戳等于当前的Unix时间,该锁将不再有效。
当以下这种情况发生时,我们不能调用DEL
来删除该锁,并且尝试执行一个SETNX
,因为这里存在一个竞态条件,当多个客户端察觉到一个过期的锁并且都尝试去释放它。
- C1 和 C2 读
lock.foo
检查时间戳,因为他们执行完SETNX
后都被返回了0
,因为锁仍然被 C3 所持有,并且 C3 已经崩溃。
- C1 发送
DEL lock.foo
- C1 发送
SETNX lock.foo
命令并且成功返回
- C2 发送
DEL lock.foo
- C2 发送
SETNX lock.foo
命令并且成功返回
- 错误:由于竞态条件导致 C1 和 C2 都获取到了锁
幸运的是,可以使用以下的算法来避免这种情况,请看 C4 客户端所使用的好的算法:
C4 发送SETNX lock.foo
为了获得该锁
已经崩溃的客户端 C3 仍然持有该锁,所以Redis将会返回0
给 C4
C4 发送GET lock.foo
检查该锁是否已经过期。如果没有过期,C4 客户端将会睡眠一会,并且从一开始进行重试操作
另一种情况,如果因为 lock.foo
键的Unix时间小于当前的Unix时间而导致该锁已经过期,C4 会尝试执行以下的操作:
1
| GETSET lock.foo <current Unix timestamp + lock timeout + 1>
|
由于GETSET
的语意,C4会检查已经过期的旧值是否仍然存储在lock.foo
中。如果是的话,C4 会获得锁
如果另一个客户端,假如为 C5 ,比 C4 更快的通过GETSET
操作获取到锁,那么 C4 执行GETSET
操作会被返回一个不过期的时间戳。C4 将会从第一个步骤重新开始。请注意:即使 C4 在将来几秒设置该键,这也不是问题。
为了使这种加锁算法更加的健壮,持有锁的客户端应该总是要检查是否超时,保证使用DEL
释放锁之前不会过期,因为客户端故障的情况可能是复杂的,不止是崩溃,还会阻塞一段时间,阻止一些操作的执行,并且在阻塞恢复后尝试执行DEL
(此时,该LOCK已经被其他客户端所持有)
Redis的GETSET命令
自动将key对应到value并且返回原来key对应的value。如果key存在但是对应的value不是字符串,就返回错误。
设计模式
GETSET可以和INCR一起使用实现支持重置的计数功能。举个例子:每当有事件发生的时候,一段程序都会调用INCR给key mycounter加1,但是有时我们需要获取计数器的值,并且自动将其重置为0。这可以通过GETSET mycounter “0”来实现:
1 2 3
| INCR mycounter GETSET mycounter "0" GET mycounter
|
返回值
bulk-string-reply: 返回之前的旧值,如果之前Key
不存在将返回nil
。
例子
1 2 3 4 5 6 7
| redis> INCR mycounter (integer) 1 redis> GETSET mycounter "0" "1" redis> GET mycounter "0" redis>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
|
@Service public class SeckillServiceImpl implements SeckillService{
@Autowired private RedisLock redisLock;
private static final int TIMEOUT = 10*1000;
static Map<String,Integer> products; static Map<String,Integer> stock; static Map<String,String> orders; static {
products = new HashMap<>(); stock = new HashMap<>(); orders = new HashMap<>(); products.put("123456",100000); stock.put("123456",100000); }
private String queryMap(String productId){ return "国庆活动,红烧肉特惠,限量" +products.get(productId) +"份,还剩:"+stock.get(productId) +"份,该商品成功下单用户数:" +orders.size()+"人"; } @Override public String querySecKillProductInfo(String productId) { return this.queryMap(productId); }
@Override public void orderProductMockDiffUser(String productId) {
long time = System.currentTimeMillis() + TIMEOUT; if(!redisLock.lock(productId,String.valueOf(time))){ throw new Exception("很抱歉,人太多了!稍后再试!"); }
int stockNum = stock.get(productId); if(stockNum==0){ throw new Exception("活动结束"); }else { orders.put(KeyUtil.genUniqueKey(),productId); stockNum -=1; try{ Thread.sleep(100); }catch (InterruptedException e){ e.printStackTrace(); } stock.put(productId,stockNum); }
redisLock.unlock(productId,String.valueOf(time));
}
}
|
使用Apache ab压力测试工具,500个请求,100个并发
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| H:\软件\Apache24\bin>ab -n 500 -c 100 http://localhost:8080/sell/skill/order/123456 This is ApacheBench, Version 2.3 <$Revision: 1826891 $> Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient) Completed 100 requests Completed 200 requests Completed 300 requests Completed 400 requests Completed 500 requests Finished 500 requests
Server Software: Server Hostname: localhost Server Port: 8080
Document Path: /sell/skill/order/123456 Document Length: 101 bytes
Concurrency Level: 100 Time taken for tests: 9.222 seconds Complete requests: 500 Failed requests: 494 (Connect: 0, Receive: 0, Length: 494, Exceptions: 0) Total transferred: 95270 bytes HTML transferred: 35680 bytes Requests per second: 54.22 [#/sec] (mean) Time per request: 1844.420 [ms] (mean) Time per request: 18.444 [ms] (mean, across all concurrent requests) Transfer rate: 10.09 [Kbytes/sec] received
Connection Times (ms) min mean[+/-sd] median max Connect: 2 3 0.5 3 8 Processing: 130 1557 1007.6 1012 7236 Waiting: 129 1556 1008.5 1012 7236 Total: 132 1560 1007.6 1015 7238
Percentage of the requests served within a certain time (ms) 50% 1015 66% 1715 75% 1829 80% 2489 90% 2769 95% 3337 98% 4431 99% 5124 100% 7238 (longest request)
|
测试结果
可以看到库存剩余数量和成功下单数一致,说明实现了分布式锁的并发控制