您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty之FastThreadLocal

恐龙弟旺仔 发布时间:2021-09-28 18:40:36 ,浏览量:0

前言:

    在学习Netty的时候,总有很多之前从没接触过的知识点。比如今天要分享的主题FastThreadLocal一样。从名字看来,应该跟JDK的ThreadLocal差不多,但是多了一个Fast,那么相对ThreadLocal而言,它的存储及获取应该会更快点。

    话不多少,我们先来复习一下ThreadLocal的基本使用,然后从源码角度来解析下,最后再来分析下FastThreadLocal的过人之处。

1.ThreadLocal
// This class provides thread-local variables
public class ThreadLocal {}

从其注解我们可以了解到,ThreadLocal本质上就是用来存储线程的私有变量。线程私有的含义就是:Thread在ThreadLocal中存储的变量在当前Thread中是共享的,在不同Thread之间是非共享的,不同线程之间无法通过ThreadLocal互相查询变量信息。

1.1 ThreadLocal的使用

    我们可以通过ThreadLocal存储线程变量,下面提供一个示例

public void test1() {
    // 这里创建两个ThreadLocal
    final ThreadLocal threadLocal_1 = new ThreadLocal();
    final ThreadLocal threadLocal_2 = new ThreadLocal();

    // 创建两个Thread
    Thread thread1 = new Thread(new Runnable() {
        @Override
        public void run() {

            // 在线程内部调用ThreadLocal相关方法,先存储后获取
            try {
                threadLocal_1.set(Thread.currentThread().getName() + "_threadlocal_1");
                threadLocal_2.set(Thread.currentThread().getName() + "_threadlocal_2");

                System.out.println(threadLocal_1.get());
                System.out.println(threadLocal_2.get());
            } finally {
                // 注意在这里,run方法执行完成后,调用ThreadLocal.remove()方法
                threadLocal_1.remove();
                threadLocal_2.remove();
            }
        }
    }, "thread1");


    Thread thread2 = new Thread(new Runnable() {
        @Override
        public void run() {

            try {
                    threadLocal_1.set(Thread.currentThread().getName() + "_threadlocal_1");
                    threadLocal_2.set(Thread.currentThread().getName() + "_threadlocal_2");

                    System.out.println(threadLocal_1.get());
                    System.out.println(threadLocal_2.get());
                } finally {
                    threadLocal_1.remove();
                    threadLocal_2.remove();
                }
        }
    }, "thread2");

    thread1.start();
    thread2.start();

    try {
        thread1.join();
        thread2.join();

        System.out.println("mainThread " + threadLocal_1.get());
        System.out.println("mainThread " + threadLocal_2.get());

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

// res:
thread1_threadlocal_1
thread1_threadlocal_2
thread2_threadlocal_1
thread2_threadlocal_2
mainThread null
mainThread null

    示例比较简单,创建两个ThreadLocal,再创建两个Thread,Thread的run方法中先在ThreadLocal中设置属性,后续再从ThreadLocal中获取属性。

从结果中我们可以得出以下几个结论

* 线程可以操作多个ThreadLocal,单个线程在每个ThreadLocal中设置的属性互不影响。当我们在线程中有多个变量需要被存储使用时可以考虑创建多个ThreadLocal;

* 一个ThreadLocal也可以被多个线程所使用,单个ThreadLocal中每个线程设置的属性互不影响;

1.2 ThreadLocal源码解析

    我们直接从方法开始分析

1.2.1 ThreadLocal.set()

public class ThreadLocal {
 
    public void set(T value) {
        // 获取当前线程
        Thread t = Thread.currentThread();
        // 本质上是获取Thread.threadLocals属性,具体见1.2.3
        ThreadLocalMap map = getMap(t);
        // 所以最终还是操作ThreadLocalMap,见1.2.1.1
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }
    
    ThreadLocalMap getMap(Thread t) {
        return t.threadLocals;
    }
}

1.2.2 ThreadLocalMap.set() 设置线程变量值

static class ThreadLocalMap {
	private void set(ThreadLocal key, Object value) {

        Entry[] tab = table;
        int len = tab.length;
        // 通过hashcode获取key的对应位置,此时的key为ThreadLocal对象
        int i = key.threadLocalHashCode & (len-1);

        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            ThreadLocal k = e.get();

            // 如果两次的key相同,则将value值重置
            // 什么样的操作会使两次的key相同呢?我们在一个线程中多次调用同一个ThreadLocal的set()方法,则最后一次的调用存放值会覆盖所有之前的值
            if (k == key) {
                e.value = value;
                return;
            }

            // 获取到key为null时,则直接存放该键值对对应的Entry对象
            // 此时还会清理下Key=null的旧数据
            // key为甚为null了呢?因为key所对应的ThreadLocal被GC掉了,具体在1.2.4中分析
            if (k == null) {
                replaceStaleEntry(key, value, i);
                return;
            }
        }

        // 什么时候跳出循环执行到当前呢,就是从i开始,获取entry[i++]时得到的Entry为空时则跳出循环执行到当前,
        tab[i] = new Entry(key, value);
        int sz = ++size;
        if (!cleanSomeSlots(i, sz) && sz >= threshold)
            rehash();
    }
    
    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
}

代码很有意思,为了给新的Entry找一个合适的位置,利用hash & (len -1)得到的i值开始慢慢向后找,如果当前i位置有Entry对象,说明发生了冲突,如果key相同则覆盖,如果key为null,则清除原value数据,并将新的Entry放置到当前i位置,否则的话,则继续向后寻找,一直到找到空的位置为止。

相对于HashMap的Hash冲突处理策略而言,ThreadLocal使用的是线性探测法,而HashMap使用的是拉链法

如果空位都被占满了怎么办?那么就执行rehash()进行扩容操作。

1.2.3 ThreadLocalMap.get() 获取线程变量值

static class ThreadLocalMap {
	public T get() {
        Thread t = Thread.currentThread();
        // 获取当前线程的ThreadLocalMap
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            // 根据Entry.key,也就是当前ThreadLocal对象获取具体Entry信息,后续从Entry中直接获取value值
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }
}

1.2.4 ThreadLocal.remove() 删除变量值 

public class ThreadLocal {
	public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             // 调用ThreadLocalMap.remove()
             m.remove(this);
     }
}

