0%

SpringBoot 中 Redisson 的使用

1. SpringBoot 整合

1)导入依赖:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.12.0</version>
</dependency>

2)实现配置类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Configuration
public class MyRedissonConfig {

/**
* 所有对Redisson的使用都是通过RedissonClient对象
*
* @return
* @throws IOException
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redisson(@Value("${spring.redis.host}") String url) throws IOException {
//1、创建配置
//Redis url should start with redis:// or rediss://
Config config = new Config();
config.useSingleServer().setAddress("redis://" + url + ":6379");

//2、根据 Config 创建出 RedissonClient 示例
RedissonClient redissonClient = Redisson.create(config);
return redissonClient;
}
}

2. Lock 锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 测试 lock 锁的使用
* @return
*/
@RequestMapping("/hello")
public String hello() {
// 获取一把锁,只要锁的名字一样,就是同一把锁
RLock lock = redissonClient.getLock("my-lock");

// lock.lock(10,TimeUnit.SECONDS); //10秒自动解锁,自动解锁时间一定要大于业务的执行时间。不会自动过期续期,谨慎使用

//加锁,默认加的锁都是30s时间
lock.lock();
try {
log.info("加锁成功,执行业务..." + Thread.currentThread().getId());
Thread.sleep(15000);
} catch (Exception e) {
e.printStackTrace();
} finally {
//3解锁
log.info("释放锁..." + Thread.currentThread().getId());
lock.unlock();
}
return "hello";
}

观察如上代码,经测试发现如下两点(看门狗原理以预防死锁):

  • RLock 锁系统默认过期时间为 30s;
  • 当业务代码执行时间超过默认过期时间时长时,RLock 锁会每个三分之一个看门狗时间来自动续期,即重新设置为 30s。

3. 读写锁

查看下面代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Autowired
private RedissonClient redissonClient;

@Autowired
private StringRedisTemplate redisTemplate;

/**
* 使用写锁进行写数据
*/
@ResponseBody
@RequestMapping("/writeValue")
public String writeValue() {
// 获取读写锁,并设置写锁
String s = null;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock writeLock = readWriteLock.writeLock();
try {
log.info("成功加上写锁...");
writeLock.lock();
s = UuidUtils.generateUuid();
Thread.sleep(20000);
redisTemplate.opsForValue().set("value", s);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
writeLock.unlock();
}
return s;
}

/**
* 使用读锁进行读数据
*/
@ResponseBody
@RequestMapping("/readValue")
public String readValue() {
String s = null;
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("rw-lock");
RLock readLock = readWriteLock.readLock();
readLock.lock();
try {
log.info("获取读锁...");
s = redisTemplate.opsForValue().get("value");
} catch (Exception ex) {
ex.printStackTrace();
} finally {
readLock.unlock();
}
return s;
}

经测试得出以下结论:

  • 写锁 + 写锁:并发互斥
  • 读锁 + 写锁:并发互斥,无论先后
  • 读锁 + 读锁:并发进行,相当于无锁

4. 闭锁

Redisson 的闭锁和 JUC 包下的 CountDownLatch 有一样的效果,即是主线程先等待其他线程执行完成之后再继续执行。闭锁存在一个计数器,当计数等于 0 的时候,表示其他线程已经执行完成,主线程继续执行。查看如下示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 放假,锁门
* 1班没人了,2
* 5个班全部走完,我们可以锁大门
*/
@GetMapping("/lockDoor")
@ResponseBody
public String lockDoor() throws InterruptedException {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.trySetCount(5);
door.await(); //等待闭锁都完成
return "放假了...";
}

@ResponseBody
@GetMapping("/gogogo/{id}")
public String gogogo(@PathVariable("id") Long id) {
RCountDownLatch door = redissonClient.getCountDownLatch("door");
door.countDown();//计数减一;
return id + "班的人都走了...";
}

当 gogogo 方法全部执行五次之后,RCountDownLatch 中的计数器变为 0 ,主线程从第 12 行代码开始继续执行。

5. 信号量

Redisson 也提供信号量机制,查看如下代码示例,把信号量当做一个停车车位,假设现在 redis 中已经存在 key 为 park 且值为 3 的数据,其中 3 表示初始的车位数。调用了三次 park 方法之后,3 会变为 0,此时调用 park 方法将会返回 error表示不可停车,直到调用 go 方法是 0 变为 1 开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 车库停车,一个信号量相当于一个车位
*/
@GetMapping("/park")
@ResponseBody
public String park() throws InterruptedException {
RSemaphore park = redissonClient.getSemaphore("park");
//获取一个信号量(车位),会持续等待
// park.acquire();
// 尝试获取一个信息量(车位),获取不到直接得到 false
boolean b = park.tryAcquire();
if (b) {
// 执行业务
} else {
return "error";
}
return "ok=>" + b;
}

/*
* 离开车位
*/
@GetMapping("/go")
@ResponseBody
public String go() throws InterruptedException {
RSemaphore park = redissonClient.getSemaphore("park");
park.release();// 释放一个信号量(车位)
return "ok";
}

6. 缓存一致性问题

缓存一致性问题指的是数据库数据和缓存中的数据不一致的问题,解决缓存一致性问题主要有如下两种模式:

  • 双写模式:在更新数据库数据的时候,同时更新缓存中的数据;
  • 失效模式:在更新数据库数据的时候,将缓存的数据删除掉,等下次查询时重新从数据库获取并写入缓存

但是两者都存在一定的问题,下图为双写模式下可能遇到的问题,简单的将就是写数据库和写缓存两步操作不是原子操作导致的并发问题。

双写模式问题

同理在失效模式下也有相似的问题,观察下图,第一个请求写完数据库之后成功删除了缓存数据,第二个请求写数据库花费的时间较长。当第三个请求过来时发现缓存中数据不存在,此时去读取数据库获取数据库数据(旧数据,此时第二个请求还没写完数据库),并最后将数据库的旧数据更新进缓存。和双写模式类似,这是由于写数据库和删缓存两步操作不是原子操作导致的并发问题。

失效模式问题

6.1 缓存一致性问题解决方案

1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加上过期时间,每隔一段时间触发读的主动更新即可;

2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式;

3、缓存数据 + 过期时间也足够解决大部分业务对于缓存的要求;

4、通过加锁保证并发读写,写写的时候按顺序排好队(SpringCache 实现)。读读无所谓。所以适合使用读写锁。(业务不关心髒数据,允许临时髒数据可忽略)

我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保证每天拿到当前最新数据即可。 我们不应该过度设计,增加系统的复杂性 。 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

6.2 canal 订阅 binlog 方式

解决缓存一致性问题也可以使用 Canal。Canal 是一个中间件,当我们更新了数据库的数据时其会自动去更新缓存的数据从而避免缓存一致性问题。观察下图,canal 会去监听数据的操作,其实质是去获取数据库的 binlog 日志记录,然后根据 binlog 对订阅了 canal 缓存 redis 数据进行更新。

Canal 原理图

canal 订阅 binlog 方式的工作原理其实是数据库的主从备份,canal 相当于一个伪装的数据库从服务器。

------ 本文结束------