您当前的位置: 首页 > 

顧棟

暂无认证

  • 1浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】同步工具类之Exchanger

顧棟 发布时间:2022-05-02 06:00:00 ,浏览量:1

Exchanger

文章目录
  • Exchanger
    • 示例
    • 核心算法
    • 组成
      • 内部类Participant
      • 内部类Node
      • 成员变量
      • 构造函数
      • 核心方法
        • 方法调用说明
在一个同步点,两个线程可以在该点配对和交换对中的数据。这两个线程通过exchange()方法交换数据,当一个线程先执行exchange()方法后,它会一直等待第二个线程也执行exchange()方法,当这两个线程到达同步点时,这两个线程就可以交换数据了。

Exchanger 可以看作是 SynchronousQueue 的双向形式。

示例用法:以下是一个类的亮点,它使用 Exchanger 在线程之间交换缓冲区,以便填充缓冲区的线程在需要时获得一个新清空的缓冲区,并将填充的缓冲区交给清空缓冲区的线程。

示例
import java.text.SimpleDateFormat;
import java.util.Date;
import ja va.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class ExchangerDemo {

    static class ThreadA extends Thread {
        private final Exchanger exchanger;
        private String data = "A";

        ThreadA(Exchanger exchanger) {
            this.exchanger = exchanger;
        }

        ThreadA(String data, Exchanger exchanger) {
            this.data = data;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "Before exchange ThreadA's data is " + data);
                data = exchanger.exchange(data);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "After exchange ThreadA's data is " + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ThreadB extends Thread {
        private final Exchanger exchanger;
        private String data = "B";

        ThreadB(Exchanger exchanger) {
            this.exchanger = exchanger;
        }

        ThreadB(String data, Exchanger exchanger) {
            this.data = data;
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "Before exchange ThreadB's data is " + data);
                data = exchanger.exchange(data);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + "After exchange ThreadB's data is " + data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger exchanger = new Exchanger();
        new ThreadA("A1", exchanger).start();
        new ThreadB("B1", exchanger).start();
    }
}

执行结果

[15:56:26] Before exchange ThreadB's data is B1
[15:56:26] Before exchange ThreadA's data is A1
[15:56:26] After exchange ThreadB's data is A1
[15:56:26] After exchange ThreadA's data is B1
核心算法
Created with Raphaël 2.3.0 Start 数据槽slot是否有值? 将item 设置到Node 中 CAS操作(将node更新item)是否成功 阻塞,等待被唤醒 被唤醒后 返回node中匹配到的item End CAS操作(将slot中node重置为null)是否成功 获取node中的数据 将自己的值设置到node的match 唤醒等待的线程 yes no yes no yes no

伪代码

for (;;) {
    if (slot is empty) { // offer
        // slot为空时,将item 设置到Node 中        
        place item in a Node;
        if (can CAS slot from empty to node) {
            // 当将node通过CAS交换到slot中时,挂起线程等待被唤醒
            wait for release;
            // 被唤醒后返回node中匹配到的item
            return matching item in node;
        }
    } else if (can CAS slot from node to empty) { // release
         // 将slot设置为空
        // 获取node中的item,将需要交换的数据设置到匹配的item
        get the item in node;
        set matching item in node;
        // 唤醒等待的线程
        release waiting thread;
    }
    // else retry on CAS failure
}
组成 内部类Participant

继承了ThreadLocal

    // 对应的线程本地类--初始化中构造了Node
    static final class Participant extends ThreadLocal {
        public Node initialValue() { return new Node(); }
    }
内部类Node
    // 节点保存部分交换的数据,以及其他每个线程的标记。 通过@sun.misc.Contended 填充以减少内存争用。
	@sun.misc.Contended static final class Node {
        int index;              // Arena index -- Arena 下标 slot插槽用
        int bound;              // Last recorded value of Exchanger.bound 上一次记录的Exchanger.bound
        int collides;           // Number of CAS failures at current bound 在当前bound下CAS失败的次数
        int hash;               // Pseudo-random for spins 自旋的伪随机数
        Object item;            // This thread's current item 线程的当前项,需要交换的数据
        volatile Object match;  // Item provided by releasing thread 做releasing操作的线程传递的项 释放线程提供的项
        volatile Thread parked; // Set to this thread when parked, else null 阻塞时设置为该线程,否则为空
    }

@sun.misc.Contended 伪共享解决方案

需要注意的是在启动jvm的时候要加入-XX:-RestrictContended

成员变量
// 每线程状态
private final Participant participant;
// 默认值为null 直到出现多个参与者使用同一个交换场所时,会存在严重伸缩性问题。既然单个交换场所存在问题,那么我们就安排多个,也就是数组arena。通过数组arena来安排不同的线程使用不同的slot来降低竞争问题,并且可以保证最终一定会成对交换数据。但是Exchanger不是一来就会生成arena数组来降低竞争,只有当产生竞争是才会生成arena数组。
private volatile Node[] arena;
// 在检测到争用之前使用的插槽。
private volatile Node slot;
// 最大有效竞技场位置的索引,与高位的 SEQ 编号进行或运算,每次更新时递增。 从 0 到 SEQ 的初始更新用于确保 arena 数组只构造一次。
private volatile int bound;

// 任意两个已使用插槽之间的字节距离(作为移位值)。 1  0L)) {
                // 设置线程t被当前对象阻塞
                U.putObject(t, BLOCKER, this);
                // 给p挂机线程的值赋值
                p.parked = t;
                if (slot == p)
                    // 如果slot还没有被置为null,也就表示暂未有线程过来交换数据,需要将当前线程挂起
                    U.park(false, ns);
                // 线程被唤醒,将被挂起的线程设置为null
                p.parked = null;
                // 设置线程t未被任何对象阻塞
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                // arena不为null则v为null,其它为超时则v为超市对象TIMED_OUT,并且跳出循环
                v = timed && ns  1;

假设NCPU=4,NCPU >>> 1 = 2
MMASK的值为255
MMASK             
关注
打赏
1663402667
查看更多评论
0.0414s