跳到主要内容

分布式锁几种方式

分布式锁?

  • 在单体应用的时候,如果多个线程要访问共享资源的时候,我们通常线程间加锁的机制,在某一个时刻,只有一个线程可以对这个资源进行操作,其他线程需要等待。比如synchronized,ReentrantLock。
  • 而到了分布式的环境中,当某个资源可以被多个系统访问使用到的时候,为了保证大家访问这个数据是一致性的,那么就要求再同一个时刻,只能被一个系统使用,这时候线程之间的锁机制就无法起到作用了,因为分布式环境中,系统是会部署到不同的机器上面的,那么就需要分布式锁。

具备的特点:

  • 互斥性:和本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁
  • 锁超时:支持锁超时,防止死锁
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)

数据库实现分布式锁

xx

Zookeeper实现分布式锁

CAP模型属于CP,ZAB( ZooKeeper Atomic Broadcast )一致性算法实现,比较稳定。通常使用ZK集群。

ZK 数据结构跟 Unix 文件系统非常类似,可以看做是一颗树,每个节点叫做 znode,每一个节点可以通过路径来标识,znode 节点分为两种类型:

  • 持久节点:该数据节点被创建后,就会一直存在于 ZooKeeper 服务器上,直到有删除操作来主动删除这个节点,详细来说还可以分为普通持久节点和带顺序号的持久节点。
  • 临时节点:临时节点的生命周期和客户端会话绑定在一起,客户端会话失效,则这个节点就会被自动清除,同样分为普通临时节点和带顺序号的临时节点。

Springboot整合Curator

所需依赖

<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>

注意zookeeper的版本,可以使用命令 echo stat|nc 127.0.0.1 2181 查看版本号。

执行该命名需要:

  • 安装命令:yum install -y nc
  • conf/zoo.cfg 配置文件增加配置项目:4lw.commands.whitelist=*

application.yml 增加客户端需要的配置

server:
port: 8900
zookeeper:
curator:
hosts: 192.168.1.240:2181,192.168.1.240:2182,192.168.1.240:2183
sessionTimeOut: 50000
sleepMsBetweenRetry: 1000
maxRetries: 3
namespace: zookeeper_lock_test
connectionTimeoutMs: 50000

配置curator客户端

/**
* CuratorFramework配置
*/
@Configuration
@ConfigurationProperties(prefix = "zookeeper.curator")
@Data
public class ZookeeperConfig {
/**
* 集群地址
*/
private String hosts;
/**
* 连接超时时间
*/
private Integer connectionTimeoutMs;
/**
* 会话超时时间
*/
private Integer sessionTimeOut;
/**
* 重试机制时间参数
*/
private Integer sleepMsBetweenRetry;
/**
* 重试机制重试次数
*/
private Integer maxRetries;
/**
* 命名空间(父节点名称)
*/
private String namespace;

@Bean
public CuratorFramework curatorClient() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(hosts)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeOut)
.retryPolicy(new ExponentialBackoffRetry(sleepMsBetweenRetry, maxRetries))
.namespace(namespace)
.build();
client.start();
return client;
}
}

单元测试下

@SpringBootTest
@RunWith(SpringRunner.class)
public class CuratorTest {
@Resource
private CuratorFramework zkClient;

@Test
public void createNode() throws Exception {
// 持久节点
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/a/a");
// 临时节点
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/a/b");
}
}

Curator实现排他锁

InterProcessMutex实现的锁机制是公平且互斥的,公平的方式是按照每个请求的顺序进行排队的。

在指定节点下创建临时节点,谁的序号最小谁就获得了锁。其他节点监听比自己小的节点,如果比自己小的节点被删除了那么判断自己是否为最小的节点,如果是那么就获得了锁。

