java并发编程系列原理篇–JDK中的通信工具类Semaphore

前言

java多线程之间进行通信时,JDK主要提供了以下几种通信工具类。主要有Semaphore、CountDownLatch、CyclicBarrier、exchanger、Phaser这几个通讯类。下面我们来详细介绍每个工具类的作用、原理及用法。

Semaphore介绍

Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”,用来限定线程访问该资源的数量。
它的构造函数有两个,一个参数的用来指定线程访问资源的数量;两个参数的一个用来指定线程访问资源的数量,一个用来指定是否为公平锁。关于公平锁非公平锁的概念请参照文章java并发编程系列之原理篇-synchronized与锁。构造函数代码如下:


  // 默认情况下使用非公平
  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
      sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }

它的最主要的两个方法是acquire()和release()。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

Semaphore的使用

Semaphore主要用来控制线程访问资源的数量的场景。举个例子,在并发条件下,我只想让3个线程来执行某一任务。请看示例代码:

public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            new Thread(new MyThread(i,semaphore)).start();
        }
    }

     static class  MyThread implements Runnable{

        private int id;//线程的ID号
        private Semaphore semaphore;

        public MyThread(int id, Semaphore semaphore){
            this.id = id;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                //获取信号量permit许可
                semaphore.acquire();
                //接下来可以用来执行具体的线程任务
                System.out.println(String.format("当前的线程是%d,还剩有%d个线程资源可以使用,有%d个线程处于等待中。",
                        id,semaphore.availablePermits(),semaphore.getQueueLength()));
                Random random = new Random();
                //随机睡眠时间,打乱释放顺序
                Thread.sleep(random.nextInt(1000));
                System.out.println(String.format("线程%d释放了资源",id));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                //任务结束,释放资源
                semaphore.release();
            }
        }
    }
}

输出结果:

当前的线程是1,还剩有1个线程资源可以使用,有0个线程处于等待中。
当前的线程是0,还剩有2个线程资源可以使用,有0个线程处于等待中。
当前的线程是2,还剩有0个线程资源可以使用,有0个线程处于等待中。
线程2释放了资源
当前的线程是3,还剩有0个线程资源可以使用,有6个线程处于等待中。
线程1释放了资源
当前的线程是4,还剩有0个线程资源可以使用,有5个线程处于等待中。
线程0释放了资源
当前的线程是5,还剩有0个线程资源可以使用,有4个线程处于等待中。
线程3释放了资源
当前的线程是6,还剩有0个线程资源可以使用,有3个线程处于等待中。
线程4释放了资源
当前的线程是7,还剩有0个线程资源可以使用,有2个线程处于等待中。
线程5释放了资源
当前的线程是8,还剩有0个线程资源可以使用,有1个线程处于等待中。
线程8释放了资源
当前的线程是9,还剩有0个线程资源可以使用,有0个线程处于等待中。
线程7释放了资源
线程6释放了资源
线程9释放了资源

从结果可以看出来,最初抢到这3个资源的线程是1,0,2,而其他线程进入了等待队列。之后每当有一个线程释放了该资源,才会有其他在等待队列的线程抢到资源。Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列:

 // 忽略中断
 public void acquireUninterruptibly()
 public void acquireUninterruptibly(int permits)

 // 不进入等待队列,底层使用CAS
 public boolean tryAcquire
 public boolean tryAcquire(int permits)
 public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
 public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore的原理

Semaphore内部有一个继承了AQS的同步器Sync成员变量,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。具体的代码逻辑请查看JDK1.8中java.util.concurrent包下的Semaphore类。

参考链接

在这里很感谢能够有幸看到来自各个大厂大神们的开源项目深入浅出Java多线程,让我对多线程的知识有一个更深层次的了解。