JAVA 并发编程原理与实战

用户头像
Geek_53983e
关注
发布于: 2020 年 12 月 29 日

1. volatile

在多处理器开发中保证了共享变量的“可见性”。可见性的意思是当一个线程修改一个共享变量时,另外一个线程能马上读到这个修改的值。

 

1.1 应用场景

1.1.1 状态标志

public class DetectorTask implements Runnable {
private volatile boolean stop = false;
@Override
public void run() {
while (!stop) {
long currentTime = SystemClock.now();
for (NioChannel channel : channels) {
if (channel.isConnected()) {
idleCheck0((NioChannelImpl) channel, currentTime);
}
}
try {
Thread.sleep(1000L);
} catch (InterruptedException ignored) {
}
}
}
public void stop() {
stop = true;
}
}



1.1.2 双重检查锁定与延迟初始化



public class BrowserPoolFactory {
private volatile static BrowserPool instance;
public static BrowserPool getInstance(AppProperties appProperties) {
if (instance == null) {
synchronized (BrowserPoolFactory.class) {
if (instance == null) {
instance = appProperties.getSelenium().getBrowserType().equals("firefox") ?
new SeleniumChromePool(appProperties) :
new SeleniumFirefoxPool(appProperties);
}
}
}
return instance;
}
}



1.2 实现原理

基于CPU的MESI(Modified, Exclusive, Shared, Invalid)缓存一致性。

 

1) cpu0开始读取变量x;cpu0将变量x从内存读取到自己的缓存行,此时cpu0缓存行的状态为Exclusive。





2) cpu1与cpu2也开始读取变量x。cpu1与cpu2分别从内存读取变量x到自己的缓存行,此时cpu1和cpu2的缓存行状态为Shared;cpu0监听到cpu1、cpu2读取了变量x,遂将缓存行状态改为Shared。

 

3) cpu0开始修改变量x。cpu0将自己缓存行的变量x改为5,此时cpu0缓存行的状态变为Modified;cpu1和cpu2监听到cpu0的缓存行状态变为Modified,遂将自己的缓存行状态改为Invalid。



4) cpu1开始读取变量x。cpu1发现自己的缓存行状态为Invalid,准备重新将变量x从内存读取到缓存行;此时cpu0监听到cpu1准备读取变量x,遂将缓存行中的变量x值回写到内存中,并通知cpu1内存上的变量x已更新;cpu1收到通知,将变量x从内存读取到自己的缓存行;此时cpu0和cpu1的缓存行状态均变为Shared



2. Semaphore

用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用公共资源。

2.1 应用场景

2.1.1 并发数控制

public class HttpAsyncClientCallbackSender implements CallbackSender {
private Semaphore semaphore;
......
@PostConstruct
public void init() throws Exception {
semaphore = new Semaphore(callbackConcurrent, false);
}
@Override
public boolean send(CallbackJob callbackJob, ClientNotifyHandler clientNotifyHandler) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
return false;
}
......
httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(final HttpResponse response) {
semaphore.release();
}
@Override
public void failed(final Exception ex) {
semaphore.release();
}
@Override
public void cancelled() {
semaphore.release();
}
});
return true;
}
}



2.2 实现原理

基于AbstractQueueSynchronizer队列型同步器(简称AQS),采用的是AQS的共享式获取锁方式(同一时刻可以有多个线程同时获得锁)。

2.2.1 AQS队列型同步器

AQS是用来构建锁或其他同步组件的基础框架,它使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

AbstractQueueSynchronizer数据结构图

2.2.2 AQS独占式获取锁

独占式获取锁,既同一时刻只能有一个线程成功获取同步状态。

AQS独占式获取锁流程图

2.2.3 AQS共享式获取锁

共享式获取锁,既同一时刻可以有多个线程同时获取同步状态。

AQS共享式获取锁流程图



3. CountDownLatch

允许一个线程或多个线程等待其他N个线程全部执行完成。

3.1 应用场景

3.1.1 等待多个异步任务完成



