Exchanger
文章目录
在一个同步点,两个线程可以在该点配对和交换对中的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了。
- Exchanger
- 示例
- 核心算法
- 组成
- 内部类Participant
- 内部类Node
- 成员变量
- 构造函数
- 核心方法
- 方法调用说明
Exchanger 可以看作是 SynchronousQueue 的双向形式。
示例用法:以下是一个类的亮点,它使用 Exchanger 在线程之间交换缓冲区,以便填充缓冲区的线程在需要时获得一个新清空的缓冲区,并将填充的缓冲区交给清空缓冲区的线程。
示例import java.text.SimpleDateFormat;
import java.util.Date;
import ja va.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ExchangerDemo {
static class ThreadA extends Thread {
private final Exchanger exchanger;
private String data = "A";
ThreadA(Exchanger exchanger) {
this.exchanger = exchanger;
}
ThreadA(String data, Exchanger exchanger) {
this.data = data;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "Before exchange ThreadA's data is " + data);
data = exchanger.exchange(data);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "After exchange ThreadA's data is " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class ThreadB extends Thread {
private final Exchanger exchanger;
private String data = "B";
ThreadB(Exchanger exchanger) {
this.exchanger = exchanger;
}
ThreadB(String data, Exchanger exchanger) {
this.data = data;
this.exchanger = exchanger;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "Before exchange ThreadB's data is " + data);
data = exchanger.exchange(data);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "After exchange ThreadB's data is " + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Exchanger exchanger = new Exchanger();
new ThreadA("A1", exchanger).start();
new ThreadB("B1", exchanger).start();
}
}
执行结果
[15:56:26] Before exchange ThreadB's data is B1
[15:56:26] Before exchange ThreadA's data is A1
[15:56:26] After exchange ThreadB's data is A1
[15:56:26] After exchange ThreadA's data is B1
核心算法
Created with Raphaël 2.3.0
Start
数据槽slot是否有值?
将item 设置到Node 中
CAS操作(将node更新item)是否成功
阻塞,等待被唤醒
被唤醒后
返回node中匹配到的item
End
CAS操作(将slot中node重置为null)是否成功
获取node中的数据
将自己的值设置到node的match
唤醒等待的线程
yes
no
yes
no
yes
no
伪代码
for (;;) {
if (slot is empty) { // offer
// slot为空时,将item 设置到Node 中
place item in a Node;
if (can CAS slot from empty to node) {
// 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
wait for release;
// 被唤醒后返回node中匹配到的item
return matching item in node;
}
} else if (can CAS slot from node to empty) { // release
// 将slot设置为空
// 获取node中的item,将需要交换的数据设置到匹配的item
get the item in node;
set matching item in node;
// 唤醒等待的线程
release waiting thread;
}
// else retry on CAS failure
}
组成
内部类Participant
继承了ThreadLocal
// 对应的线程本地类--初始化中构造了Node
static final class Participant extends ThreadLocal {
public Node initialValue() { return new Node(); }
}
内部类Node
// 节点保存部分交换的数据,以及其他每个线程的标记。 通过@sun.misc.Contended 填充以减少内存争用。
@sun.misc.Contended static final class Node {
int index; // Arena index -- Arena 下标 slot插槽用
int bound; // Last recorded value of Exchanger.bound 上一次记录的Exchanger.bound
int collides; // Number of CAS failures at current bound 在当前bound下CAS失败的次数
int hash; // Pseudo-random for spins 自旋的伪随机数
Object item; // This thread's current item 线程的当前项,需要交换的数据
volatile Object match; // Item provided by releasing thread 做releasing操作的线程传递的项 释放线程提供的项
volatile Thread parked; // Set to this thread when parked, else null 阻塞时设置为该线程,否则为空
}
@sun.misc.Contended 伪共享解决方案
需要注意的是在启动jvm的时候要加入-XX:-RestrictContended
成员变量// 每线程状态
private final Participant participant;
// 默认值为null 直到出现多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组。
private volatile Node[] arena;
// 在检测到争用之前使用的插槽。
private volatile Node slot;
// 最大有效竞技场位置的索引,与高位的 SEQ 编号进行或运算,每次更新时递增。 从 0 到 SEQ 的初始更新用于确保 arena 数组只构造一次。
private volatile int bound;
// 任意两个已使用插槽之间的字节距离(作为移位值)。 1 0L)) {
// 设置线程t被当前对象阻塞
U.putObject(t, BLOCKER, this);
// 给p挂机线程的值赋值
p.parked = t;
if (slot == p)
// 如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起
U.park(false, ns);
// 线程被唤醒,将被挂起的线程设置为null
p.parked = null;
// 设置线程t未被任何对象阻塞
U.putObject(t, BLOCKER, null);
}
else if (U.compareAndSwapObject(this, SLOT, p, null)) {
// arena不为null则v为null,其它为超时则v为超市对象TIMED_OUT,并且跳出循环
v = timed && ns 1;
假设NCPU=4,NCPU >>> 1 = 2
MMASK的值为255
MMASK
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?