static class ThreadLocalMap {
    private void remove(ThreadLocal key) {
        Entry[] tab = table;
        int len = tab.length;
        // 通过hash获取key的index位置
        int i = key.threadLocalHashCode & (len-1);
        for (Entry e = tab[i];
             e != null;
             e = tab[i = nextIndex(i, len)]) {
            if (e.get() == key) {
                // 调用Entry.clear()清除值
                // 这里的实现比较好玩,具体看1.2.4.1
                e.clear();
                // key删除后,将其value同样置为null
                expungeStaleEntry(i);
                return;
            }
        }
    }
}

remove方法用来删除我们在ThreadLocal中设置的参数,也是通过key获取在Entry[]中的具体index后,调用Entry.clear()方法进行清除。

1.2.4.1 Entry.clear()

// Entry继承了WeakReference
static class Entry extends WeakReference> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal k, Object v) {
            super(k);
            value = v;
        }
    }
    // 最终将数据存在该
    private Entry[] table;
}

源码并不算复杂,我们可以得到如下结论:

* 每个Thread都有一个独立的ThreadLocalMap,用来存放线程变量

* ThreadLocalMap中使用Entry[]来存放变量值,Entry的key为ThreadLocal,故一个线程可以操作多个ThreadLocal,在各个ThreadLocal中存放不同的变量值信息

* 在同一个Thread中多次调用同一个ThreadLocal.set()方法设置值时,只会最后一次生效,之前的值都会被覆盖掉

* 一个ThreadLocal可供多个线程调用,多个线程设置的值互不影响

* ThreadLocalMap中的Entry是以线性探测的方式来寻找合适的index来插入的

2.FastThreadLocal

    既然有了ThreadLocal,为什么Netty还会造一个新轮子呢?看名字,目测应该比ThreadLocal快一些。具体是怎样的呢,我们一起来看看

