侧边栏壁纸
博主头像
孔子说JAVA博主等级

成功只是一只沦落在鸡窝里的鹰,成功永远属于自信且有毅力的人!

  • 累计撰写 377 篇文章
  • 累计创建 136 个标签
  • 累计收到 12 条评论

目 录CONTENT

文章目录

Java多线程编程之Semaphore的使用方法

孔子说JAVA
2022-12-18 / 0 评论 / 0 点赞 / 99 阅读 / 28,347 字 / 正在检测是否收录...

Semaphore(信号量)通常我们叫它信号量,是JUC包下的一个工具类,用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。常用于限流。当一个线程执行时先通过其方法进行获取许可操作,获取到许可的线程继续执行业务逻辑,当线程执行完成后进行释放许可操作,未获取达到许可的线程进行等待或者直接结束。

image-1670372968565

1、概述

Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为一种多线程并发控制工具来使用。Semaphore是在AQS基础之上实现的一个工具,Semaphore的剩余许可量是通过AQS中的state属性进行的记录,获取许可是将该值进行减少,释放许可是将该值进行增加,当没有足够的许可时,线程会加入到阻塞队列中等待其他线程释放许可并唤醒。

  • Semaphore可以控制同时访问共享资源的线程个数,线程通过 acquire方法获取一个信号量,信号量减一,如果没有就等待;通过release方法释放一个信号量,信号量加一。它通过控制信号量的总数量,以及每个线程所需获取的信号量数量,进而控制多个线程对共享资源访问的并发度,以保证合理的使用共享资源。相比synchronized和独占锁一次只能允许一个线程访问共享资源,功能更加强大,有点类似于共享锁!

Semaphore在构造的时候, 可以传入一个int. 表示有多少许可(permit). 线程获取锁的时候, 要告诉信号量使用多少许可(类比与小汽车和卡车), 当线程要使用的许可不足时, 则调用的线程则会被阻塞。

2、源码解析

Sync是Semaphore的一个内部类,该类继承AQS,这个类又有公平和非公平的两个子类,这个内置的同步器实现Semaphore的功能。

image-1670376155032

通过上面的类图可以看到,Semaphore与ReentrantLock的内部类的结构相同,类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类。

public class Semaphore implements java.io.Serializable {}

2.1 成员变量

与CountDownLatch类似,Semaphore主要是通过AQS的共享锁机制实现的,因此它的核心属性只有一个sync。

// 序列化版本号
private static final long serialVersionUID = -3222578661600680210L;

/**
 * 同步队列, 保存某个AQS子类实例
 */
private final Sync sync;

2.2 构造器

在Semaphore中提供了两个构造方法,如下:

// 指定许可数量
public Semaphore(int permits) {
    // sync属性赋值   默认未非公平实现
    sync = new NonfairSync(permits);
}

// 指定许可数量和是否公平实现
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

构造方法类似于ReentrantLock的构造方法,只是多了一个许可数的参数。调用构造方法的结果就是初始化了同步队列实例,设置state值为permits值。

  • permits 表示许可线程的数量
  • fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

内置同步器的构造方法如下:

FairSync(int permits) {
    super(permits);
}

NonfairSync(int permits) {
    super(permits);
}

Sync(int permits) {
    // 用state属性记录许可量
    setState(permits);
}

这些构造方法的逻辑是比较简单的哈,相信大家还记得在AQS中有一个state属性,当创建Semaphore时会将传递过来的许可量设置到同步器的state值,并将创建的同步器对象赋值给Semaphore中的sync属性。

2.3 类的内部类

2.3.1 Sync类

abstract static class Sync extends AbstractQueuedSynchronizer {
    //序列化版本号
    private static final long serialVersionUID = 1192457210091910933L;

    //构造方法
    Sync(int permits) {
        setState(permits);
    }

    //获取许可
    final int getPermits() {
        return getState();
    }

