浅谈Java(SpringBoot)基于zookeeper的分布式锁实现

2025-05-29 0 23

通过zookeeper实现分布式锁

1、创建zookeeper的client

首先通过curatorframeworkfactory创建一个连接zookeeper的连接curatorframework client

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean {

private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller.class);

private string connectionstring;

private int sessiontimeoutms;

private int connectiontimeoutms;

private retrypolicy retrypolicy;

private curatorframework client;

public curatorfactorybean(string connectionstring) {

this(connectionstring, 500, 500);

}

public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) {

this.connectionstring = connectionstring;

this.sessiontimeoutms = sessiontimeoutms;

this.connectiontimeoutms = connectiontimeoutms;

}

@override

public void destroy() throws exception {

logger.info("closing curator framework...");

this.client.close();

logger.info("closed curator framework.");

}

@override

public curatorframework getobject() throws exception {

return this.client;

}

@override

public class<?> getobjecttype() {

return this.client != null ? this.client.getclass() : curatorframework.class;

}

@override

public boolean issingleton() {

return true;

}

@override

public void afterpropertiesset() throws exception {

if (stringutils.isempty(this.connectionstring)) {

throw new illegalstateexception("connectionstring can not be empty.");

} else {

if (this.retrypolicy == null) {

this.retrypolicy = new exponentialbackoffretry(1000, 2147483647, 180000);

}

this.client = curatorframeworkfactory.newclient(this.connectionstring, this.sessiontimeoutms, this.connectiontimeoutms, this.retrypolicy);

this.client.start();

this.client.blockuntilconnected(30, timeunit.milliseconds);

}

}

public void setconnectionstring(string connectionstring) {

this.connectionstring = connectionstring;

}

public void setsessiontimeoutms(int sessiontimeoutms) {

this.sessiontimeoutms = sessiontimeoutms;

}

public void setconnectiontimeoutms(int connectiontimeoutms) {

this.connectiontimeoutms = connectiontimeoutms;

}

public void setretrypolicy(retrypolicy retrypolicy) {

this.retrypolicy = retrypolicy;

}

public void setclient(curatorframework client) {

this.client = client;

}

}

2、封装分布式锁

根据curatorframework创建interprocessmutex(分布式可重入排它锁)对一行数据进行上锁

?

1

2

3
public interprocessmutex(curatorframework client, string path) {

this(client, path, new standardlockinternalsdriver());

}

使用 acquire方法
1、acquire() :入参为空,调用该方法后,会一直堵塞,直到抢夺到锁资源,或者zookeeper连接中断后,上抛异常。
2、acquire(long time, timeunit unit):入参传入超时时间、单位,抢夺时,如果出现堵塞,会在超过该时间后,返回false。

?

1

2

3

4

5

6

7

8

9
public void acquire() throws exception {

if (!this.internallock(-1l, (timeunit)null)) {

throw new ioexception("lost connection while trying to acquire lock: " + this.basepath);

}

}

public boolean acquire(long time, timeunit unit) throws exception {

return this.internallock(time, unit);

}

释放锁 mutex.release();

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21
public void release() throws exception {

thread currentthread = thread.currentthread();

interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata)this.threaddata.get(currentthread);

if (lockdata == null) {

throw new illegalmonitorstateexception("you do not own the lock: " + this.basepath);

} else {

int newlockcount = lockdata.lockcount.decrementandget();

if (newlockcount <= 0) {

if (newlockcount < 0) {

throw new illegalmonitorstateexception("lock count has gone negative for lock: " + this.basepath);

} else {

try {

this.internals.releaselock(lockdata.lockpath);

} finally {

this.threaddata.remove(currentthread);

}

}

}

}

}

封装后的dlock代码
1、调用interprocessmutex processmutex = dlock.mutex(path);

2、手动释放锁processmutex.release();

3、需要手动删除路径dlock.del(path);