2.1 FastThreadLocal使用示例

    实际FastThreadLocal的使用和ThreadLocal的使用是一样的,API没有任何差别,这就是Netty的高明之处,新造了一个轮子,对用户而言,使用没有任何差别

@Test
public void test2() {
    final FastThreadLocal threadLocal_1 = new FastThreadLocal();
    final FastThreadLocal threadLocal_2 = new FastThreadLocal();

    Thread thread1 = new FastThreadLocalThread(new Runnable() {
        @Override
        public void run() {
            try {
                threadLocal_1.set(Thread.currentThread().getName() + "_threadlocal_1");
                threadLocal_2.set(Thread.currentThread().getName() + "_threadlocal_2");

                System.out.println(threadLocal_1.get());
                System.out.println(threadLocal_2.get());
            } finally {
                threadLocal_1.remove();
                threadLocal_2.remove();
            }
        }
    }, "thread1");


    Thread thread2 = new FastThreadLocalThread(new Runnable() {
        @Override
        public void run() {
            try {
                threadLocal_1.set(Thread.currentThread().getName() + "_threadlocal_1");
                threadLocal_2.set(Thread.currentThread().getName() + "_threadlocal_2");

                System.out.println(threadLocal_1.get());
                System.out.println(threadLocal_2.get());
            } finally {
                threadLocal_1.remove();
                threadLocal_2.remove();
            }
        }
    }, "thread2");

    thread1.start();
    thread2.start();

    try {
        thread1.join();
        thread2.join();

        System.out.println("mainThread " + threadLocal_1.get());
        System.out.println("mainThread " + threadLocal_2.get());

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
// res
thread1_threadlocal_1
thread2_threadlocal_1
thread2_threadlocal_2
thread1_threadlocal_2
mainThread null
mainThread null

可以看到,在使用方面,FastThreadLocal与ThreadLocal没有任何区别,就是在创建线程时需要使用特定的FastThreadLocalThread。

2.2 FastThreadLocal与ThreadLocal结构对应分析

    ThreadLocal各对象之间的关系很明朗:Thread -->[1:1] ThreadLocalMap --> [1:N]Entry[]

    综合通过ThreadLocal来set/get各线程变量信息

    而FastThreadLocal与其结构基本类似: FastThreadLocalThread -->[1:1] InternalThreadLocalMap --> [1:N] Object[]

    综合通过FastThreadLocal来set/get个线程变量信息

    既然结构是一致的,那我们直接分析set/get方法

2.3 FastThreadLocal方法分析

2.3.1 FastThreadLocal.set

public class FastThreadLocal {
    // 不同于ThreadLocal,使用hash的方式来确定index,FastThreadLocal直接使用Integer递增的方式来获取index,按顺序添加即可
    private final int index;
    public FastThreadLocal() {
        // 这里就是AtomicInteger的递增操作
        index = InternalThreadLocalMap.nextVariableIndex();
    }
    
    public final void set(V value) {
        // UNSET对象是什么呢?我们可以参考下InternalThreadLocalMap的构造方法,具体见2.4
        // 如果不等于UNSET对象,说明非默认填充值,是需要保存下来的val值
        if (value != InternalThreadLocalMap.UNSET) {
            // 获取到当前线程对应的InternalThreadLocalMap,具体内容见2.4.1
            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
            // 具体方法内容见下面
            setKnownNotUnset(threadLocalMap, value);
        } else {
            remove();
        }
    }
}

// setKnownNotUnset
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    // 设置对应index位置的值为value,这里寻找index的方式与ThreadLocal有所不同
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
    }
}

总结:针对FastThreadLocal的set方法,寻找插入位置的方式不同于ThreadLocal的哈希寻找的方式,其直接使用递增的index来逐个添加到Object[]中

2.3.2 FastThreadLocal.get()

public class FastThreadLocal {
	public final V get() {
        // 获取当前线程的InternalThreadLocalMap
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        // 直接获取对应index位置的value值返回
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }

        // 如果还没有设置value值,则获取初始值
        return initialize(threadLocalMap);
    }
}

