Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

2025-05-29 0 71

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

前言

ThreadLocalThreadLocalMap中的entry的key弱引用,如果出现GC的情况时,

没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;

那么问题来了,有没有更高效的ThreadLocal有;

今天我们就来分析一波FastThreadLocalThread

一、FastThreadLocalThread源码分析

Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;

Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;

1、FastThreadLocalThread

在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;

  1. io.netty.util.concurrent.DefaultThreadFactory
  2. @Override
  3. publicThreadnewThread(Runnabler){
  4. Threadt=newThread(FastThreadLocalRunnable.wrap(r),prefix+nextId.incrementAndGet());
  5. //一般daemon为false,意思是不设置为守护线程
  6. if(t.isDaemon()!=daemon){
  7. t.setDaemon(daemon);
  8. }
  9. //优先级默认为5
  10. if(t.getPriority()!=priority){
  11. t.setPriority(priority);
  12. }
  13. returnt;
  14. }
  15. protectedThreadnewThread(Runnabler,Stringname){
  16. returnnewFastThreadLocalThread(threadGroup,r,name);
  17. }

FastThreadLocalThread 继承自Thread类,有如下成员变量:

  1. io.netty.util.concurrent.FastThreadLocalThread
  2. //任务执行完,是否清除FastThreadLocal的标记
  3. privatefinalbooleancleanupFastThreadLocals;
  4. //类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal
  5. privateInternalThreadLocalMapthreadLocalMap;

2、 InternalThreadLocalMap

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

  1. privateInternalThreadLocalMap(){
  2. //为了简便,InternalThreadLocalMap父类
  3. //UnpaddedInternalThreadLocalMap不展开介绍
  4. super(newIndexedVariableTable());
  5. }
  6. //默认的数组大小为32,且使用UNSET对象填充数组
  7. //如果下标处数据为UNSET,则表示没有数据
  8. privatestaticObject[]newIndexedVariableTable(){
  9. Object[]array=newObject[32];
  10. Arrays.fill(array,UNSET);
  11. returnarray;
  12. }

为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;

  1. //InternalThreadLocalMap
  2. //Cachelinepadding(mustbepublic)
  3. //WithCompressedOopsenabled,aninstanceofthisclassshouldoccupyatleast128bytes.
  4. publiclongrp1,rp2,rp3,rp4,rp5,rp6,rp7,rp8,rp9;

FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

  1. staticfinalAtomicIntegernextIndex=newAtomicInteger();
  2. publicstaticintnextVariableIndex(){
  3. //Netty中所有FTL数组下标都是通过递增这个静态变量实现的
  4. //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
  5. //会造成InternalThreadLocalMap中数组不必要的自动扩容
  6. intindex=nextIndex.getAndIncrement();
  7. if(index<0){
  8. nextIndex.decrementAndGet();
  9. thrownewIllegalStateException("toomanythread-localindexedvariables");
  10. }
  11. returnindex;
  12. }

InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:

  1. privatevoidexpandIndexedVariableTableAndSet(intindex,Objectvalue){
  2. Object[]oldArray=indexedVariables;
  3. finalintoldCapacity=oldArray.length;
  4. //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,
  5. //这段代码在早期的jdkHashMap中见过
  6. intnewCapacity=index;
  7. newCapacity|=newCapacity>>>1;
  8. newCapacity|=newCapacity>>>2;
  9. newCapacity|=newCapacity>>>4;
  10. newCapacity|=newCapacity>>>8;
  11. newCapacity|=newCapacity>>>16;
  12. newCapacity++;
  13. Object[]newArray=Arrays.copyOf(oldArray,newCapacity);
  14. Arrays.fill(newArray,oldCapacity,newArray.length,UNSET);
  15. newArray[index]=value;
  16. indexedVariables=newArray;
  17. }

3、FastThreadLocal

构造函数:

有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;

  1. //如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
  2. //该FastThreadLocalThread.threadLocalMap数组的
  3. //variablesToRemoveIndex下标处放置一个IdentityHashMap,
  4. //并将该FTL放入IdentityHashMap中,在后续清理时会取出
  5. //variablesToRemoveIndex下标处的IdentityHashMap进行清理
  6. privatestaticfinalintvariablesToRemoveIndex=InternalThreadLocalMap.nextVariableIndex();
  7. //在threadLocalMap数组中存放实际数据的下标
  8. privatefinalintindex;
  9. publicFastThreadLocal(){
  10. index=InternalThreadLocalMap.nextVariableIndex();
  11. }