public class JobPusher {
......
private void push0(final JobPullRequest request) {
......
int availableThread = consumerNode.getAvailableThread().get();
int batchSize = jobPushBatchSize;
int it = availableThread % batchSize == 0 ? availableThread / batchSize : availableThread / batchSize + 1;
CountDownLatch latch = new CountDownLatch(it);
for (int i = 1; i <= it; i++) {
pushExecutorService.execute(new Runnable() {
@Override
public void run() {
try {
send(remotingServer, finalSize, consumerNode);
} finally {
latch.countDown();
}
}
});
}
latch.await();
}
}



3.2 实现原理

基于AbstractQueueSynchronizer队列型同步器(简称AQS),采用的是AQS的共享式获取锁方式(同一时刻可以有多个线程同时获得锁)。

public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
......
Sync(int count) {
setState(count);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) return false;
int nextc = c-1;
if (compareAndSetState(c, nextc) return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
......
}



4. Atomic*

保证多线程操作的原子性,包括AtomicBoolean、AtomicInteger、AtomicLong、AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray、AtomicReference、AtomicReferenceFieldUpdater、AtomicMarkableReference。

 

4.1 应用场景

4.1.1 生成序列号

public class JobNodeConfigFactory {
private static final AtomicInteger SEQ = new AtomicInteger(0);
......
public static void buildIdentity(Config config) {
String sb = getNodeTypeShort(config.getNodeType()) +
"_" +
config.getIp() +
"_" +
getPid() +
"_" +
DateUtils.format(new Date(), "HH-mm-ss.SSS")
+ "_" + SEQ.incrementAndGet();
config.setIdentity(sb);
}
......
}

 

4.1.2 计数器

public class NdAntiPornClassifier {
......
private AtomicInteger preTrainedModelUsedTimes = new AtomicInteger(0);
private AtomicInteger preTrainedModelDieTimes = new AtomicInteger(0);
public Map<String, Float> classify(byte[] imageBytes)throws IOException {
......
preTrainedModelUsedTimes.getAndIncrement();
Map<String, Float> result = new HashMap<String, Float>();
Session session = preTrainedModel.session();
try (Tensor<TFloat32> inputTensor = preprocessImage(imageBytes)) {
try (Tensor<TFloat32> outputTensor = session.runner()
.feed(modelInputName, inputTensor)
.fetch(modelOutputName).run().get(0)
.expect(TFloat32.DTYPE)) {
FloatNdArray probabilitiesList = NdArrays
.ofFloats(Shape.of(1, CATEGORIES.size()));
outputTensor.data().copyTo(probabilitiesList);
FloatNdArray probabilities = probabilitiesList.get(0);
for (int i = 0; i < probabilities.size(); i++) {
if (i < CATEGORIES.size()) {
result.put(CATEGORIES.get(i),
probabilities.getFloat(i));
}
}
return result;
}
} catch (OutOfMemoryError e) {
log.error("Out of memory when using tensorflow", e);
preTrainedModelDieTimes.getAndIncrement();
throw e;
} finally {
restartClassifierIfNecessary();
reloadModelIfNecessary();
}
}
}

 

4.2 实现原理

调用JVM的Unsafe中的CAS(compareAndSwap)方法,先取得现有的值,检查现有的值是否被其他线程修改,若未被修改,则将值更新为新的值;若值已被修改,则进入自旋,直到值更新成功。

public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
......
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
......
}



public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}



5. ReentrantLock

可重入锁,表示该锁能够支持一个线程对资源的重复加锁,任意线程在获取到锁之后能够再次获取该锁而不会被锁阻塞。

 

5.1 应用场景

5.1.1 有超时的获取锁

public abstract class AbstractRemotingClient extends AbstractRemoting
implements RemotingClient {
private static final long LockTimeoutMillis = 3000;
protected final RemotingClientConfig remotingClientConfig;
private final Lock lockChannelTables = new ReentrantLock();
......
private Channel createChannel(final String addr) throws InterruptedException {
......
if (this.lockChannelTables.tryLock(LockTimeoutMillis, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection = false;
......
cw = new ChannelWrapper(channelFuture);
this.channelTables.put(addr, cw);
} catch (Exception e) {
log.error("createChannel: create channel exception", e);
} finally {
this.lockChannelTables.unlock();
}
}
}
......
}



