上文讲了Sentinel在线程池使用方面的经验。
既然有了多线程任务执行,那么就会存在并发问题,当出现并发问题时,解决方案无外乎加锁。
Sentinel中也有很多解决并发问题的方案,跟着笔者来看看吧。
1.synchronized使用
这是比较常用的一种同步方式,我们来看下Sentinel对其的使用
// ZookeeperDataSource.java
// 被锁定对象,final类型的,保证全局唯一
private static final Object lock = new Object();
private void initZookeeperListener(final String serverAddr, final List authInfos) {
try {
...
String zkKey = getZkKey(serverAddr, authInfos);
if (zkClientMap.containsKey(zkKey)) {
this.zkClient = zkClientMap.get(zkKey);
} else {
// 使用的方式在这里
// 锁定lock对象
synchronized (lock) {
if (!zkClientMap.containsKey(zkKey)) {
...
this.zkClient = zc;
this.zkClient.start();
Map newZkClientMap = new HashMap(zkClientMap.size());
newZkClientMap.putAll(zkClientMap);
newZkClientMap.put(zkKey, zc);
zkClientMap = newZkClientMap;
} else {
this.zkClient = zkClientMap.get(zkKey);
}
}
}
this.nodeCache = new NodeCache(this.zkClient, this.path);
this.nodeCache.getListenable().addListener(this.listener, this.pool);
this.nodeCache.start();
} catch (Exception e) {
RecordLog.warn("[ZookeeperDataSource] Error occurred when initializing Zookeeper data source", e);
e.printStackTrace();
}
}
2.ReentrantLock的使用
synchronized加锁方式,可以分解为:入锁和出锁。
// 入锁
synchronized(lock){
// owner code
}// 出锁
出锁之后即释放对该对象的锁定,其他线程可以竞争该对象锁,执行业务代码。
这种加锁的方式胜在简单,但是有一个缺陷就是:如果执行的业务逻辑特别耗时,那么该对象锁就一直无法释放,其他线程就一直无法获得该对象锁。
解决这个缺陷,可以使用ReentrantLock。下面来看一个Sentinel的使用:
// ClusterNode.java
// 创建锁对象
private final ReentrantLock lock = new ReentrantLock();
public Node getOrCreateOriginNode(String origin) {
StatisticNode statisticNode = originCountMap.get(origin);
if (statisticNode == null) {
try {
// 加锁
lock.lock();
statisticNode = originCountMap.get(origin);
...
} finally {
// 释放锁,注意该处,一定要在finally中释放锁,否则lock一直无法释放,其他线程就无法执行
lock.unlock();
}
}
return statisticNode;
}
刚才这个示例好像还是没有解决刚才提出的Synchronized的问题。
回头来看下ReentrantLock的其他方法:
// 获取锁,可被中断
public void lockInterruptibly() throws InterruptedException{}
// 持续不断的去获取锁
public boolean tryLock() {}
// 规定时间内没有获取锁则返回false
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {}
可以通过第三个方法来获取锁,这样就实现了规定时间内返回锁。
3.创建同步集合
开发中,集合被大量使用,而集合也是多线程并发问题高发区,如何线程安全的使用集合是一个问题。
我们来看下一个示例:
// DynamicSentinelProperty.java
protected Set listeners
= Collections.synchronizedSet(new HashSet());
这个是JDK提供的一个将常规集合转换成同步集合的方法,还有以下方法,可以看到,针对常规集合,我们都可以通过Collections来转换成同步集合。
我们可以简单的分析下这种同步的实现方式:
1)Collections.SynchronizedSet()static class SynchronizedSet
extends SynchronizedCollection
implements Set {
private static final long serialVersionUID = 487447009682186044L;
// 构造方法,直接使用SynchronizedCollection的构造方法
SynchronizedSet(Set s) {
super(s);
}
SynchronizedSet(Set s, Object mutex) {
super(s, mutex);
}
...
}
// SynchronizedCollection.java
static class SynchronizedCollection implements Collection, Serializable {
private static final long serialVersionUID = 3053995032091335093L;
// 真正实现方法的类
final Collection c; // Backing Collection
final Object mutex; // Object on which to synchronize
// 上述构造方法就是调用这个的
SynchronizedCollection(Collection c) {
this.c = Objects.requireNonNull(c);
mutex = this;
}
SynchronizedCollection(Collection c, Object mutex) {
this.c = Objects.requireNonNull(c);
this.mutex = Objects.requireNonNull(mutex);
}
// 可以看到,同步的奥秘在这
// 调用方法统统添加synchronized,内部实现还由原来的集合类实现
public int size() {
synchronized (mutex) {return c.size();}
}
public boolean isEmpty() {
synchronized (mutex) {return c.isEmpty();}
}
public boolean contains(Object o) {
synchronized (mutex) {return c.contains(o);}
}
...
总结:通过Collections创建同步集合也是一个简单的选项。
缺点就是:所有的方法都被同步了,效率低
2)jdk自带同步集合类
java.util.concurrent包下有一批Concurrent开头的集合类,这些都是jdk自实现的一些同步集合,可以保证在多线程下安全使用。之前笔者也有写过一些相关博客,读者可自行阅读。
下面来看下Sentinel的使用:
// StatisticSlotCallbackRegistry.java
public final class StatisticSlotCallbackRegistry {
private static final Map entryCallbackMap
= new ConcurrentHashMap();
...
// 具体方法使用
// 可以看到这里没有使用同步策略,由ConcurrentHashMap内部来保证线程安全
public static void clearExitCallback() {
exitCallbackMap.clear();
}
public static void addEntryCallback(String key, ProcessorSlotEntryCallback callback) {
entryCallbackMap.put(key, callback);
}
4.原子类
这个也是我们比较常用的一种同步方式,与同步集合类在相同的包路径下,具体路径是java.util.concurrent.atomic。
这里面主要都是一些有关Integer、Long的同步原子类,我们在做计数类的操作时,可以避免使用加锁的方式,直接使用原子类即可。
// NamedThreadFactory.java
public class NamedThreadFactory implements ThreadFactory {
// 线程计数
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
// 非线程安全的newThread方法,计数需要保持并发安全,就直接使用AtomicInteger即可
Thread t = new Thread(group, r, namePrefix + "-thread-" + threadNumber.getAndIncrement(), 0);
t.setDaemon(daemon);
return t;
}
总结:
1.使用Synchronized来加锁业务代码(最简单,使用不当也容易造成其他线程无法获取对象锁)
2.使用ReentrantLock加锁业务代码(多种加锁方法,满足用户多需求)
3.使用同步集合(Collections.SynchronizedSet()、ConcurrentHashMap等)
4.使用原子类(AtomicInteger等)