用户可扩展的函数:

  1. //初始化value函数
  2. protectedVinitialValue()throwsException{
  3. returnnull;
  4. }
  5. //让使用者在该FTL被移除时可以有机会做些操作。
  6. protectedvoidonRemoval(@SuppressWarnings("UnusedParameters")Vvalue)throwsException{}

FastThreadLocalThread

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了

  1. /**
  2. *true,表示FTL会在线程结束时被主动清理见FastThreadLocalRunnable类
  3. *false,需要将FTL放入后台清理线程的队列中
  4. */
  5. //ThiswillbesettotrueifwehaveachancetowraptheRunnable.
  6. //这个字段则用于标识该线程在结束时是否会主动清理FTL
  7. privatefinalbooleancleanupFastThreadLocals;
  8. //次对象将在第一次FastThreadLocal.get和FastThreadLocal.set时候创建
  9. privateInternalThreadLocalMapthreadLocalMap;
  10. publicFastThreadLocalThread(Runnabletarget){
  11. super(FastThreadLocalRunnable.wrap(target));
  12. cleanupFastThreadLocals=true;
  13. }

4、 set 方法

  1. publicfinalvoidset(Vvalue){
  2. //判断设置的value值是否是缺省值
  3. if(value!=InternalThreadLocalMap.UNSET){
  4. //获取当前线程的InternalThreadLocalMap,如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取
  5. //否则通过jdk原生的threadLocal获取
  6. InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.get();
  7. //FastThreadLocal对应的index下标的value替换成新的value
  8. setKnownNotUnset(threadLocalMap,value);
  9. }else{
  10. //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象
  11. remove();
  12. }
  13. }

这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet

  1. privatevoidsetKnownNotUnset(InternalThreadLocalMapthreadLocalMap,Vvalue){
  2. //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.
  3. if(threadLocalMap.setIndexedVariable(index,value)){
  4. //放置成功之后,将该FTL加入到variablesToRemoveIndex下标的
  5. //IdentityHashMap,等待后续清理
  6. addToVariablesToRemove(threadLocalMap,this);
  7. }
  8. }
  1. /**
  2. *该FTL加入到variablesToRemoveIndex下标的IdentityHashMap
  3. *IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置
  4. */
  5. @SuppressWarnings("unchecked")
  6. privatestaticvoidaddToVariablesToRemove(InternalThreadLocalMapthreadLocalMap,FastThreadLocal<?>variable){
  7. //获取variablesToRemoveIndex下标处的IdentityHashMap
  8. Objectv=threadLocalMap.indexedVariable(variablesToRemoveIndex);
  9. Set<FastThreadLocal<?>>variablesToRemove;
  10. //如果是第一次获取,则variablesToRemoveIndex下标处的值为UNSET
  11. if(v==InternalThreadLocalMap.UNSET||v==null){
  12. //新建一个新的IdentityHashMap并
  13. variablesToRemove=Collections.newSetFromMap(newIdentityHashMap<FastThreadLocal<?>,Boolean>());
  14. //放入到下标variablesToRemoveIndex处
  15. threadLocalMap.setIndexedVariable(variablesToRemoveIndex,variablesToRemove);
  16. }else{
  17. variablesToRemove=(Set<FastThreadLocal<?>>)v;
  18. }
  19. //将该FTL放入该IdentityHashMap中
  20. variablesToRemove.add(variable);
  21. }

下面看InternalThreadLocalMap.get()实现:

  1. publicstaticInternalThreadLocalMapget(){
  2. Threadthread=Thread.currentThread();
  3. //首先看当前thread是否为FastThreadLocalThread实例
  4. //如果是的话,可以快速通过引用,获取到其threadLocalMap
  5. if(threadinstanceofFastThreadLocalThread){
  6. returnfastGet((FastThreadLocalThread)thread);
  7. }else{
  8. //如果不是,则jdk原生慢速获取到其threadLocalMap
  9. returnslowGet();
  10. }
  11. }

5、 get 方法

get方法极为简单,实现如下:

  1. ===========================FastThreadLocal==========================
  2. publicfinalVget(){
  3. InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.get();
  4. Objectv=threadLocalMap.indexedVariable(index);
  5. if(v!=InternalThreadLocalMap.UNSET){
  6. return(V)v;
  7. }
  8. returninitialize(threadLocalMap);
  9. }

