User Tools

Site Tools


java-concurrent:aqs

CountDownLatch,CyclicBarrier,Semaphore 这三种都是一种特定功能锁。

CountDownLatch: 比如CountDownLatch lockA = new CountDownLatch(9); 那么 lockA.await();那么线程就锁在这里, 一直等到 lockA.countDown() 执行了9次,让计数到了0 然后await的点就会被触发

CyclicBarrier: CyclicBarrier lockB = new CyclicBarrier(4); 那么 lockB.await() await()被调用了4次,就自动触发解锁,往下执行。 关于这个的循环使用: https://blog.csdn.net/qq_37142346/article/details/79846011

Semaphore: Semaphore lockC = new Semaphore(3); 是指同一个时间段,只允许三个线程执行。 在线程执行代码块中 lockC.acquire(); 那么这个锁会一直保持 只有三个线程同时在 执行acquire()后面的代码。当然其他线程执行完毕了。另外的线程才能进入。

public class ExampleCountDownLatch {
 
    public static void main(String[] args) throws InterruptedException {
        final int totalThread = 9;
        CountDownLatch countDownLatch = new CountDownLatch(totalThread);
 
        ExecutorService executorService = Executors.newCachedThreadPool();
        for(int i = 0; i< totalThread ;i ++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(500L);
                    System.out.println(Thread.currentThread().getName() + " run...");
                    countDownLatch.countDown();
                } catch(InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
 
        System.out.println("waiting other thread to execute");
        countDownLatch.await();
        System.out.println("end");
        executorService.shutdown();
    }
}
 
 
public class ExampleCyclicBarrier {
 
    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.println("before ....");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println("after ...");
            });
        }
        executorService.shutdown();
    }
}
 
 
 
public class ExampleSemaphore {
 
    public static void main (String[] args) {
        final int clientCount = 3;
        final int totalRequest = 10;
        Semaphore semaphore = new Semaphore(clientCount);
 
        ExecutorService executorService = Executors.newFixedThreadPool(5);
 
        for(int i=0; i< totalRequest; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(semaphore.availablePermits() + " " + Thread.currentThread().getName());
                    Thread.sleep(7000);
                } catch(InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}

AbstractQueuedSynchronizer

参考:Java技术之AQS详解 https://www.jianshu.com/p/da9d051dcc3d

提供了一个框架,用于实现阻塞锁和一系列依赖FIFO等待队列实现的相关同步器(如信号量,事件)。 AQS为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。子类们必须定义改变state变量的protected方法,这些方法定义了state是如何被获取或释放的。鉴于此,本类中的其他方法执行所有的排队和阻塞机制。子类也可以维护其他的state变量,但是为了保证同步,必须原子地操作这些变量。

Provides a framework for implementing blocking locks and related synchronizers (semaphores, envents, etc) that reply on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that reply on a single atomic value to represent state. Subclass must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated value manipulated using methods (getState), (setState) and (compareAndSetState) is tracked with respect to synchronization.

java-concurrent/aqs.txt · Last modified: 2022/01/13 07:50 by morgan0329

Except where otherwise noted, content on this wiki is licensed under the following license: 沪ICP备12046235号-2
Donate Powered by PHP Valid HTML5 Valid CSS Driven by DokuWiki