    //共享模式下非公平策略获取
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    //共享模式下进行释放
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))
                return true;
        }
    }

    //根据指定的缩减量减小可用许可的数目
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))
                return;
        }
    }

    //获取并返回立即可用的所有许可
    final int drainPermits() {
        for (;;) {
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

2.3.2 NonfairSync类

NonfairSync类继承了Sync类,表示采用非公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法。

static final class NonfairSync extends Sync {
    //序列化版本号
    private static final long serialVersionUID = -2694183684443567898L;

    //构造方法
    NonfairSync(int permits) {
        super(permits);
    }

    //共享模式下获取
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

2.3.3 FairSync类

FairSync类继承了Sync类,表示采用公平策略获取资源,其只有一个tryAcquireShared方法,重写了AQS的该方法。

static final class FairSync extends Sync {
    //序列化版本号
    private static final long serialVersionUID = 2014338818796000944L;

    //构造方法
    FairSync(int permits) {
        super(permits);
    }

    //共享模式下获取
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

2.4 方法解析

获取许可

在Semaphore中提供了如下的获取许可的方法

// 获取一个许可/令牌,会阻塞等待其他线程释放许可(在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态)
void acquire() throws InterruptedException

// 获取指定的许可数,会阻塞等待其他线程释放(在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态)
void acquire(int permits) throws InterruptedException 

// 获取一个许可,会阻塞等待其他线程释放许可 可被中断
void acquireUninterruptibly() 

// 获取指定的许可数 会阻塞等待其他线程释放许可 可被中断
void acquireUninterruptibly(int permits)

// 尝试获取许可,不会进行阻塞等待(尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。尝试获取许可, 不会进行阻塞等待)
boolean tryAcquire()

// 尝试获取指定的许可数 不会阻塞等待
boolean tryAcquire(int permits)

// 尝试获取许可, 可指定等待时间(尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程)
boolean tryAcquire(long timeout, TimeUnit unit)

// 尝试获取指定的许可数 可指定等待时间
boolean tryAcquire(int permits, long timeout, TimeUnit unit) 

// 从Semaphore中获取一个许可证,如果获取不到则阻塞等待,直到其他线程释放了一个许可证。(忽略中断/不响应中断)
acquireUninterruptibly() 

释放许可

// 释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
release()

// 释放指定数量的许可证。
release(int permits)

其他方法

// 返回可用的令牌数量。
public int availablePermits()

// 获取等待队列里阻塞的线程数。
public final int getQueueLength() 

// 等待队列里是否还存在等待线程。
public final boolean hasQueuedThreads()

// 减少 reduction 个许可证
protected void reducePermits(int reduction)

// 返回所有等待获取许可证的线程集合
protected Collection<Thread> getQueuedThreads()

// 清空令牌把可用令牌数置为0,返回清空令牌的数量。
drainPermits()

2.4.1 acquire方法

这个方法没有返回值,当许可不足时会阻塞线程等待其他线程释放许可,其源码如下:

public void acquire() throws InterruptedException {
    // 调用AQS中的方法
    sync.acquireSharedInterruptibly(1);
}

// AQS中的方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // tryAcquireShared是个模板方法,需要子类去实现
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

在acquire的方法中,会调用到AQS中的acquireSharedInterruptibly方法,在这个方法中用到了模板方法模式,tryAcquireShared方法是一个模板方法,需要子类去实现。接下来我们分别看看在Semaphore中的公平和非公平模式都是如何实现的。

公平模式下的tryAcquireShared方法实现如下:

protected int tryAcquireShared(int acquires) {
    // 自旋
    for (;;) {
        // 判断是否已经存在阻塞的线程
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

非公平模式下的tryAcquireShared方法实现如下:

protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}

final int nonfairTryAcquireShared(int acquires) {
    // 自旋
    for (;;) {
        // 获取剩余的许可量
        int available = getState();
        // 扣减需要的信号量后的值
        int remaining = available - acquires;
        // 信号量不足 或者CAS替换state失败  返回扣减后的信号量值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

公平和非公平模式的实现的区别是在公平模式的实现中会先判断是否已经存在阻塞的线程了,存在的话不会再去竞争获取许可了。

AQS.doAcquireSharedInterruptibly 方法的逻辑如下:

private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    // 添加到AQS的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            // 获取节点的前置节点
            final Node p = node.predecessor();
            // 前置节点是头节点
            if (p == head) {
                // 尝试获取许可
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 清楚无用的节点并挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 获取失败移除节点
            cancelAcquire(node);
    }
}

2.4.2 tryAcquire方法

这个方法的返回值表示是否获取许可成功,不会阻塞等待其他线程释放许可,没有许可了会直接返回false,这里就要分公平和非公平策略两种情况了。其源码如下:

非公平策略

public boolean tryAcquire() {
    // 调用Semaphore.Sync中的nonfairTryAcquireShared方法
    return sync.nonfairTryAcquireShared(1) >= 0;
}

// 内部调用Sync类中的nonfairTryAcquireShared方法
final int nonfairTryAcquireShared(int acquires) {
    // 自旋
    for (;;) {
        // 获取剩余的许可量
        int available = getState();
        // 扣减需要的信号量后的值
        int remaining = available - acquires;
        // 信号量不足 或者CAS替换state失败  返回扣减后的信号量值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

该方法本质就是一个自旋方法,通过自旋+CAS来保证修改许可值的线程安全性。该方法返回的情况有如下两种情况

  • 信号量不够,直接返回,返回值为负数,表示获取失败;
  • 信号量足够,且CAS操作成功,返回值为剩余许可值,获取成功。

公平策略

公平策略下的tryAcquireShared方法定义如下:

protected int tryAcquireShared(int acquires) {
    //自旋
    for (;;) {
        if (hasQueuedPredecessors())
            return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

我们看到它与非公平策略的唯一区别就是多了下面这个if代码:

if (hasQueuedPredecessors())
    return -1;

即在获取共享锁之前,先调用hasQueuedPredecessors方法来判断队列中是否存在其他正在排队的节点,如果是返回true,否则为false。因此当存在其他正在排队的节点,当前节点就无法获取许可,只能排队等待,这也是公平策略的体现。

2.4.3 release方法

释放许可的方法为release方法,其源码如下:

public void release() {
    sync.releaseShared(1);
}

// AQS中的方法
public final boolean releaseShared(int arg) {
    // 模板方法  需要子类实现
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        // 成功
        return true;
    }
    // 失败
    return false;
}

Semaphore.Sync 实现的 tryReleaseShared 方法逻辑如下:

protected final boolean tryReleaseShared(int releases) {
    // 自旋
    for (;;) {
        // 获取当前的许可量
        int current = getState();
        // 加上需要释放的量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        // CAS修改state
        if (compareAndSetState(current, next))
            return true;
    }
}

该方法也是一个自旋方法,通过自旋+CAS原子性地修改同步状态,逻辑很简单。

AQS.doreleaseShared 的逻辑如下:

private void doReleaseShared() {
    // 自旋
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒下个节点  LockSupport.unpark
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

3、底层执行流程

Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一。刚开始,permits(state)为 3,这时 5 个线程来获取资源。

image-1670377666068

假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列 park 阻塞。

image-1670377684108

这时 Thread-4 释放了 permits,状态如下:

image-1670377697743

接下来 Thread-0 竞争成功,permits 再次设置为0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态

image-1670377713168

Semaphore获取锁的过程总结为如下:

  1. 判断是否满足获取锁条件, 关键方法nonfairTryAcquireShared.
  2. 若获取锁成功,则也会修改state.
  3. 若获取锁失败,关键方法doAcquireSharedInterruptibly阻塞的获取锁.
    1. 添加到双向链表
    2. 若是头节点后继, 则尝试获取锁, 否者则判断进入睡眠等待唤醒, 唤醒后继续执行3.2
    3. 若不进入睡眠,则直接运行到3.2步

emaphore释放锁的过程总结为如下:

  1. 释放N个许可, 因为存在并发释放, 需要CAS确保设置更新后的值.
  2. 唤醒双向链表中有效的等待节点. (可能存在并发问题,引入PROPAGATE状态)
  3. 被唤醒的节点调用获取锁的流程.

4、使用场景

可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求, 要读取几万个文件的数据,因为都是 IO 密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有 10 个,这时我们必须控制只有 10 个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用 Semaphore 来做流量控制。

  • 场景1:假设数据库的连接数上线为10个,多个线程并发操作数据库可以使用Semaphore来控制并发操作数据库的线程个数最多为10个。

  • 场景2:一个单向隧道能同时容纳10个小汽车或5个卡车通过(1个卡车等效与2个小汽车), 而隧道入口记录着当前已经在隧道内的汽车等效比重. 比如1个小汽车和1个卡车, 则隧道入口显示3. 若隧道入口显示10表示已经满了. 当汽车驶出隧道之后, 隧道入口显示的数字则会相应的减小。

  • 场景3:实现一个简单的登录队列,通过Semaphore来限制系统中的用户数。

5、代码示例

5.1 示例一

Semaphore基本使用示例

package com.kz.example.juc;

import java.util.concurrent.Semaphore;

/**
 * Semaphore基本使用示例
 *
 * @Author kongzi
 * @Date 2022/12/7 08:43
 * @Version 1.0
 */
public class UseSemaphoreDemo {
    private static class MyRunnable implements Runnable {
        // 成员属性 Semaphore对象
        private final Semaphore semaphore;

        public MyRunnable(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            // 获取许可
            boolean acquire = semaphore.tryAcquire();
            // 未获取到许可 结束
            if (!acquire) {
                System.out.println("线程【" + threadName + "】未获取到许可,结束");
                return;
            }
            // 获取到许可
            try {
                System.out.println("线程【" + threadName + "】获取到许可");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 释放许可
                semaphore.release();
                System.out.println("线程【" + threadName + "】释放许可");
            }
        }
    }

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i <= 10; i++) {
            MyRunnable runnable = new MyRunnable(semaphore);
            Thread thread = new Thread(runnable, "Thread-" + i);
            thread.start();
        }
    }
}

执行结果:

线程【Thread-2】获取到许可
线程【Thread-3】获取到许可
线程【Thread-6】未获取到许可,结束
线程【Thread-7】未获取到许可,结束
线程【Thread-10】未获取到许可,结束
线程【Thread-0】未获取到许可,结束
线程【Thread-1】未获取到许可,结束
线程【Thread-4】未获取到许可,结束
线程【Thread-5】未获取到许可,结束
线程【Thread-8】未获取到许可,结束
线程【Thread-9】未获取到许可,结束
线程【Thread-3】释放许可
线程【Thread-2】释放许可

5.2 示例二

使用 Semaphore做流量控制,实现一个简单的登录队列,通过Semaphore来限制系统中的用户数。

package com.kz.example.juc;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

/**
 * 使用 Semaphore做流量控制
 * 实现一个简单的登录队列,通过Semaphore来限制系统中的用户数
 *
 * @Author kongzi
 * @Date 2022/12/7 09:12
 * @Version 1.0
 */
public class UseSemaphoreLoginQueue {

    static class LoginQueueUsingSemaphore {
        private Semaphore semaphore;

        /**
         * @param slotLimit
         */
        public LoginQueueUsingSemaphore(int slotLimit) {
            semaphore = new Semaphore(slotLimit);
        }

        boolean tryLogin() {
            //获取一个凭证
            return semaphore.tryAcquire();
        }

        void logout() {
            semaphore.release();
        }

        int availableSlots() {
            return semaphore.availablePermits();
        }
    }


    public static void main(String[] args) {

        //允许最大的登录数
        int slots = 10;
        ExecutorService executorService = Executors.newFixedThreadPool(slots);
        LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
        //线程池模拟登录
        for (int i = 1; i <= slots; i++) {
            final int num = i;
            executorService.execute(() -> {
                if (loginQueue.tryLogin()) {
                    System.out.println("用户:" + num + "登录成功!");
                } else {
                    System.out.println("用户:" + num + "登录失败!");
                }
            });
        }
        executorService.shutdown();


        System.out.println("当前可用许可证数:" + loginQueue.availableSlots());

        //此时已经登录了10个用户,再次登录的时候会返回false
        if (loginQueue.tryLogin()) {
            System.out.println("登录成功!");
        } else {
            System.out.println("系统登录用户已满,登录失败!");
        }
        //有用户退出登录
        loginQueue.logout();

        //再次登录
        if (loginQueue.tryLogin()) {
            System.out.println("登录成功!");
        } else {
            System.out.println("系统登录用户已满,登录失败!");
        }

    }
}

执行结果:

用户:1登录成功!
用户:5登录成功!
用户:4登录成功!
用户:2登录成功!
用户:3登录成功!
用户:8登录成功!
用户:6登录成功!
用户:7登录成功!
用户:9登录成功!
用户:10登录成功!
当前可用许可证数:0
系统登录用户已满,登录失败!
登录成功!

5.3 示例三

实现停车场功能,有100辆车需要到停车场停车,但只有10个停车位

package com.kz.example.juc;

import java.util.Random;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * 使用 Semaphore做流量控制
 * 实现停车场功能,有100辆车需要到停车场停车,但只有10个停车位
 *
 * @Author kongzi
 * @Date 2022/12/7 08:34
 * @Version 1.0
 */
public class UseSemaphorePark {

    static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
                try {
                    if (s.availablePermits() == 0) {
                        System.out.println(Thread.currentThread().getName() + "车位不足");
                    }
                    System.out.println(Thread.currentThread().getName() + "开始停车");
                    s.acquire();
                    int i1 = new Random().nextInt(10);

                    System.out.println(Thread.currentThread().getName() + "停" + i1 + "秒");
                    TimeUnit.SECONDS.sleep(i1);
                    s.release();
                    System.out.println(Thread.currentThread().getName() + "停车完毕");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, i + "号车").start();
        }
    }

}

打印结果:

0号车开始停车
5号车开始停车
31号车开始停车
1号车开始停车
8号车开始停车
9号车开始停车
12号车开始停车
12号车停1秒
9号车停7秒
1号车停3秒
13号车开始停车
13号车停9秒
0号车停5秒
8号车停1秒
59号车开始停车
59号车停1秒
16号车开始停车
43号车开始停车
5号车停4秒
31号车停1秒
16号车停8秒
62号车车位不足
62号车开始停车
75号车车位不足
75号车开始停车
25号车车位不足
25号车开始停车
24号车车位不足
24号车开始停车
2号车车位不足
2号车开始停车
79号车车位不足
3号车车位不足
79号车开始停车
3号车开始停车
80号车车位不足
80号车开始停车
7号车车位不足
7号车开始停车
10号车车位不足
10号车开始停车
84号车车位不足
84号车开始停车
14号车车位不足
15号车车位不足
14号车开始停车
15号车开始停车
99号车车位不足
23号车车位不足
23号车开始停车
26号车车位不足
27号车车位不足
27号车开始停车
22号车车位不足
22号车开始停车
92号车车位不足
92号车开始停车
26号车开始停车
30号车车位不足
30号车开始停车
99号车开始停车
4号车车位不足
4号车开始停车
32号车车位不足
32号车开始停车
33号车车位不足
33号车开始停车
36号车车位不足
36号车开始停车
37号车车位不足
37号车开始停车
34号车车位不足
34号车开始停车
40号车车位不足
40号车开始停车
41号车车位不足
41号车开始停车
35号车车位不足
35号车开始停车
38号车车位不足
38号车开始停车
39号车车位不足
39号车开始停车
44号车车位不足
44号车开始停车
97号车车位不足
96号车车位不足
95号车车位不足
95号车开始停车
98号车车位不足
18号车车位不足
18号车开始停车
88号车车位不足
97号车开始停车
6号车车位不足
77号车车位不足
77号车开始停车
19号车车位不足
19号车开始停车
82号车车位不足
78号车车位不足
78号车开始停车
29号车车位不足
29号车开始停车
69号车车位不足
76号车车位不足
76号车开始停车
6号车开始停车
49号车车位不足
81号车车位不足
81号车开始停车
86号车车位不足
86号车开始停车
83号车车位不足
83号车开始停车
88号车开始停车
87号车车位不足
87号车开始停车
85号车车位不足
85号车开始停车
89号车车位不足
89号车开始停车
90号车车位不足
90号车开始停车
11号车车位不足
11号车开始停车
91号车车位不足
91号车开始停车
93号车车位不足
93号车开始停车
98号车开始停车
94号车车位不足
94号车开始停车
96号车开始停车
47号车车位不足
47号车开始停车
50号车车位不足
50号车开始停车
51号车车位不足
51号车开始停车
46号车车位不足
46号车开始停车
45号车车位不足
45号车开始停车
48号车车位不足
48号车开始停车
42号车车位不足
42号车开始停车
17号车车位不足
17号车开始停车
54号车车位不足
54号车开始停车
55号车车位不足
55号车开始停车
49号车开始停车
58号车车位不足
58号车开始停车
72号车车位不足
72号车开始停车
21号车车位不足
20号车车位不足
20号车开始停车
53号车车位不足
53号车开始停车
52号车车位不足
52号车开始停车
57号车车位不足
57号车开始停车
56号车车位不足
56号车开始停车
66号车车位不足
66号车开始停车
63号车车位不足
63号车开始停车
61号车车位不足
61号车开始停车
60号车车位不足
60号车开始停车
69号车开始停车
64号车车位不足
65号车车位不足
65号车开始停车
68号车车位不足
68号车开始停车
74号车车位不足
74号车开始停车
73号车车位不足
73号车开始停车
67号车车位不足
67号车开始停车
71号车车位不足
71号车开始停车
70号车车位不足
70号车开始停车
82号车开始停车
28号车车位不足
28号车开始停车
64号车开始停车
21号车开始停车
8号车停车完毕
31号车停车完毕
59号车停车完毕
25号车停0秒
75号车停3秒
24号车停8秒
62号车停9秒
43号车停3秒
12号车停车完毕
25号车停车完毕
1号车停车完毕
2号车停6秒
5号车停车完毕
79号车停1秒
43号车停车完毕
3号车停0秒
80号车停2秒
75号车停车完毕
7号车停2秒
3号车停车完毕
79号车停车完毕
0号车停车完毕
84号车停1秒
10号车停2秒
84号车停车完毕
14号车停5秒
7号车停车完毕
15号车停4秒
23号车停0秒
80号车停车完毕
23号车停车完毕
27号车停4秒
9号车停车完毕
22号车停4秒
10号车停车完毕
92号车停9秒
16号车停车完毕
26号车停0秒
26号车停车完毕
30号车停3秒
13号车停车完毕
24号车停车完毕
4号车停6秒
99号车停2秒
2号车停车完毕
32号车停0秒
32号车停车完毕
33号车停3秒
62号车停车完毕
36号车停0秒
36号车停车完毕
37号车停4秒
27号车停车完毕
40号车停3秒
15号车停车完毕
34号车停1秒
22号车停车完毕
35号车停1秒
41号车停5秒
99号车停车完毕
30号车停车完毕
39号车停1秒
38号车停3秒
14号车停车完毕
34号车停车完毕
44号车停5秒
35号车停车完毕
95号车停3秒
39号车停车完毕
33号车停车完毕
97号车停1秒
18号车停6秒
40号车停车完毕
77号车停8秒
97号车停车完毕
19号车停6秒
37号车停车完毕
78号车停9秒
38号车停车完毕
29号车停2秒
4号车停车完毕
76号车停9秒
95号车停车完毕
6号车停3秒
41号车停车完毕
81号车停1秒
92号车停车完毕
86号车停4秒
29号车停车完毕
83号车停8秒
44号车停车完毕
88号车停8秒
81号车停车完毕
87号车停5秒
6号车停车完毕
85号车停9秒
89号车停1秒
18号车停车完毕
19号车停车完毕
11号车停0秒
89号车停车完毕
91号车停3秒
90号车停6秒
11号车停车完毕
86号车停车完毕
93号车停6秒
98号车停7秒
77号车停车完毕
87号车停车完毕
94号车停0秒
94号车停车完毕
96号车停7秒
91号车停车完毕
47号车停3秒
78号车停车完毕
50号车停0秒
50号车停车完毕
51号车停5秒
76号车停车完毕
46号车停9秒
83号车停车完毕
45号车停1秒
48号车停8秒
88号车停车完毕
45号车停车完毕
42号车停2秒
90号车停车完毕
17号车停2秒
54号车停8秒
47号车停车完毕
93号车停车完毕
55号车停3秒
85号车停车完毕
49号车停6秒
42号车停车完毕
58号车停1秒
17号车停车完毕
72号车停4秒
51号车停车完毕
20号车停0秒
20号车停车完毕
53号车停9秒
98号车停车完毕
52号车停6秒
58号车停车完毕
57号车停5秒
96号车停车完毕
56号车停6秒
55号车停车完毕
66号车停8秒
72号车停车完毕
63号车停5秒
48号车停车完毕
61号车停6秒
46号车停车完毕
60号车停5秒
49号车停车完毕
69号车停6秒
54号车停车完毕
57号车停车完毕
68号车停0秒
65号车停1秒
68号车停车完毕
74号车停9秒
52号车停车完毕
73号车停2秒
65号车停车完毕
67号车停1秒
56号车停车完毕
71号车停4秒
67号车停车完毕
70号车停8秒
73号车停车完毕
82号车停8秒
63号车停车完毕
28号车停3秒
53号车停车完毕
64号车停8秒
66号车停车完毕
21号车停7秒
60号车停车完毕
61号车停车完毕
71号车停车完毕
69号车停车完毕
28号车停车完毕
74号车停车完毕
70号车停车完毕
21号车停车完毕
82号车停车完毕
64号车停车完毕

5.4 示例四

演示Semaphore用法,一个数据库连接池的实现,需要注意 Semaphore 只是对资源并发访问的线程数进行监控,并不会保证线程安全。

package com.kz.example.juc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

/**
 * 使用 Semaphore做流量控制
 * 演示Semaphore用法,一个数据库连接池的实现
 *
 * @Author kongzi
 * @Date 2022/12/7 07:45
 * @Version 1.0
 */
public class UseSemaphoreDBPool {
    private final static int POOL_SIZE = 10;
    /**
     * 两个指示器,分别表示池子还有可用连接和已用连接
     */
    private final Semaphore useful, useless;
    /**
     * 存放数据库连接的容器
     */
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    /**
     * 初始化池
     */
    static {
        String url = "jdbc:mysql://localhost:3306/test?user=root&password=111111&useSSL=false&serverTimezone=GMT%2B8";
        try {
            for (int i = 0; i < POOL_SIZE; i++) {
                // 获取连接并放入连接池
                Connection conn = DriverManager.getConnection(url);
                pool.addLast(conn);
                System.out.println("初始化 conn-" + i + ":" + conn.hashCode() + " ");
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    public UseSemaphoreDBPool() {
        this.useful = new Semaphore(10);
        this.useless = new Semaphore(0);
    }

    /**
     * 归还连接
     */
    public void returnConnect(Connection connection) throws InterruptedException {
        if (connection != null) {
            System.out.println("当前有" + useful.getQueueLength() + "个线程等待数据库连接!!"
                    + "可用连接数:" + useful.availablePermits());
            useless.acquire();
            synchronized (pool) {
                pool.addLast(connection);
            }
            useful.release();
        }
    }

    /**
     * 从池子中获取连接
     */
    public Connection takeConnect() throws InterruptedException {
        useful.acquire();
        Connection connection;
        synchronized (pool) {
            connection = pool.removeFirst();
        }
        useless.release();
        return connection;
    }
}

测试类:

package com.kz.example.juc;

import java.sql.Connection;
import java.util.Random;

/**
 * 使用 Semaphore做流量控制的测试类 - 测试数据库连接池
 * 演示Semaphore用法,一个数据库连接池的实现
 *
 * @Author kongzi
 * @Date 2022/12/7 07:45
 * @Version 1.0
 */
public class UseSemaphoreDBPoolTest {
    private static UseSemaphoreDBPool dbPool = new UseSemaphoreDBPool();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            // 让每个线程持有连接的时间不一样
            Random r = new Random();
            long start = System.currentTimeMillis();
            try {
                Connection connect = dbPool.takeConnect();
                System.out.println("Thread_" + Thread.currentThread().getId()
                        + "_获取数据库连接共耗时【" + (System.currentTimeMillis() - start) + "】ms.");
                // 模拟业务操作,线程持有连接查询数据
                Thread.sleep(100 + r.nextInt(100));
                System.out.println("查询数据完成,归还连接!");
                dbPool.returnConnect(connect);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
    }
}

打印结果:

初始化 conn-0:1778535015 
初始化 conn-1:24433162 
初始化 conn-2:519821334 
初始化 conn-3:1307096070 
初始化 conn-4:2081303229 
初始化 conn-5:1076835071 
初始化 conn-6:1525262377 
初始化 conn-7:1836643189 
初始化 conn-8:1597462040 
初始化 conn-9:853119666 
Thread_13_获取数据库连接共耗时【0】ms.
Thread_14_获取数据库连接共耗时【0】ms.
Thread_15_获取数据库连接共耗时【15】ms.
Thread_17_获取数据库连接共耗时【0】ms.
Thread_16_获取数据库连接共耗时【0】ms.
Thread_20_获取数据库连接共耗时【0】ms.
Thread_19_获取数据库连接共耗时【0】ms.
Thread_18_获取数据库连接共耗时【0】ms.
Thread_21_获取数据库连接共耗时【0】ms.
Thread_22_获取数据库连接共耗时【0】ms.
查询数据完成,归还连接!
当前有40个线程等待数据库连接!!可用连接数:0
Thread_23_获取数据库连接共耗时【117】ms.
查询数据完成,归还连接!
当前有39个线程等待数据库连接!!可用连接数:0
Thread_24_获取数据库连接共耗时【118】ms.
查询数据完成,归还连接!
当前有38个线程等待数据库连接!!可用连接数:0
Thread_25_获取数据库连接共耗时【136】ms.
查询数据完成,归还连接!
当前有37个线程等待数据库连接!!可用连接数:0
Thread_26_获取数据库连接共耗时【155】ms.
查询数据完成,归还连接!
当前有36个线程等待数据库连接!!可用连接数:0
Thread_27_获取数据库连接共耗时【157】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有35个线程等待数据库连接!!可用连接数:0
当前有35个线程等待数据库连接!!可用连接数:0
Thread_29_获取数据库连接共耗时【163】ms.
Thread_28_获取数据库连接共耗时【163】ms.
查询数据完成,归还连接!
当前有33个线程等待数据库连接!!可用连接数:0
Thread_30_获取数据库连接共耗时【164】ms.
查询数据完成,归还连接!
当前有32个线程等待数据库连接!!可用连接数:0
Thread_31_获取数据库连接共耗时【180】ms.
查询数据完成,归还连接!
当前有31个线程等待数据库连接!!可用连接数:0
Thread_34_获取数据库连接共耗时【186】ms.
查询数据完成,归还连接!
当前有30个线程等待数据库连接!!可用连接数:0
Thread_33_获取数据库连接共耗时【267】ms.
查询数据完成,归还连接!
当前有29个线程等待数据库连接!!可用连接数:0
Thread_32_获取数据库连接共耗时【280】ms.
查询数据完成,归还连接!
当前有28个线程等待数据库连接!!可用连接数:0
Thread_35_获取数据库连接共耗时【281】ms.
查询数据完成,归还连接!
当前有27个线程等待数据库连接!!可用连接数:0
Thread_36_获取数据库连接共耗时【283】ms.
查询数据完成,归还连接!
当前有26个线程等待数据库连接!!可用连接数:0
Thread_37_获取数据库连接共耗时【303】ms.
查询数据完成,归还连接!
当前有25个线程等待数据库连接!!可用连接数:0
Thread_38_获取数据库连接共耗时【307】ms.
查询数据完成,归还连接!
当前有24个线程等待数据库连接!!可用连接数:0
Thread_39_获取数据库连接共耗时【314】ms.
查询数据完成,归还连接!
当前有23个线程等待数据库连接!!可用连接数:0
Thread_40_获取数据库连接共耗时【317】ms.
查询数据完成,归还连接!
当前有22个线程等待数据库连接!!可用连接数:0
Thread_41_获取数据库连接共耗时【337】ms.
查询数据完成,归还连接!
当前有21个线程等待数据库连接!!可用连接数:0
Thread_42_获取数据库连接共耗时【381】ms.
查询数据完成,归还连接!
当前有20个线程等待数据库连接!!可用连接数:0
Thread_43_获取数据库连接共耗时【427】ms.
查询数据完成,归还连接!
当前有19个线程等待数据库连接!!可用连接数:0
Thread_44_获取数据库连接共耗时【432】ms.
查询数据完成,归还连接!
当前有18个线程等待数据库连接!!可用连接数:0
Thread_45_获取数据库连接共耗时【442】ms.
查询数据完成,归还连接!
当前有17个线程等待数据库连接!!可用连接数:0
Thread_46_获取数据库连接共耗时【444】ms.
查询数据完成,归还连接!
当前有16个线程等待数据库连接!!可用连接数:0
Thread_48_获取数据库连接共耗时【452】ms.
查询数据完成,归还连接!
当前有15个线程等待数据库连接!!可用连接数:0
Thread_47_获取数据库连接共耗时【464】ms.
查询数据完成,归还连接!
当前有14个线程等待数据库连接!!可用连接数:0
查询数据完成,归还连接!
Thread_49_获取数据库连接共耗时【468】ms.
当前有13个线程等待数据库连接!!可用连接数:0
Thread_50_获取数据库连接共耗时【468】ms.
查询数据完成,归还连接!
当前有12个线程等待数据库连接!!可用连接数:0
Thread_51_获取数据库连接共耗时【475】ms.
查询数据完成,归还连接!
当前有11个线程等待数据库连接!!可用连接数:0
Thread_52_获取数据库连接共耗时【484】ms.
查询数据完成,归还连接!
当前有10个线程等待数据库连接!!可用连接数:0
Thread_53_获取数据库连接共耗时【575】ms.
查询数据完成,归还连接!
当前有9个线程等待数据库连接!!可用连接数:0
Thread_54_获取数据库连接共耗时【581】ms.
查询数据完成,归还连接!
当前有8个线程等待数据库连接!!可用连接数:0
Thread_55_获取数据库连接共耗时【587】ms.
查询数据完成,归还连接!
当前有7个线程等待数据库连接!!可用连接数:0
Thread_56_获取数据库连接共耗时【607】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有6个线程等待数据库连接!!可用连接数:0
当前有6个线程等待数据库连接!!可用连接数:0
Thread_57_获取数据库连接共耗时【618】ms.
Thread_58_获取数据库连接共耗时【618】ms.
查询数据完成,归还连接!
当前有4个线程等待数据库连接!!可用连接数:0
Thread_59_获取数据库连接共耗时【640】ms.
查询数据完成,归还连接!
查询数据完成,归还连接!
当前有3个线程等待数据库连接!!可用连接数:0
当前有3个线程等待数据库连接!!可用连接数:0
Thread_60_获取数据库连接共耗时【646】ms.
Thread_61_获取数据库连接共耗时【646】ms.
查询数据完成,归还连接!
当前有1个线程等待数据库连接!!可用连接数:0
Thread_62_获取数据库连接共耗时【655】ms.
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:0
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:1
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:2
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:3
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:4
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:5
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:6
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:7
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:8
查询数据完成,归还连接!
当前有0个线程等待数据库连接!!可用连接数:9

从打印结果可以看出,一次只有 10 个线程执行 acquire(),只有线程进行 release() 方法后才会有别的线程执行 acquire()。

5.5 示例五-数据库连接池

在初始化Semaphore时可以设置其公平性,如果为公平Semaphore,则按照请求时间获得许可,即先发送的请求先获得许可,如果为非公平Semaphore,则先发送的请求未必先获得许可,这有助于提高程序的吞吐量,但是有可能导致某些请求始终获取不到许可(tryAcquire方法不使用公平性设置)

package com.kz.example.juc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.Semaphore;

/**
 * Semaphore(信号量) 数据库连接池
 *
 * @Author kongzi
 * @Date 2022/12/7 10:22
 * @Version 1.0
 */
public class UseSemaphoreConnPool {
    private LinkedList<Connection> unusedConns =
            new LinkedList<Connection>();
    /**
     * 释放连接时对查找性能要求较高,故使用哈希表
     */
    private Map<Connection, String> usedConns =
            new HashMap<Connection, String>();
    private final Semaphore available;

    public UseSemaphoreConnPool(int size) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append("-----pool----- ");
        // 公平性Semaphore
        available = new Semaphore(size, true);
        String url = "jdbc:mysql://localhost:3306/test?user=root&password=111111&useSSL=false&serverTimezone=GMT%2B8";
        for (int i = 0; i < size; i++) {
            Connection conn = DriverManager.getConnection(url);
            unusedConns.add(conn);
            builder.append("conn-" + i + ":" + conn.hashCode() + " ");
        }
        builder.append("-------------- ");
        System.out.print(builder.toString());
    }

    public Connection getConn() throws InterruptedException {
        //获取Semaphore中的许可
        available.acquire();
        Connection conn = null;
        synchronized (this) {
            conn = unusedConns.removeFirst();
            usedConns.put(conn, "");

            System.out.println(Thread.currentThread().getName()
                    + ":" + conn.hashCode() + "[got]");
            System.out.println(display());
        }
        return conn;
    }

    public void close(Connection conn) {
        synchronized (this) {
            if (usedConns.containsKey(conn)) {
                usedConns.remove(conn);
                unusedConns.addLast(conn);

                System.out.println(Thread.currentThread().getName()
                        + ":" + conn.hashCode() + "[closed]");
                System.out.println(display());
            }
        }
        //释放线程获取的许可
        available.release();
    }

    private final synchronized String display() {
        String str = "";
        if (unusedConns.size() > 0) {
            str = "";
            for (Connection conn : unusedConns) {
                str += conn.hashCode() + "|";
            }
        }
        if (!str.equals("")) {
            return str;
        } else {
            return "empty";
        }
    }
}

测试类:

package com.kz.example.juc;

import java.sql.Connection;
import java.util.concurrent.CountDownLatch;

/**
 * Semaphore(信号量) 数据库连接池测试类
 *
 * @Author kongzi
 * @Date 2022/12/7 10:23
 * @Version 1.0
 */
public class UseSemaphoreConnPoolTest implements Runnable {
    private static CountDownLatch latch = new CountDownLatch(1);
    private UseSemaphoreConnPool pool;

    public UseSemaphoreConnPoolTest(UseSemaphoreConnPool pool) {
        this.pool = pool;
    }

    @Override
    public void run() {
        try {
            latch.await();
            Connection conn = pool.getConn();
            Thread.sleep(1 * 1000);
            pool.close(conn);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        UseSemaphoreConnPool pool = new UseSemaphoreConnPool(2);
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(new UseSemaphoreConnPoolTest(pool));
            t.start();
        }
        //保证4个线程同时运行
        latch.countDown();
    }
}

执行结果:

-----pool----- conn-0:1778535015 conn-1:24433162 -------------- Thread-2:1778535015[got]
24433162|
Thread-3:24433162[got]
empty
Thread-2:1778535015[closed]
1778535015|
Thread-0:1778535015[got]
empty
Thread-3:24433162[closed]
24433162|
Thread-1:24433162[got]
empty
Thread-0:1778535015[closed]
1778535015|
Thread-1:24433162[closed]
1778535015|24433162|

特别注意如果getConn方法和close方法都为同步方法,将产生死锁:

public synchronized Connection getConn() throws InterruptedException{  
    ......  
}  
      
public synchronized void close(Connection conn){  
    ......  
}  

同一时刻只能有一个线程调用连接池的getConn方法或close方法,当Semaphore中没有可用的许可,并且此时恰好有一个线程成功调用连接池的getConn方法,则该线程将一直阻塞在acquire方法上,其它线程将没有办法获取连接池上的锁并调用close方法释放许可,程序将会卡死。

  • 阻塞方法上不要加锁,否则将导致锁长时间不释放,如果该锁为互斥锁,将导致程序卡住
  • acquire方法本身使用乐观锁实现,也不需要再加互斥锁

5.6 示例六-使用Semaphore实现互斥

使用Semaphore实现互斥只需要将许可证数量设置为1,这样就可以保证只有一个线程能获取到许可证。相比内置锁synchronized和重入锁ReentrantLock,使用Semaphore实现互斥有个明显的缺点:不可重入,没有释放许可证的情况下,再次调acquire方法将导致死锁。

package com.kz.example.juc;

import java.util.concurrent.Semaphore;

/**
 * 使用Semaphore实现互斥
 * 使用Semaphore实现互斥只需要将许可证数量设置为1,这样就可以保证只有一个线程能获取到许可证。
 *
 * @Author kongzi
 * @Date 2022/12/7 09:42
 * @Version 1.0
 */
public class UseSemaphoreDemo2 {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(1);

        Runnable runnable = () -> {
            String threadName = Thread.currentThread().getName();
            try {
                //获取一个许可证
                semaphore.acquire();
                System.out.println(threadName + "执行任务A...");
                semaphore.acquire();
                System.out.println(threadName + "执行任务B...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放一个许可证
                semaphore.release();
            }
        };

        new Thread(runnable).start();
    }

}

执行结果:

Thread-0执行任务A...

“执行任务B”永远不会打印,因为许可证只有一个,第二次acquire方法的调用会因为无法获取到许可证而一直阻塞。

5.7 示例七-不可重入互斥锁

package com.kz.example.juc;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

/**
 * 不可重入互斥锁
 *
 * @Author kongzi
 * @Date 2022/12/7 10:32
 * @Version 1.0
 */
public class UseSemaphoreDemo3 implements Runnable {
    private static CountDownLatch latch = new CountDownLatch(1);
    private static Semaphore lock = new Semaphore(1, true);

    @Override
    public void run() {
        try {
            latch.await();
            this.work();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void work() throws InterruptedException {
        lock.acquire();
        System.out.println("Locking by "
                + Thread.currentThread().getName());
        Thread.sleep(1 * 1000);
        lock.release();
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 4; i++) {
            Thread t = new Thread(new UseSemaphoreDemo3());
            t.start();
        }
        //保证4个线程同时运行
        latch.countDown();
    }
}

执行结果:

Locking by Thread-0
Locking by Thread-2
Locking by Thread-1
Locking by Thread-3

6、总结

Semaphore是一个有效的流量控制工具,它基于AQS共享锁实现。我们常常用它来控制对有限资源的访问。

  • 每次使用资源前,先申请一个信号量,如果资源数不够,就会阻塞等待;
  • 每次释放资源后,就释放一个信号量。

Semaphore和CountDownLatch的原理都差不多,都是直接使用AQS的共享模式实现自己的逻辑,都是对于AQS的state资源的利用,但是它们却实现了不同的功能,CountDownLatch中state被看作一个倒计数器,当state变为0时,表示线程可以放开执行。而Semaphore中的state被看作信号量资源,获取不到资源则可能会阻塞,获取到资源则可以访问共享区域,共享区域使用完毕要记得还回信号量。

很明显Semaphore的信号量资源很像锁资源,但是我们前面就说过他们的不同,那就是锁资源是和获得锁的线程绑定的,而这里的信号量资源并没有和线程绑定,也就是说你可以让一些线程不停的“释放信号量”,而另一些线程只是不停的“获取信号量”,这在AQS内部实际上就是对state状态的值的改变而已,与线程无关!

通常Semaphore可以用来控制多线程对于共享资源访问的并发量,在上面的案例中我们就见过!另外还需要注意的是,如果在AQS的同步队列中队头结点线程需要获取n个资源,目前有m个资源,如果m小于n,那么这个队列中的头结点线程以及后面的所有结点线程都会因为不能获取到资源而继续阻塞,即使头结点后面的结点中的线程所需的资源数量小于m也不行。即已经在AQS同步队列中阻塞的线程,只能按照先进先出的顺序去获取资源,如果头部线程因为所需资源数量不够而一直阻塞,那么队列后面的线程必定不能获取资源!

和CountDownLatch一样,Semaphore的源码看起来非常简单,那是因为复杂的线程等待、唤醒机制都被AQS实现了,如果想要真正了解Semaphore的原理,那么AQS是必须要了解的。实际上如果学会了AQS,那么JUC中的锁或者其他同步组件就很简单了!

0

评论区