get()方法比较简单,就是通过获取Object[]对应index位置的value值即可。

2.3.3 FastThreadLocal.remove() 删除变量

public class FastThreadLocal {
	public static void remove() {
        Thread thread = Thread.currentThread();
        if (thread instanceof FastThreadLocalThread) {
            // 比较狠了,直接将Thread的threadLocalMap整个置空了
            ((FastThreadLocalThread) thread).setThreadLocalMap(null);
        } else {
            slowThreadLocalMap.remove();
        }
    }
}

那么有个小问题,我们在使用完FastThreadLocal后,还需要像ThreadLocal的使用那样,主动在线程的run方法之后调用ThreadLocal.remove()方法吗?

答案是:不需要的

我们来看下FastThreadLocalThread的构造方法就明白了

public class FastThreadLocalThread extends Thread {
	public FastThreadLocalThread(Runnable target) {
        super(FastThreadLocalRunnable.wrap(target));
        cleanupFastThreadLocals = true;
    }
}

final class FastThreadLocalRunnable implements Runnable {
    private final Runnable runnable;
    @Override
    public void run() {
        try {
            runnable.run();
        } finally {
            // 在这个run方法的后面,主动调用了删除方法,所以只要我们使用了FastThreadLocalThread进行封装,就不需要再次调用remove方法删除
            FastThreadLocal.removeAll();
        }
    }

	static Runnable wrap(Runnable runnable) {
        return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
    }
}
2.4 InternalThreadLocalMap基本分析

    InternalThreadLocalMap是线程参数存储的容器,

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    
    public static final Object UNSET = new Object();
    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;
    
    // 私有构造方法
    private InternalThreadLocalMap() {
        super(newIndexedVariableTable());
    }

    private static Object[] newIndexedVariableTable() {
        // 默认创建一个长度为32的对象数组,并使用UNSET空对象填充
        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];
        Arrays.fill(array, UNSET);
        return array;
    }
}

class UnpaddedInternalThreadLocalMap {
    // 存放线程数据的为当前这个Object[]
    Object[] indexedVariables;
	UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
        this.indexedVariables = indexedVariables;
    }
}

通过InternalThreadLocalMap的构造方法可以看出,线程的变量信息存储到一个Object[]中,默认长度为32,并使用UNSET对象进行填充

2.4.1 InternalThreadLocalMap.get()

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    public static InternalThreadLocalMap get() {
        Thread thread = Thread.currentThread();
        // 这就是使用FastThreadLocalThread的重要性,如果使用的不是FastThreadLocalThread,则会退化到ThreadLocal的那种方式
        if (thread instanceof FastThreadLocalThread) {
            return fastGet((FastThreadLocalThread) thread);
        } else {
            return slowGet();
        }
    }
    
    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
        // 直接返回FastThreadLocalThread的InternalThreadLocalMap属性,如果为空,则主动创建一个,懒加载方式
        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
        if (threadLocalMap == null) {
            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
        }
        return threadLocalMap;
    }
}

2.4.2 InternalThreadLocalMap.setIndexedVariable() 获取对应index,并设置value值

public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {
    public boolean setIndexedVariable(int index, Object value) {
        Object[] lookup = indexedVariables;
        // 如果不超过数组最大值,则直接将value值添加到当前index位置
        if (index < lookup.length) {
            Object oldValue = lookup[index];
            lookup[index] = value;
            return oldValue == UNSET;
        } else {
            // 已经超过的话,则直接扩容
            expandIndexedVariableTableAndSet(index, value);
            return true;
        }
    }
}

参考:

    有关于ThreadLocal中 根据hashCode寻找位置的相关算法(魔数0x61c88647)的文章可以查看 ThreadLocal终极源码剖析-一篇足矣! - 只会一点java - 博客园 

    还有其他类型的ThreadLocal,可以实现更多功能,比如父子线程之间的数据访问等可以查看 全链路压测必备基础组件之线程上下文管理之“三剑客”_prestigeding的博客-CSDN博客 

关注
打赏
1655041699
查看更多评论
立即登录/注册

微信扫码登录

0.0823s