5.1.2 与Condition配合实现线程的等待通知

public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length) notFull.wait();
items[addIndex] = t;
if (++addIndex == items.length) addIndex = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0) notEmpty.await();
Object x = items[removeIndex];
if (++removeIndex == items.length) removeIndex = 0;
--count;
notFull.signal();
return (T) x;
} finally {
lock.unlock();
}
}
}



5.2 实现原理

基于AbstractQueueSynchronizer队列型同步器。当一个线程尝试去获取锁时,发现锁已被获取,则判断获取锁的线程是否为当前线程,若是,则再次成功获取到锁。

final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}



6. BlockingQueue

阻塞队列是一个支持阻塞插入、阻塞移除操作的队列:

  • 阻塞插入(put):当队列满时,执行插入操作的线程会阻塞住,直到队列不满(有元素从队列移除)

  • 阻塞移除(take):当队列为空时,抚摩行移除操作的线程会阻塞住,直到队列不为空(有元素插入到队列)

  • 非阻塞插入(offer):当队列满时,直接返回false,表示插入失败;否则返回true,表示插入成功

  • 非阻塞移除(poll):当队列为空时,直接返回null;否则返回一个元素

 

常用的阻塞队列有以下几种:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列

  • LinkedBlockingQueue:一个由链接由链表结构组成的有界阻塞队列

  • DelayQueue:一个支持按时间排序的无界阻塞队列

  • SynchronousQueue:一个不存储元素的阻塞队列



6.1 应用场景

6.1.1 使用LinkedBlockingQueue实现FixedThreadPool

FixedThreadPool运示意图

 

6.1.2 使用DelayQueue实现ScheduledThreadPool

ScheduledThreadPool运行示意图

6.2 实现原理

6.2.1 LinkedBlockingQueue

5) 插入、移除使用不同的锁,采用等待通知模式,通过Condition通知阻塞中的线程。

transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();



6) 阻塞插入时,若队列已满,则阻塞当前线程,等待队列非满通知(有元素被移出队列);如果当前插入的为第一个元素,则发出队列非空通知,唤醒阻塞中的执行移除操作的线程。

public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0) signalNotEmpty();
}



7) 阻塞移除时,若队列为空,则阻塞当前线程,等待队列非空通知(有元素插入队列);如果当前移除的为最后一个元素,则发出队列非满通知,唤醒阻塞中的执行插入操作的线程。

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1) notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity) signalNotFull();
return x;
}



6.2.2 DelayQueue

1) 插入、移除使用同一把锁,使用PriorityQueue来保存队列元素。因为PriorityQueue非线程安全,所以需要使用锁来保证多线程对其操作的安全性。PriorityQueue内部采用小顶堆结构来保存元素,剩余时间最小的元素在最顶端,每次插入、移除元素均会重新调整小顶堆。

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();





2) 由于该队列为无界队列,插入时不阻塞,直接调用PriorityQueue的offer方法插入元素;如果插入的元素为队列的第一个元素,则发出队列非空通知,唤醒阻塞中的执行移除操作的线程。

public void put(E e) {
offer(e);
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}



3) 阻塞移除时,如果队列为空,则阻塞当前线程,等待队列非空通知(有元素插入队列);如果队列的第一个元素还未到执行时间,则阻塞当前线程直到执行时间到;

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
available.await();
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) return q.poll();
first = null; // don't retain ref while waiting
if (leader != null) {
available.await();
} else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}



发布于: 2020 年 12 月 29 日阅读数: 9
用户头像

Geek_53983e

关注

还未添加个人签名 2020.12.24 加入

还未添加个人简介

评论

发布
暂无评论
JAVA并发编程原理与实战