推荐 使用:
都是 函数式编程
在业务代码执行完毕后 会释放锁和删除path
1、这个有返回结果
public t mutex(string path, zklockcallback zklockcallback, long time, timeunit timeunit)
2、这个无返回结果
public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit)

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112
public class dlock {

private final logger logger;

private static final long timeout_d = 100l;

private static final string root_path_d = "/dlock";

private string lockrootpath;

private curatorframework client;

public dlock(curatorframework client) {

this("/dlock", client);

}

public dlock(string lockrootpath, curatorframework client) {

this.logger = loggerfactory.getlogger(dlock.class);

this.lockrootpath = lockrootpath;

this.client = client;

}

public interprocessmutex mutex(string path) {

if (!stringutils.startswith(path, "/")) {

path = constant.keybuilder(new object[]{"/", path});

}

return new interprocessmutex(this.client, constant.keybuilder(new object[]{this.lockrootpath, "", path}));

}

public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception {

return this.mutex(path, zklockcallback, 100l, timeunit.milliseconds);

}

public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception {

string finalpath = this.getlockpath(path);

interprocessmutex mutex = new interprocessmutex(this.client, finalpath);

try {

if (!mutex.acquire(time, timeunit)) {

throw new zklockexception("acquire zk lock return false");

}

} catch (exception var13) {

throw new zklockexception("acquire zk lock failed.", var13);

}

t var8;

try {

var8 = zklockcallback.doinlock();

} finally {

this.releaselock(finalpath, mutex);

}

return var8;

}

private void releaselock(string finalpath, interprocessmutex mutex) {

try {

mutex.release();

this.logger.info("delete zk node path:{}", finalpath);

this.deleteinternal(finalpath);

} catch (exception var4) {

this.logger.error("dlock", "release lock failed, path:{}", finalpath, var4);

// logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4});

}

}

public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception {

string finalpath = this.getlockpath(path);

interprocessmutex mutex = new interprocessmutex(this.client, finalpath);

try {

if (!mutex.acquire(time, timeunit)) {

throw new zklockexception("acquire zk lock return false");

}

} catch (exception var13) {

throw new zklockexception("acquire zk lock failed.", var13);

}

try {

zklockcallback.response();

} finally {

this.releaselock(finalpath, mutex);

}

}

public string getlockpath(string custompath) {

if (!stringutils.startswith(custompath, "/")) {

custompath = constant.keybuilder(new object[]{"/", custompath});

}

string finalpath = constant.keybuilder(new object[]{this.lockrootpath, "", custompath});

return finalpath;

}

private void deleteinternal(string finalpath) {

try {

((errorlistenerpathable)this.client.delete().inbackground()).forpath(finalpath);

} catch (exception var3) {

this.logger.info("delete zk node path:{} failed", finalpath);

}

}

public void del(string custompath) {

string lockpath = "";

try {

lockpath = this.getlockpath(custompath);

((errorlistenerpathable)this.client.delete().inbackground()).forpath(lockpath);

} catch (exception var4) {

this.logger.info("delete zk node path:{} failed", lockpath);

}

}

}

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22
@functionalinterface

public interface zklockcallback<t> {

t doinlock();

}

@functionalinterface

public interface zkvoidcallback {

void response();

}

public class zklockexception extends exception {

public zklockexception() {

}

public zklockexception(string message) {

super(message);

}

public zklockexception(string message, throwable cause) {

super(message, cause);

}

}

配置curatorconfig

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25
@configuration

public class curatorconfig {

@value("${zk.connectionstring}")

private string connectionstring;

@value("${zk.sessiontimeoutms:500}")

private int sessiontimeoutms;

@value("${zk.connectiontimeoutms:500}")

private int connectiontimeoutms;

@value("${zk.dlockroot:/dlock}")

private string dlockroot;

@bean

public curatorfactorybean curatorfactorybean() {

return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms);

}

@bean

@autowired

public dlock dlock(curatorframework client) {

return new dlock(dlockroot, client);

}

}

测试代码

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55
@restcontroller

@requestmapping("/dlock")

public class lockcontroller {

@autowired

private dlock dlock;

@requestmapping("/lock")

public map testdlock(string no){

final string path = constant.keybuilder("/test/no/", no);

long mutex=0l;

try {

system.out.println("在拿锁:"+path+system.currenttimemillis());

mutex = dlock.mutex(path, () -> {

try {

system.out.println("拿到锁了" + system.currenttimemillis());

thread.sleep(10000);

system.out.println("操作完成了" + system.currenttimemillis());

} finally {

return system.currenttimemillis();

}

}, 1000, timeunit.milliseconds);

} catch (zklockexception e) {

system.out.println("拿不到锁呀"+system.currenttimemillis());

}

return collections.singletonmap("ret",mutex);

}

@requestmapping("/dlock")

public map testdlock1(string no){

final string path = constant.keybuilder("/test/no/", no);

long mutex=0l;

try {

system.out.println("在拿锁:"+path+system.currenttimemillis());

interprocessmutex processmutex = dlock.mutex(path);

processmutex.acquire();

system.out.println("拿到锁了" + system.currenttimemillis());

thread.sleep(10000);

processmutex.release();

system.out.println("操作完成了" + system.currenttimemillis());

} catch (zklockexception e) {

system.out.println("拿不到锁呀"+system.currenttimemillis());

e.printstacktrace();

}catch (exception e){

e.printstacktrace();

}

return collections.singletonmap("ret",mutex);

}

@requestmapping("/del")

public map deldlock(string no){

final string path = constant.keybuilder("/test/no/", no);

dlock.del(path);

return collections.singletonmap("ret",1);

}

}

以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对快网idc网站的支持!

原文链接:https://blog.csdn.net/LJY_SUPER/article/details/87807091

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 浅谈Java(SpringBoot)基于zookeeper的分布式锁实现 https://www.kuaiidc.com/109592.html

相关文章

发表评论
暂无评论