Redisson分布式锁源码解析

2025-05-29 0 42

redisson锁继承implements reentrant lock,所以具备 reentrant lock 锁中的一些特性:超时,重试,可中断等。加上redisson中redis具备分布式的特性,所以非常适合用来做java中的分布式锁。 下面我们对其加锁、解锁过程中的源码细节进行一一分析。

锁的接口定义了一下方法:

Redisson分布式锁源码解析

分布式锁当中加锁,我们常用的加锁接口:

boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception;

下面我们来看一下方法的具体实现:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66
public boolean trylock(long waittime, long leasetime, timeunit unit) throws interruptedexception {

long time = unit.tomillis(waittime);

long current = system.currenttimemillis();

final long threadid = thread.currentthread().getid();

long ttl = tryacquire(leasetime, unit, threadid);

// lock acquired

if (ttl == null) {

return true;

}

time -= (system.currenttimemillis() - current);

if (time <= 0) {

acquirefailed(threadid);

return false;

}

current = system.currenttimemillis();

final rfuture subscribefuture = subscribe(threadid);

if (!await(subscribefuture, time, timeunit.milliseconds)) {

if (!subscribefuture.cancel(false)) {

subscribefuture.addlistener(new futurelistener() {

@override

public void operationcomplete(future future) throws exception {

if (subscribefuture.issuccess()) {

unsubscribe(subscribefuture, threadid);

}

}

});

}

acquirefailed(threadid);

return false;

}

try {

time -= (system.currenttimemillis() - current);

if (time <= 0) {

acquirefailed(threadid);

return false;

}

while (true) {

long currenttime = system.currenttimemillis();

ttl = tryacquire(leasetime, unit, threadid);

// lock acquired

if (ttl == null) {

return true;

}

time -= (system.currenttimemillis() - currenttime);

if (time = 0 && ttl < time) {

getentry(threadid).getlatch().tryacquire(ttl, timeunit.milliseconds);

} else {

getentry(threadid).getlatch().tryacquire(time, timeunit.milliseconds);

}

time -= (system.currenttimemillis() - currenttime);

if (time <= 0) {

acquirefailed(threadid);

return false;

}

}

} finally {

unsubscribe(subscribefuture, threadid);

}

// return get(trylockasync(waittime, leasetime, unit));

}

首先我们看到调用tryacquire尝试获取锁,在这里是否能获取到锁,是根据锁名称的过期时间ttl来判定的(ttl

下面我们接着看一下tryacquire的实现:

?

1

2

3
private long tryacquire(long leasetime, timeunit unit, long threadid) {

return get(tryacquireasync(leasetime, unit, threadid));

}

可以看到真正获取锁的操作经过一层get操作里面执行的,这里为何要这么操作,本人也不是太理解,如有理解错误,欢迎指正。

?

1
get 是由commandasyncexecutor(一个线程executor)封装的一个executor

设置一个单线程的同步控制器countdownlatch,用于控制单个线程的中断信息。个人理解经过中间的这么一步:主要是为了支持线程可中断操作。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32
public v get(rfuture future) {

if (!future.isdone()) {

final countdownlatch l = new countdownlatch(1);

future.addlistener(new futurelistener() {

@override

public void operationcomplete(future future) throws exception {

l.countdown();

}

});

boolean interrupted = false;

while (!future.isdone()) {

try {

l.await();

} catch (interruptedexception e) {

interrupted = true;

}

}

if (interrupted) {

thread.currentthread().interrupt();

}

}

// commented out due to blocking issues up to 200 ms per minute for each thread:由于每个线程的阻塞问题,每分钟高达200毫秒

// future.awaituninterruptibly();

if (future.issuccess()) {

return future.getnow();

}

throw convertexception(future);

}

我们进一步往下看:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21
private rfuture tryacquireasync(long leasetime, timeunit unit, final long threadid) {

if (leasetime != -1) {

return trylockinnerasync(leasetime, unit, threadid, rediscommands.eval_long);

}

rfuture ttlremainingfuture = trylockinnerasync(commandexecutor.getconnectionmanager().getcfg().getlockwatchdogtimeout(), timeunit.milliseconds, threadid, rediscommands.eval_long);

ttlremainingfuture.addlistener(new futurelistener() {

@override

public void operationcomplete(future future) throws exception {

if (!future.issuccess()) {

return;

}

long ttlremaining = future.getnow();

// lock acquired

if (ttlremaining == null) {

scheduleexpirationrenewal(threadid);

}

}

});

return ttlremainingfuture;

}

首先判断锁是否有超时时间,有过期时间的话,会在后面获取锁的时候设置进去。没有过期时间的话,则会用默认的

?

1
private long lockwatchdogtimeout = 30 * 1000;

下面我们在进一步往下分析真正获取锁的操作:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17
rfuture trylockinnerasync(long leasetime, timeunit unit, long threadid, redisstrictcommand command) {

internallockleasetime = unit.tomillis(leasetime);

return commandexecutor.evalwriteasync(getname(), longcodec.instance, command,

"if (redis.call('exists', keys[1]) == 0) then " +

"redis.call('hset', keys[1], argv[2], 1); " +

"redis.call('pexpire', keys[1], argv[1]); " +

"return nil; " +

"end; " +

"if (redis.call('hexists', keys[1], argv[2]) == 1) then " +

"redis.call('hincrby', keys[1], argv[2], 1); " +

"redis.call('pexpire', keys[1], argv[1]); " +

"return nil; " +

"end; " +

"return redis.call('pttl', keys[1]);",

collections.singletonlist(getname()), internallockleasetime, getlockname(threadid));

}

我把里面的重点信息做了以下三点总结:

1:真正执行的是一段具有原子性的lua脚本,并且最终也是由commandasynexecutor去执行。

2:锁真正持久化到redis时,用的hash类型key field value

3:获取锁的三个参数:getname()是逻辑锁名称,例如:分布式锁要锁住的methodname+params;internallockleasetime是毫秒单位的锁过期时间;getlockname则是锁对应的线程级别的名称,因为支持相同线程可重入,不同线程不可重入,所以这里的锁的生成方式是:uuid+":"threadid。有的同学可能会问,这样不是很缜密:不同的jvm可能会生成相同的threadid,所以redission这里加了一个区分度很高的uuid;

lua脚本中的执行分为以下三步:

1:exists检查redis中是否存在锁名称;如果不存在,则获取成功;同时把逻辑锁名称keys[1],线程级别的锁名称[argv[2],value=1,设置到redis。并设置逻辑锁名称的过期时间argv[2],返回;

2:如果检查到存在keys[1],[argv[2],则说明获取成功,此时会自增对应的value值,记录重入次数;并更新锁的过期时间

3:key不存,直接返回key的剩余过期时间(-2)

原文链接:https://www.roncoo.com/article/detail/133572

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Redisson分布式锁源码解析 https://www.kuaiidc.com/111529.html

相关文章

发表评论
暂无评论