0%

Redis 缓存的使用和分布式锁实现

1. 本地缓存

关于缓存可以有多种实现,例如下图,在单服务节点项目中,可以实现本地缓存。简单的本地缓存实现比如可以在类中创建一个静态的 Map 集合,然后存储缓存对象即可。等到其他请求再次访问时,可以直接将 Map 中的对象直接进行返回。

单服务本地缓存

但是本地缓存有个严重的弊端,其仅支持少量缓存和单节点应用。例如下图的分布式集群项目中,产品中心 gulimall-product 有多个服务。当请求经过第一个节点,然后更新了对应的数据和缓存,此时第二个节点的缓存则无法对应同步更新,那么就会造成多个节点本地缓存数据一致性问题。另外,请求第一次经过第一个节点时,会发现此时没有本地缓存从而将数据库中的数据加入到本地缓存中。但是在分布式集群项目中,其他请求可能会经过其他服务节点,由于其他服务节点也可能本地缓存为空而需要再次查询数据库,导致缓存没有有效利用。

分布式集群

2. 分布式缓存

由于本地缓存在分布式集群项目中存在多种问题,如下图我们将缓存作为一个公共的资源,并使用缓存中间件来实现:

Redis 缓存

3. 高并发下缓存失效问题

3.1 缓存穿透

缓存穿透: 指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数据库也无此记录,我们没有将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

风险: 利用不存在的数据进行攻击,数据库瞬时压力增大,最终导致崩溃。

解决: null结果缓存,并加入短暂过期时间。

缓存穿透

观察如下示例,第 13 行时会判断从数据库中查询的对象存不存在,如果存在那么就存入到缓存中,但是这样会导致缓存穿透的问题,导致多个请求去查询数据库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 测试缓存穿透
* @param catId
* @return
*/
@Override
public CategoryEntity testCachePenetrate(Long catId) {
// 1. 查询缓存
String catalogString = redisTemplate.opsForValue().get("catalog");
if (StringUtils.isEmpty(catalogString)) {
// 2. 缓存对象不存在则查询数据库
CategoryEntity entityFromDb = getCategoryEntityFromDb(catId);
if (entityFromDb != null) {
// 3. 查询对象存在时存入缓存
String jsonString = JSON.toJSONString(entityFromDb);
redisTemplate.opsForValue().set("catalog", jsonString);
}
return entityFromDb;
}
// 4. 缓存对象存在那么反序列后返回
CategoryEntity categoryEntity = JSON.parseObject(catalogString, CategoryEntity.class);
return categoryEntity;
}

更改如上代码,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 测试缓存穿透
* @param catId
* @return
*/
@Override
public CategoryEntity testCachePenetrate(Long catId) {
// 11 ~ 50 的随机数
int r = (int) (Math.random() * 40) + 11;
// 1. 查询缓存
String catalogString = redisTemplate.opsForValue().get("catalog");
if (StringUtils.isEmpty(catalogString)) {
// 2. 缓存对象不存在则查询数据库,并序列化后存入缓存中
CategoryEntity entityFromDb = getCategoryEntityFromDb(catId);
if (entityFromDb != null) {
String jsonString = JSON.toJSONString(entityFromDb);
redisTemplate.opsForValue().set("catalog", jsonString);
return entityFromDb;
}
// 3. 为避免缓存穿透,设置 null 值和随机过期时间
redisTemplate.opsForValue().set("catalog", "null", r, TimeUnit.SECONDS);
log.info("catalog 会在{}秒后过期", r);
return null;
}
// 4. 缓存对象存在那么反序列后返回
CategoryEntity categoryEntity = JSON.parseObject(catalogString, CategoryEntity.class);
return categoryEntity;
}

3.2 缓存雪崩

缓存雪崩: 缓存雪崩是指在我们设置缓存时key采用了相同的过期时间, 导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

解决: 原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

3.3 缓存击穿

缓存穿透:

  • 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。
  • 如果这个key在大量请求同时进来前正好失效,那么所有对这个key的数据查询都落到db,我们称为缓存击穿。

解决: 加锁 ,大量并发只让一个去查,其他人等待,查到以后释放锁,其他人获取到锁,先查缓存,就会有数据,不用去db查询。

4. 分布式锁基本原理和实现

为了解决缓存击穿问题,我们可以对缓存加上分布式锁。如下图,在分布式环境下,如果仅仅是加上本地锁,比如 synchronized 是无法起作用的,下面可以做一下演示示例。

分布式锁

4.1 synchronized 本地锁

查看如下代码,这里我们使用 synchronized 本地锁进行加锁。在单服务项目时,下面这种写法面对并发请求不会导致并发问题。但是在多服务项目环境下,第 9 行的 getCatalogJsonFromDb 方法可能会调用多次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public Map<String, List<Catelog2Vo>> getCatalogJsonWithSynchronized() {
String catalogJsonString = null;
synchronized (this) {
// 1. 查询缓存
catalogJsonString = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isEmpty(catalogJsonString)) {
// 2. 缓存对象不存在则查询数据库,并序列化后存入缓存中
Map<String, List<Catelog2Vo>> catalogJson = getCatalogJsonFromDb();
String jsonString = JSON.toJSONString(catalogJson);
redisTemplate.opsForValue().set("catalogJson", jsonString);
return catalogJson;
}
}
// 3. 缓存对象存在那么反序列后返回
TypeReference<Map<String, List<Catelog2Vo>>> typeReference = new TypeReference<Map<String, List<Catelog2Vo>>>(){};
Map<String, List<Catelog2Vo>> catalogJson = JSON.parseObject(catalogJsonString, typeReference);
return catalogJson;
}

4.2 分布式锁

