前言
ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出现GC的情况时,
没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;
那么问题来了,有没有更高效的ThreadLocal有;
今天我们就来分析一波FastThreadLocalThread
一、FastThreadLocalThread源码分析
Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;
Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;
1、FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;
- io.netty.util.concurrent.DefaultThreadFactory
- @Override
- publicThreadnewThread(Runnabler){
- Threadt=newThread(FastThreadLocalRunnable.wrap(r),prefix+nextId.incrementAndGet());
- //一般daemon为false,意思是不设置为守护线程
- if(t.isDaemon()!=daemon){
- t.setDaemon(daemon);
- }
- //优先级默认为5
- if(t.getPriority()!=priority){
- t.setPriority(priority);
- }
- returnt;
- }
- protectedThreadnewThread(Runnabler,Stringname){
- returnnewFastThreadLocalThread(threadGroup,r,name);
- }
FastThreadLocalThread 继承自Thread类,有如下成员变量:
- io.netty.util.concurrent.FastThreadLocalThread
- //任务执行完,是否清除FastThreadLocal的标记
- privatefinalbooleancleanupFastThreadLocals;
- //类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal
- privateInternalThreadLocalMapthreadLocalMap;
2、 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:
- privateInternalThreadLocalMap(){
- //为了简便,InternalThreadLocalMap父类
- //UnpaddedInternalThreadLocalMap不展开介绍
- super(newIndexedVariableTable());
- }
- //默认的数组大小为32,且使用UNSET对象填充数组
- //如果下标处数据为UNSET,则表示没有数据
- privatestaticObject[]newIndexedVariableTable(){
- Object[]array=newObject[32];
- Arrays.fill(array,UNSET);
- returnarray;
- }
为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;
- //InternalThreadLocalMap
- //Cachelinepadding(mustbepublic)
- //WithCompressedOopsenabled,aninstanceofthisclassshouldoccupyatleast128bytes.
- publiclongrp1,rp2,rp3,rp4,rp5,rp6,rp7,rp8,rp9;
FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:
- staticfinalAtomicIntegernextIndex=newAtomicInteger();
- publicstaticintnextVariableIndex(){
- //Netty中所有FTL数组下标都是通过递增这个静态变量实现的
- //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,
- //会造成InternalThreadLocalMap中数组不必要的自动扩容
- intindex=nextIndex.getAndIncrement();
- if(index<0){
- nextIndex.decrementAndGet();
- thrownewIllegalStateException("toomanythread-localindexedvariables");
- }
- returnindex;
- }
InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:
- privatevoidexpandIndexedVariableTableAndSet(intindex,Objectvalue){
- Object[]oldArray=indexedVariables;
- finalintoldCapacity=oldArray.length;
- //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,
- //这段代码在早期的jdkHashMap中见过
- intnewCapacity=index;
- newCapacity|=newCapacity>>>1;
- newCapacity|=newCapacity>>>2;
- newCapacity|=newCapacity>>>4;
- newCapacity|=newCapacity>>>8;
- newCapacity|=newCapacity>>>16;
- newCapacity++;
- Object[]newArray=Arrays.copyOf(oldArray,newCapacity);
- Arrays.fill(newArray,oldCapacity,newArray.length,UNSET);
- newArray[index]=value;
- indexedVariables=newArray;
- }
3、FastThreadLocal
构造函数:
有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;
- //如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在
- //该FastThreadLocalThread.threadLocalMap数组的
- //variablesToRemoveIndex下标处放置一个IdentityHashMap,
- //并将该FTL放入IdentityHashMap中,在后续清理时会取出
- //variablesToRemoveIndex下标处的IdentityHashMap进行清理
- privatestaticfinalintvariablesToRemoveIndex=InternalThreadLocalMap.nextVariableIndex();
- //在threadLocalMap数组中存放实际数据的下标
- privatefinalintindex;
- publicFastThreadLocal(){
- index=InternalThreadLocalMap.nextVariableIndex();
- }
用户可扩展的函数:
- //初始化value函数
- protectedVinitialValue()throwsException{
- returnnull;
- }
- //让使用者在该FTL被移除时可以有机会做些操作。
- protectedvoidonRemoval(@SuppressWarnings("UnusedParameters")Vvalue)throwsException{}
FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了
- /**
- *true,表示FTL会在线程结束时被主动清理见FastThreadLocalRunnable类
- *false,需要将FTL放入后台清理线程的队列中
- */
- //ThiswillbesettotrueifwehaveachancetowraptheRunnable.
- //这个字段则用于标识该线程在结束时是否会主动清理FTL
- privatefinalbooleancleanupFastThreadLocals;
- //次对象将在第一次FastThreadLocal.get和FastThreadLocal.set时候创建
- privateInternalThreadLocalMapthreadLocalMap;
- publicFastThreadLocalThread(Runnabletarget){
- super(FastThreadLocalRunnable.wrap(target));
- cleanupFastThreadLocals=true;
- }
4、 set 方法
- publicfinalvoidset(Vvalue){
- //判断设置的value值是否是缺省值
- if(value!=InternalThreadLocalMap.UNSET){
- //获取当前线程的InternalThreadLocalMap,如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取
- //否则通过jdk原生的threadLocal获取
- InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.get();
- //FastThreadLocal对应的index下标的value替换成新的value
- setKnownNotUnset(threadLocalMap,value);
- }else{
- //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象
- remove();
- }
- }
这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
- privatevoidsetKnownNotUnset(InternalThreadLocalMapthreadLocalMap,Vvalue){
- //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.
- if(threadLocalMap.setIndexedVariable(index,value)){
- //放置成功之后,将该FTL加入到variablesToRemoveIndex下标的
- //IdentityHashMap,等待后续清理
- addToVariablesToRemove(threadLocalMap,this);
- }
- }
- /**
- *该FTL加入到variablesToRemoveIndex下标的IdentityHashMap
- *IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置
- */
- @SuppressWarnings("unchecked")
- privatestaticvoidaddToVariablesToRemove(InternalThreadLocalMapthreadLocalMap,FastThreadLocal<?>variable){
- //获取variablesToRemoveIndex下标处的IdentityHashMap
- Objectv=threadLocalMap.indexedVariable(variablesToRemoveIndex);
- Set<FastThreadLocal<?>>variablesToRemove;
- //如果是第一次获取,则variablesToRemoveIndex下标处的值为UNSET
- if(v==InternalThreadLocalMap.UNSET||v==null){
- //新建一个新的IdentityHashMap并
- variablesToRemove=Collections.newSetFromMap(newIdentityHashMap<FastThreadLocal<?>,Boolean>());
- //放入到下标variablesToRemoveIndex处
- threadLocalMap.setIndexedVariable(variablesToRemoveIndex,variablesToRemove);
- }else{
- variablesToRemove=(Set<FastThreadLocal<?>>)v;
- }
- //将该FTL放入该IdentityHashMap中
- variablesToRemove.add(variable);
- }
下面看InternalThreadLocalMap.get()实现:
- publicstaticInternalThreadLocalMapget(){
- Threadthread=Thread.currentThread();
- //首先看当前thread是否为FastThreadLocalThread实例
- //如果是的话,可以快速通过引用,获取到其threadLocalMap
- if(threadinstanceofFastThreadLocalThread){
- returnfastGet((FastThreadLocalThread)thread);
- }else{
- //如果不是,则jdk原生慢速获取到其threadLocalMap
- returnslowGet();
- }
- }
5、 get 方法
get方法极为简单,实现如下:
- ===========================FastThreadLocal==========================
- publicfinalVget(){
- InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.get();
- Objectv=threadLocalMap.indexedVariable(index);
- if(v!=InternalThreadLocalMap.UNSET){
- return(V)v;
- }
- returninitialize(threadLocalMap);
- }
首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;
- initialize
- ============================FastThreadLocal==========================
- privateVinitialize(InternalThreadLocalMapthreadLocalMap){
- Vv=null;
- try{
- //1、获取初始值
- v=initialValue();
- }catch(Exceptione){
- thrownewRuntimeException(e);
- }
- //2、设置value到InternalThreadLocalMap中
- threadLocalMap.setIndexedVariables(index,v);
- //3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
- addToVariablesToRemove(threadLocalMap,this);
- returnv;
- }
- //初始化参数:由子类复写
- protectedVinitialValue()throwsException{
- returnnull;
- }
- 获取 ThreadLocalMap
- 直接通过索引取出对象
- 如果为空那么调用初始化方法初始化
6、ftl的资源回收机制
netty中ftl的两种回收机制回收机制:
自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;
手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;
- FastThreadLocalRunnable
- finalclassFastThreadLocalRunnableimplementsRunnable{
- privatefinalRunnablerunnable;
- @Override
- publicvoidrun(){
- try{
- runnable.run();
- }finally{
- FastThreadLocal.removeAll();
- }
- }
- staticRunnablewrap(Runnablerunnable){
- returnrunnableinstanceofFastThreadLocalRunnable
- ?runnable:newFastThreadLocalRunnable(runnable);
- }
- }
如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。
- ===============================FastThreadLocal===========================
- publicstaticvoidremoveAll(){
- //获取到map
- InternalThreadLocalMapthreadLocalMap=InternalThreadLocalMap.getIfSet();
- if(threadLocalMap==null){
- return;
- }
- try{
- //获取到Set<FastThreadLocal>集合
- Objectv=threadLocalMap.indexedVariable(variablesToRemoveIndex);
- if(v!=null&&v!=InternalThreadLocalMap.UNSET){
- @SuppressWarnings("unchecked")
- Set<FastThreadLocal<?>>variablesToRemove=(Set<FastThreadLocal<?>>)v;
- //将Set转换为数组
- FastThreadLocal<?>[]variablesToRemoveArray=
- variablesToRemove.toArray(newFastThreadLocal[variablesToRemove.size()]);
- //遍历数组,删除每一个FastThreadLocal对应的value
- for(FastThreadLocal<?>tlv:variablesToRemoveArray){
- tlv.remove(threadLocalMap);
- }
- }
- }finally{
- //删除当前线程的InternalThreadLocalMap
- InternalThreadLocalMap.remove();
- }
- }
- publicstaticvoidremove(){
- Threadthread=Thread.currentThread();
- if(threadinstanceofFastThreadLocalThread){
- //将FastThreadLocalThread内部的map置位null
- ((FastThreadLocalThread)thread).setThreadLocalMap(null);
- }else{
- //将ThreadLocal内部ThreadLocalMap中的value置位null
- slowThreadLocalMap.remove();
- }
- }
remove方法:
- ===============================FastThreadLocal==========================
- privatevoidremove(){
- remove(InternalThreadLocalMap.getIfSet());
- }
- privatevoidremove(InternalThreadLocalMapthreadLocalMap){
- if(threadLocalMap==null){
- return;
- }
- //从InternalThreadLocalMap中删除当前的FastThreadLocal对应的value并设UNSET
- Objectv=threadLocalMap.removeIndexedVariable(index);
- //从InternalThreadLocalMap中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
- removeFromVariablesToRemove(threadLocalMap,this);
- //如果删除的是有效值,则进行onRemove方法的回调
- if(v!=InternalThreadLocalMap.UNSET){
- try{
- //回调子类复写的onRemoved方法,默认为空实现
- onRemoved((V)v);
- }catch(Exceptione){
- thrownewRuntimeException(e);
- }
- }
- }
总结
只有不断的学习,不断的找到自己的缺点才可以进步;
一起加油;
原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ




