0%

缓存设计-注意缓存穿透问题

缓存回源的逻辑都是当缓存中查不到需要的数据时,回源到数据库查询。 这里容易出现的一个漏洞是,缓存中没有数据不一定代表数据没有缓存,还有一种可能是原始数据压根就不存在。

比如下面的例子。数据库中只保存有 ID 介于 0(不含)和 10000(包含)之间的用户,如果从数据库查询 ID 不在这个区间的用户,会得到空字符串,所以缓存中缓存的也是空字符串。如果使用 ID=0 去压接口的话,从缓存中查出了空字符串,认为是缓存中没有数据回源查询,其实相当于每次都回源:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Slf4j
@RestController
@RequestMapping("redis/ct")
public class RedisCtDemo {

private AtomicInteger atomicInteger = new AtomicInteger();

@Autowired
private StringRedisTemplate stringRedisTemplate;

@PostConstruct
public void init() {
// 每隔1秒输出一下回源的 QPS
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("DB QPS : {}", atomicInteger.getAndSet(0));
}, 0, 1, TimeUnit.SECONDS);
}

@GetMapping("wrong")
public String wrong(@RequestParam("id") int id) {
String key = "user" + id;
String data = stringRedisTemplate.opsForValue().get(key);
//无法区分是无效用户还是缓存失效
if (StringUtils.isEmpty(data)) {
data = getCityFromDb(id);
stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS);
}
return data;
}

private String getCityFromDb(int id) {
atomicInteger.incrementAndGet();
//注意,只有ID介于0(不含)和10000(包含)之间的用户才是有效用户,可以查询到用户信息
if (id > 0 && id <= 10000) {
return "userdata";
}
//否则返回空字符串
return "";
}
}

压测后数据库的 QPS 达到了几千:

1
2
3
4
5
6
7
8
9
10
11
2022-09-11 23:23:18.279  INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo     : DB QPS : 384
2022-09-11 23:23:19.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 2044
2022-09-11 23:23:20.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 2656
2022-09-11 23:23:21.279 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 2836
2022-09-11 23:23:22.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 3391
2022-09-11 23:23:23.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 3907
2022-09-11 23:23:24.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 4406
2022-09-11 23:23:25.279 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 4245
2022-09-11 23:23:26.279 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 4524
2022-09-11 23:23:27.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 4268
2022-09-11 23:23:28.278 INFO 56864 --- [pool-1-thread-1] c.s.e.d.interfaces.redis.RedisCtDemo : DB QPS : 3920

如果这种漏洞被恶意利用的话,就会对数据库造成很大的性能压力。这就是缓存穿透。

这里需要注意,缓存穿透和缓存击穿的区别:

  • 缓存穿透是指,缓存没有起到压力缓冲的作用;
  • 而缓存击穿是指,缓存失效时瞬时的并发打到数据库。

解决缓存穿透有以下两种方案。

方案一,对于不存在的数据,同样设置一个特殊的 Value 到缓存中,比如当数据库中查出的用户信息为空的时候,设置 NODATA 这样具有特殊含义的字符串到缓存中。这样下次请求缓存的时候还是可以命中缓存,即直接从缓存返回结果,不查询数据库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@GetMapping("right")
public String right(@RequestParam("id") int id) {
String key = "user" + id;
String data = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.isEmpty(data)) {
data = getCityFromDb(id);
//校验从数据库返回的数据是否有效
if (StringUtils.isNotEmpty(data)) {
stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS);
} else {
//如果无效,直接在缓存中设置一个NODATA,这样下次查询时即使是无效用户还是可以命中缓存
stringRedisTemplate.opsForValue().set(key, "NODATA", 30, TimeUnit.SECONDS);
}
}
return data;
}

但,这种方式可能会把大量无效的数据加入缓存中,如果担心大量无效数据占满缓存的话还可以考虑方案二,即使用布隆过滤器做前置过滤。

布隆过滤器是一种概率型数据库结构,由一个很长的二进制向量和一系列随机映射函数组成。它的原理是,当一个元素被加入集合时,通过 k 个散列函数将这个元素映射成一个 m 位 bit 数组中的 k 个点,并置为 1。

检索时,我们只要看看这些点是不是都是 1 就(大概)知道集合中有没有它了。如果这些点有任何一个 0,则被检元素一定不在;如果都是 1,则被检元素很可能在。

原理如下图所示:

布隆过滤器不保存原始值,空间效率很高,平均每一个元素占用 2.4 字节就可以达到万分之一的误判率。这里的误判率是指,过滤器判断值存在而实际并不存在的概率。我们可以设置布隆过滤器使用更大的存储空间,来得到更小的误判率。

你可以把所有可能的值保存在布隆过滤器中,从缓存读取数据前先过滤一次:

  • 如果布隆过滤器认为值不存在,那么值一定是不存在的,无需查询缓存也无需查询数据库;
  • 对于极小概率的误判请求,才会最终让非法 Key 的请求走到缓存或数据库。

要用上布隆过滤器,我们可以使用 Google 的 Guava 工具包提供的 BloomFilter 类改造一下程序:启动时,初始化一个具有所有有效用户 ID 的、10000 个元素的 BloomFilter,在从缓存查询数据之前调用其 mightContain 方法,来检测用户 ID 是否可能存在;如果布隆过滤器说值不存在,那么一定是不存在的,直接返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Slf4j
@RestController
@RequestMapping("redis/ct/bloom")
public class RedisCtBloomDemo {
private AtomicInteger atomicInteger = new AtomicInteger();

@Autowired
private StringRedisTemplate stringRedisTemplate;

private BloomFilter<Integer> bloomFilter;

@PostConstruct
public void init() {
//创建布隆过滤器,元素数量10000,期望误判率1%
bloomFilter = BloomFilter.create(Funnels.integerFunnel(), 10000, 0.01);
//填充布隆过滤器
IntStream.rangeClosed(1, 10000).forEach(bloomFilter::put);

// 每隔1秒输出一下回源的 QPS
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
log.info("DB QPS : {}", atomicInteger.getAndSet(0));
}, 0, 1, TimeUnit.SECONDS);
}

@GetMapping("right2")
public String right2(@RequestParam("id") int id) {
String data = "";
//通过布隆过滤器先判断
if (bloomFilter.mightContain(id)) {
String key = "user" + id;
//走缓存查询
data = stringRedisTemplate.opsForValue().get(key);
if (StringUtils.isEmpty(data)) {
//走数据库查询
data = getCityFromDb(id);
stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS);
}
}
return data;
}

private String getCityFromDb(int id) {
atomicInteger.incrementAndGet();
//注意,只有ID介于0(不含)和10000(包含)之间的用户才是有效用户,可以查询到用户信息
if (id > 0 && id <= 10000) {
return "userdata";
}
//否则返回空字符串
return "";
}
}

对于方案二,我们需要同步所有可能存在的值并加入布隆过滤器,这是比较麻烦的地方。如果业务规则明确的话,你也可以考虑直接根据业务规则判断值是否存在。

其实,方案二可以和方案一同时使用,即将布隆过滤器前置,对于误判的情况再保存特殊值到缓存,双重保险避免无效数据查询请求打到数据库。

------ 本文结束------