如下图,可以将多个服务的本地锁抽取为一个公共锁。我们可以同时去一个地方“占坑”,如果占到,就执行逻辑。否则就必须等待,直到释放锁。 “占坑”可以去redis,可以去数据库,可以去任何大家都能访问的地方,而等待可以使用自旋的方式进行。

4.2.1 阶段一

在 redis 中存在命令 setnx,如下图,当执行 setnx(“lock”, 1111) 时,如果 redis 中没有 ‘lock’ 这个 key,也就是当前还没有缓存还没有加锁,那么就返回 ture 并将 ‘lock’ 到缓存中充当锁。那么当下一个请求过来时会发现缓存中已经存在锁,则会进行等待直到第一个请求的锁释放。

setnx加锁

代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
// 1. 查询缓存
String catalogJsonString = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isEmpty(catalogJsonString)) {
// 2. 缓存对象不存在则查询数据库,并序列化后存入缓存中
return getCatalogJsonWithRedisLock();
}
// 3. 缓存对象存在那么反序列后返回
Map<String, List<Catelog2Vo>> catalogJson = JSON.parseObject(catalogJsonString, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
return catalogJson;
}

/**
* 采用 redis lock 实现分布式锁(双重检查锁)
* @return
*/
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1111");
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
log.info("获取分布式锁成功...");
String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isNotEmpty(catalogJSON)) {
dataFromDb = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
} else {
dataFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(dataFromDb);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);
}
// 删除锁
redisTemplate.delete("lock");
return dataFromDb;
} else {
log.info("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}
}

观察如上代码,第 32 行代码会将锁进行释放。但是在程序运行过程中,可能由于断电或者程序退出等原因导致线程在获取到锁之后可能没有释放锁,从而造成死锁。初步解决办法就是为锁设置过期时间,因此即便锁没有及时释放,到期后也会自动进行释放。

4.2.2 阶段二

如下代码第 7 行,我们为锁设置过期时间,比如 30s 后锁会自动失效。但是现在依然存在一个问题,即在执行 expire 方法之前程序崩溃断电了导致设置锁过期时间没有成功的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1111");
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
log.info("获取分布式锁成功...");
// 为锁设置过期时间,30s 后过期
redisTemplate.expire("lock", 30, TimeUnit.SECONDS);
String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isNotEmpty(catalogJSON)) {
dataFromDb = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
} else {
dataFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(dataFromDb);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);
}
// 删除锁
redisTemplate.delete("lock");
return dataFromDb;
} else {
log.info("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}

}

设置锁过期时间

针对这一问题,我们需要将获取锁和设置锁的过期时间作为一个原子操作来执行。redis 也提供了相应的方法 setnxex。修改后的代码如下所示,主要实现为第 3 行代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
// 加锁的同时并设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "1111", 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
log.info("获取分布式锁成功...");
String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isNotEmpty(catalogJSON)) {
dataFromDb = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
} else {
dataFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(dataFromDb);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);
}
// 删除锁
redisTemplate.delete("lock");
return dataFromDb;
} else {
log.info("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}

}

4.2.3 阶段三

但是现在依然存在问题,观察下图,第一个请求在获取到锁的时候,该锁会在 10s 后过期。但是该请求执行业务时间较长,比如可能花了 30s。那么第一个请求再还没释放锁之前其获取到的锁就已经失效了。这是第二个请求会获取到锁并执行业务。等第一个请求开始删除锁时,这时候会删除掉第二个请求获取的锁,从而出现把别人的锁删除的情况。

锁过期时长问题

为了避免这种情况,在占锁的时候,值指定为uuid,每个人匹配是自己的锁才删除。

4.2.4 阶段四

观察如下代码,我们使用 UUID 来避免用户删除掉其他人的锁的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String uuid = UUID.randomUUID().toString();
// 加锁的同时并设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
log.info("获取分布式锁成功...");
String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isNotEmpty(catalogJSON)) {
dataFromDb = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
} else {
dataFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(dataFromDb);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);
}
String lockValue = redisTemplate.opsForValue().get("lock");
if (uuid.equals(lockValue)) {
//删除自己的锁
redisTemplate.delete("lock");
}
return dataFromDb;
} else {
log.info("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}

}

但是现在依然存在一个问题,观察 16 - 20 行代码,其中获取锁和删除锁并不是原子操作。那么在并发的情况下,可能依然会出现删除掉其他人的锁的问题。下面我们可以将获取当前线程的锁和删除锁作为一个原子操作,也就是使用 redis 提供的 lua 脚本来执行,具体代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Map<String, List<Catelog2Vo>> getCatalogJsonWithRedisLock() {
String uuid = UUID.randomUUID().toString();
// 加锁的同时并设置过期时间
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 30, TimeUnit.SECONDS);
Map<String, List<Catelog2Vo>> dataFromDb;
if (lock) {
log.info("获取分布式锁成功...");
try {
String catalogJSON = redisTemplate.opsForValue().get("catalogJson");
if (StringUtils.isNotEmpty(catalogJSON)) {
dataFromDb = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
} else {
dataFromDb = getCatalogJsonFromDb();
String s = JSON.toJSONString(dataFromDb);
redisTemplate.opsForValue().set("catalogJson", s, 1, TimeUnit.DAYS);
}
} finally {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//删除当前值为 uuid 的锁
Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid);
}
return dataFromDb;
} else {
log.info("获取分布式锁失败...等待重试");
try {
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
}
return getCatalogJsonWithRedisLock();
}
}

代码执行过程如下所示:

当前线程锁删除原子

至此我们便完成了 Redis 分布式锁的实现,另外 Redis 也提供了相关的类 Redisson 来实现分布式锁。

------ 本文结束------