@Test
public void interProcessMutexA() {
InterProcessMutex interProcessMutex = new InterProcessMutex(zkClient, "/distributed_lock/lockA");
try {
interProcessMutex.acquire(1000, TimeUnit.SECONDS);
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(2);
System.out.println("A独占锁中....." + i);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
interProcessMutex.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Curator实现读锁

创建⼀个临时序号节点,节点的名称会包含READ的字母,表示是读锁,⼤家都可以读,要想上读锁的前提:之前的锁没有写锁

上锁过程:

  • 创建⼀个临时序号节点
  • 获取当前zk中序号⽐⾃⼰⼩的所有节点
  • 判断最⼩节点是否是读锁:
    • 如果不是读锁的话,则上锁失败,为最⼩节点设置监听。阻塞等待,zk的watch机制会当最⼩节点发⽣变化时通知当前节点,于是再执⾏第⼆步的流程
    • 如果是读锁的话,则上锁成功

@Test
public void readLock() {
// 读写锁
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, "/distributed_lock/lock1");
InterProcessReadWriteLock.ReadLock readLock = interProcessReadWriteLock.readLock();
try {
readLock.acquire();
System.out.println("获取到了读锁!");
for (int i = 1; i <= 2; i++) {
Thread.sleep(3000);
System.out.println(i);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放
try {
readLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Curator实现写锁

创建⼀个临时序号节点,节点的名称会包含WRIT的字母,表示是写锁,只有得到写锁的才能写。要想上写锁的前提是,之前没有任何锁

上锁过程:

  • 创建⼀个临时序号节点
  • 获取zk中所有的⼦节点
  • 判断⾃⼰是否是最⼩的节点:
    • 如果是,则上写锁成功
    • 如果不是,说明前⾯还有锁,则上锁失败,监听最⼩的节点,如果最⼩节点有变化,则回到第⼆步。

@Test
public void writeLock() {
// 写锁
InterProcessReadWriteLock interProcessReadWriteLock = new InterProcessReadWriteLock(zkClient, "/distributed_lock/lock1");
InterProcessReadWriteLock.WriteLock writeLock = interProcessReadWriteLock.writeLock();
try {
writeLock.acquire();
System.out.println("获取到了写锁锁!");
for (int i = 1; i <= 5; i++) {
Thread.sleep(3000);
System.out.println(i);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放
try {
writeLock.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}

Redis实现分布式锁

Redis实现锁的方式

SETNX + EXPIRE (❌)

先用setnx来抢锁,如果抢到之后,再用expire给锁设置一个过期时间,防止锁忘记了释放。

@Test
public void lock01() {
if (jedis.setnx(lock_key, "myLock-0001") == 1) {
jedis.expire(lock_key, 100);
try {
System.out.println("do somethings ....");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
jedis.del(lock_key);
}
}
}

可以看出这两个操作并非原子操作。如果执行完setnx加锁,正要执行expire设置过期时间时,进程crash或者要重启维护了,那么这个锁就“长生不老”了,「别的线程永远获取不到锁」

SETNX + VALUE(系统时间+过期时间) (❌)

「发生异常锁得不到释放的场景」,可以把过期时间放到setnx的value值里面。如果加锁失败,再拿出value值校验一下即可。这样吧上面的操作合并成了一个操作。

public boolean getLock() {
// 一分钟过期
long expires = System.currentTimeMillis() + 60 * 1000;
if (jedis.setnx(lock_key, String.valueOf(expires)) == 1) {
return true;
} else {
String currentValueStr = jedis.get(lock_key);
if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
// 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间(不了解redis的getSet命令的小伙伴,可以去官网看下哈)
String oldValueStr = jedis.getSet(lock_key, String.valueOf(expires));
if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
return true;
}
}
}
return false;
}

缺陷:

  • 过期时间是客户端自己生成的(System.currentTimeMillis()是当前系统的时间),必须要求分布式环境下,每个客户端的时间必须同步。
  • 如果锁过期的时候,并发多个客户端同时请求过来,都执行jedis.getSet(),最终只能有一个客户端加锁成功,但是该客户端锁的过期时间,可能被别的客户端覆盖
  • 该锁没有保存持有者的唯一标识,可能被别的客户端释放/解锁。

SET 扩展命令(SET EX PX NX)(❌)

既然默认的命令不能保证原子性,那么就使用扩展的命令,因为他们是原子操作。

SET key value [ EX seconds ] [ PX milliseconds] [ NX | XX ]

  • NX:表示key不存在的时候,才能set成功,也即保证只有第一个客户端请求才能获得锁,而其他客户端请求只能等其释放锁,才能获取
  • EX seconds:设定key的过期时间,时间单位是秒
  • PX milliseconds:设定key的过期时间,单位为毫秒
  • XX:仅当key存在时设置值
@Test
public void lock02() {
SetParams setParams = new SetParams();
setParams.nx();
setParams.ex(10);
String res = jedis.set(lock_key, "myLock-0001", setParams);
if ("OK".equals(res)) {
try {
System.out.println("do somethings ....");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
jedis.del(lock_key);
System.out.println("do somethings end");
}
}
}

缺陷:

  • 问题一:「锁过期释放了,业务还没执行完」。假设线程a获取锁成功,一直在执行临界区的代码。但是100s过去后,它还没执行完。但是,这时候锁已经过期了,此时线程b又请求过来。显然线程b就可以获得锁成功,也开始执行临界区的代码。那么问题就来了,临界区的业务代码都不是严格串行执行的啦。
  • 问题二:「锁被别的线程误删」。假设线程a执行完后,去释放锁。但是它不知道当前的锁可能是线程b持有的(线程a去释放锁时,有可能过期时间已经到了,此时线程b进来占有了锁)。那线程a就把线程b的锁释放掉了,但是线程b临界区业务代码可能都还没执行完呢。

Lua 脚本(SETNX + EXPIRE 两条指令)

使用Lua脚本来保证 setnx和expire 两条指令原子性。

if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then
   redis.call('expire',KEYS[1],ARGV[2])
else
   return 0
end;

测试

@Test
public void lock03() {
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
" redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
Object result = jedis.eval(lua_scripts, Collections.singletonList(lock_key), Lists.newArrayList("1234", "10"));
if (result.equals(1L)) {
try {
System.out.println("do somethings ....");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
jedis.del(lock_key);
System.out.println("do somethings end");
}
} else {
System.out.println("获取锁失败!");
}
}

Redisson框架

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

用它解决了哪些问题?

  • 解决了setNx会导致死锁的问题
  • 解决了锁可重入的问题(对比setNx,会存储加锁的线程信息,加锁的次数信息 - 加了几次就要释放几次,通过hash数据结构进行存储)
  • 解决了业务还没执行完,锁被释放的问题(时间轮 + 看门狗)

官网:https://redisson.org/

springboot 快速集成

需要的依赖

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.6</version>
</dependency>

客户端配置

@Configuration
public class RedissonConfig {

@Value("${spring.redis.host}")
private String redisHost;
@Value("${spring.redis.port}")
private String redisPort;

@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":" + redisPort);
return Redisson.create(config);
}
}

单元测试

@Test
public void getLock() {
RLock myLock = redissonClient.getLock("myLock");
// 加锁(阻塞等待),默认过期时间是30秒
myLock.lock();
try {
// 如果业务执行过长,Redisson会自动给锁续期
TimeUnit.SECONDS.sleep(5);
System.out.println("干点啥......");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 解锁
myLock.unlock();
System.out.println("干完了!");
}
}

Redis实现分布式锁的方式

上面说的各种方式都是针对单机redis。生产环境Redis都是集群部署。

分布式锁实现: Redisson

server:
port: 8900
redisson:
addr:
cluster:
hosts: redis://192.168.1.240:6301,redis://192.168.1.240:6302,redis://192.168.1.240:6401,redis://192.168.1.240:6402,redis://192.168.1.240:6501,redis://192.168.1.240:6502

集群配置

@Configuration
public class RedissonConfig {

@Value("${redisson.addr.cluster.hosts}")
private String hosts;

@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useClusterServers().addNodeAddress(hosts.split(","))
.setScanInterval(2000)
.setMasterConnectionPoolSize(128)
.setSlaveConnectionPoolSize(128);
return Redisson.create(config);
}
}

使用方式都是一样的。

同时还支持主从模式,哨兵模式的配置。

分布式锁实现: Redlock + Redisson

如果redis是做的主从、哨兵,会存在问题。主从同步数据是异步的。例如:master节点的key写成功了,马上返回给客户端这把锁加成功了,客户端线程1开始做业务逻辑了,此时redis主节点才会往从节点去同步。如果还没同步这个master就挂了,发生故障转移,其他的slave升级为master,这个时候导致锁丢失。

Redis中针对此种情况,引入了redlcok。redlcok采用主节点过半机制,即获取锁或者释放锁成功的标志为:过半的节点操作成功。

@Test
public void redissonRedLock() {
RLock myLock = redissonClient.getLock("myLock");
RedissonRedLock redissonRedLock = new RedissonRedLock(myLock);
redissonRedLock.lock();
try {
TimeUnit.SECONDS.sleep(5);
System.out.println("干点啥......");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
redissonRedLock.unlock();
System.out.println("干完了!");
}
}

参考文章