首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;

  1. initialize
  2. ============================FastThreadLocal==========================
  3. privateVinitialize(InternalThreadLocalMapthreadLocalMap){
  4. Vv=null;
  5. try{
  6. //1、获取初始值
  7. v=initialValue();
  8. }catch(Exceptione){
  9. thrownewRuntimeException(e);
  10. }
  11. //2、设置value到InternalThreadLocalMap中
  12. threadLocalMap.setIndexedVariables(index,v);
  13. //3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
  14. addToVariablesToRemove(threadLocalMap,this);
  15. returnv;
  16. }
  17. //初始化参数:由子类复写
  18. protectedVinitialValue()throwsException{
  19. returnnull;
  20. }
  • 获取 ThreadLocalMap
  • 直接通过索引取出对象
  • 如果为空那么调用初始化方法初始化

6、ftl的资源回收机制

netty中ftl的两种回收机制回收机制:

自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;

手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;

  1. FastThreadLocalRunnable
  2. finalclassFastThreadLocalRunnableimplementsRunnable{
  3. privatefinalRunnablerunnable;
  4. @Override
  5. publicvoidrun(){
  6. try{
  7. runnable.run();
  8. }finally{
  9. FastThreadLocal.removeAll();
  10. }
  11. }
  12. staticRunnablewrap(Runnablerunnable){
  13. returnrunnableinstanceofFastThreadLocalRunnable
  14. ?runnable:newFastThreadLocalRunnable(runnable);
  15. }
  16. }

如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。

  1. ===============================FastThreadLocal===========================
  2. publicstaticvoidremoveAll(){
  3. //获取到map
  4. InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.getIfSet();
  5. if(threadLocalMap==null){
  6. return;
  7. }
  8. try{
  9. //获取到Set<FastThreadLocal>集合
  10. Objectv=threadLocalMap.indexedVariable(variablesToRemoveIndex);
  11. if(v!=null&&v!=InternalThreadLocalMap.UNSET){
  12. @SuppressWarnings("unchecked")
  13. Set<FastThreadLocal<?>>variablesToRemove=(Set<FastThreadLocal<?>>)v;
  14. //将Set转换为数组
  15. FastThreadLocal<?>[]variablesToRemoveArray=
  16. variablesToRemove.toArray(newFastThreadLocal[variablesToRemove.size()]);
  17. //遍历数组,删除每一个FastThreadLocal对应的value
  18. for(FastThreadLocal<?>tlv:variablesToRemoveArray){
  19. tlv.remove(threadLocalMap);
  20. }
  21. }
  22. }finally{
  23. //删除当前线程的InternalThreadLocalMap
  24. InternalThreadLocalMap.remove();
  25. }
  26. }
  27. publicstaticvoidremove(){
  28. Threadthread=Thread.currentThread();
  29. if(threadinstanceofFastThreadLocalThread){
  30. //将FastThreadLocalThread内部的map置位null
  31. ((FastThreadLocalThread)thread).setThreadLocalMap(null);
  32. }else{
  33. //将ThreadLocal内部ThreadLocalMap中的value置位null
  34. slowThreadLocalMap.remove();
  35. }
  36. }

remove方法:

  1. ===============================FastThreadLocal==========================
  2. privatevoidremove(){
  3. remove(InternalThreadLocalMap.getIfSet());
  4. }
  5. privatevoidremove(InternalThreadLocalMapthreadLocalMap){
  6. if(threadLocalMap==null){
  7. return;
  8. }
  9. //从InternalThreadLocalMap中删除当前的FastThreadLocal对应的value并设UNSET
  10. Objectv=threadLocalMap.removeIndexedVariable(index);
  11. //从InternalThreadLocalMap中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
  12. removeFromVariablesToRemove(threadLocalMap,this);
  13. //如果删除的是有效值,则进行onRemove方法的回调
  14. if(v!=InternalThreadLocalMap.UNSET){
  15. try{
  16. //回调子类复写的onRemoved方法,默认为空实现
  17. onRemoved((V)v);
  18. }catch(Exceptione){
  19. thrownewRuntimeException(e);
  20. }
  21. }
  22. }

总结

只有不断的学习,不断的找到自己的缺点才可以进步;

一起加油;

原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ

收藏 (0) 打赏

感谢您的支持,我会继续努力的!

打开微信/支付宝扫一扫,即可进行扫码打赏哦,分享从这里开始,精彩与您同在
点赞 (0)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。

快网idc优惠网 建站教程 Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷) https://www.kuaiidc.com/106318.html

相关文章

发表评论
暂无评论