Java技术之Semaphore的工作原理及实例

Semaphore是一种在多线程环境下使用的设施,该设施负责协调各个线程,以保证它们能够正确、合理的使用公共资源的设施,也是操作系统中用于控制进程同步互斥的量。Semaphore是一种计数信号量,用于管理一组资源,内部是基于AQS的共享模式。它相当于给线程规定一个量从而控制允许活动的线程数。

1.工作原理

以一个停车场是运作为例。为了简单起见,假设停车场只有三个车位,一开始三个车位都是空的。这时如果同时来了五辆车,看门人允许其中三辆不受阻碍的进入,然后放下车拦,剩下的车则必须在入口等待,此后来的车也都不得不在入口处等待。这时,有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开两辆,则又可以放入两辆,如此往复。这个停车系统中,每辆车就好比一个线程,看门人就好比一个信号量,看门人限制了可以活动的线程。假如里面依然是三个车位,但是看门人改变了规则,要求每次只能停两辆车,那么一开始进入两辆车,后面得等到有车离开才能有车进入,但是得保证最多停两辆车。对于Semaphore类而言,就如同一个看门人,限制了可活动的线程数。

1.1 Semaphore主要方法:

  • Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。

  • Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

  • void acquire():从此信号量获取一个许可前线程将一直阻塞。相当于一辆车占了一个车位。

  • void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。比如n=2,就相当于一辆车占了两个车位。

  • void release():释放一个许可,将其返回给信号量。就如同车开走返回一个车位。

  • void release(int n):释放n个许可。

  • int availablePermits():当前可用的许可数。

2.实例讲解

接下来举个例子,就是关于每个人的个人信息,那么一个人占用一个线程,并用Semphore类创建对象从而初始化信号量,控制可活动的线程数。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package concurrent;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
public class SemaphoreDemo {
private static final Semaphore semaphore=new Semaphore(3);
private static final ThreadPoolExecutor threadPool=new ThreadPoolExecutor(5,10,60,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());

private static class InformationThread extends Thread{
private final String name;
private final int age;
public InformationThread(String name,int age)
{
this.name=name;
this.age=age;
}

public void run()
{
try
{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+":大家好,我是"+name+"我今年"+age+"岁当前时间为:"+System.currentTimeMillis());
Thread.sleep(1000);
System.out.println(name+"要准备释放许可证了,当前时间为:"+System.currentTimeMillis());
System.out.println("当前可使用的许可数为:"+semaphore.availablePermits());
semaphore.release();

}
catch(InterruptedException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args)
{
String[] name= {"李明","王五","张杰","王强","赵二","李四","张三"};
int[] age= {26,27,33,45,19,23,41};
for(int i=0;i<7;i++)
{
Thread t1=new InformationThread(name[i],age[i]);
threadPool.execute(t1);
}
}

}

运行上述程序结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424000186
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424000186
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424000186
张杰要准备释放许可证了,当前时间为:1520424001187
李明要准备释放许可证了,当前时间为:1520424001187
王五要准备释放许可证了,当前时间为:1520424001187
当前可使用的许可数为:0
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424001187
pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424001187
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424001187
李四要准备释放许可证了,当前时间为:1520424002187
王强要准备释放许可证了,当前时间为:1520424002187
当前可使用的许可数为:0
张三要准备释放许可证了,当前时间为:1520424002187
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424002187
当前可使用的许可数为:0
当前可使用的许可数为:0
赵二要准备释放许可证了,当前时间为:1520424003188
当前可使用的许可数为:2

以上是非公平信号量,将建立Semaphore对象的语句改为如下语句:

1
private static final Semaphore semaphore=new Semaphore(3,true);

运行程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424286454
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424286454
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424286454
pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424287455
当前可使用的许可数为:0
pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424287455
pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424287455
当前可使用的许可数为:0
当前可使用的许可数为:1
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424287455
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424287455
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424287455
pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424288456
当前可使用的许可数为:0
pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424288456
pool-1-thread-3:大家好,我是张三我今年41岁当前时间为:1520424288456
pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424288456
当前可使用的许可数为:0
当前可使用的许可数为:0
pool-1-thread-3:张三要准备释放许可证了,当前时间为:1520424289456
当前可使用的许可数为:2

3.实现单例模式

将创建信号量对象语句修改如下:

1
private static final Semaphore semaphore=new Semaphore(1);

运行程序,结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
pool-1-thread-1:大家好,我是李明我今年26岁当前时间为:1520424379699
pool-1-thread-1:李明要准备释放许可证了,当前时间为:1520424380700
当前可使用的许可数为:0
pool-1-thread-2:大家好,我是王五我今年27岁当前时间为:1520424380700
pool-1-thread-2:王五要准备释放许可证了,当前时间为:1520424381701
当前可使用的许可数为:0
pool-1-thread-3:大家好,我是张杰我今年33岁当前时间为:1520424381701
pool-1-thread-3:张杰要准备释放许可证了,当前时间为:1520424382702
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是王强我今年45岁当前时间为:1520424382702
pool-1-thread-4:王强要准备释放许可证了,当前时间为:1520424383702
当前可使用的许可数为:0
pool-1-thread-5:大家好,我是赵二我今年19岁当前时间为:1520424383702
pool-1-thread-5:赵二要准备释放许可证了,当前时间为:1520424384702
当前可使用的许可数为:0
pool-1-thread-1:大家好,我是李四我今年23岁当前时间为:1520424384702
pool-1-thread-1:李四要准备释放许可证了,当前时间为:1520424385702
当前可使用的许可数为:0
pool-1-thread-2:大家好,我是张三我今年41岁当前时间为:1520424385702
pool-1-thread-2:张三要准备释放许可证了,当前时间为:1520424386703
当前可使用的许可数为:0

如上可知,如果将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,然后这辆车出来后,下一辆车才能进。

4.总结

Semaphore主要用于控制当前活动线程数目,就如同停车场系统一般,而Semaphore则相当于看守的人,用于控制总共允许停车的停车位的个数,而对于每辆车来说就如同一个线程,线程需要通过acquire()方法获取许可,而release()释放许可。如果许可数达到最大活动数,那么调用acquire()之后,便进入等待队列,等待已获得许可的线程释放许可,从而使得多线程能够合理的运行。


上面原文

5.源码解析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO( First Input First Output,简单说就是指先进先出);而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

5.1 构造方法

Semaphore有两个构造方法,如下:

1
2
3
4
5
6
7
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

从上面可以看到两个构造方法,都必须提供许可的数量,第二个构造方法可以指定是公平模式还是非公平模式,默认非公平模式。
Semaphore内部基于AQS的共享模式,所以实现都委托给了Sync类。
这里就看一下NonfairSync的构造方法:

1
2
3
NonfairSync(int permits) {
super(permits);
}

可以看到直接调用了父类的构造方法,Sync的构造方法如下:

1
2
3
Sync(int permits) {
setState(permits);
}

可以看到调用了setState方法,也就是说AQS中的资源就是许可证的数量。

5.2 获取许可

先从获取一个许可看起,并且先看非公平模式下的实现。首先看acquire方法,acquire方法有几个重载,但主要是下面这个方法

1
2
3
4
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

从上面可以看到,调用了Sync的acquireSharedInterruptibly方法,该方法在父类AQS中,如下:

1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断了,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//获取许可失败,将线程加入到等待队列中
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:

1
2
3
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

该方法调用了父类中的nonfairTyAcquireShared方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//获取剩余许可数量
int available = getState();
//计算给完这次许可数量后的个数
int remaining = available - acquires;
//如果许可不够或者可以将许可数量重置的话,返回
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

从上面可以看到,只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。看完了非公平的获取,再看下公平的获取,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected int tryAcquireShared(int acquires) {
for (;;) {
//如果前面有线程再等待,直接返回-1
if (hasQueuedPredecessors())
return -1;
//后面与非公平一样
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

从上面可以看到,FairSync与NonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
看完了获取许可后,再看一下释放许可。

5.3 释放许可

释放许可也有几个重载方法,但都会调用下面这个带参数的方法,

1
2
3
4
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}

releaseShared方法在AQS中,如下:

1
2
3
4
5
6
7
8
public final boolean releaseShared(int arg) {
//如果改变许可数量成功
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

AQS子类实现共享模式的类需要实现tryReleaseShared类来判断是否释放成功,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//获取当前许可数量
int current = getState();
//计算回收后的数量
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS改变许可数量成功,返回true
if (compareAndSetState(current, next))
return true;
}
}

从上面可以看到,一旦CAS改变许可数量成功,那么就会调用doReleaseShared()方法释放阻塞的线程。

5.4 减小许可数量

Semaphore还有减小许可数量的方法,该方法可以用于用于当资源用完不能再用时,这时就可以减小许可证。代码如下:

1
2
3
4
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}

可以看到,委托给了Sync,Sync的reducePermits方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
final void reducePermits(int reductions) {
for (;;) {
//得到当前剩余许可数量
int current = getState();
//得到减完之后的许可数量
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//如果CAS改变成功
if (compareAndSetState(current, next))
return;
}
}

从上面可以看到,就是CAS改变AQS中的state变量,因为该变量代表许可证的数量。

5.5 获取剩余许可数量 

Semaphore还可以一次将剩余的许可数量全部取走,该方法是drain方法,如下:

1
2
3
public int drainPermits() {
return sync.drainPermits();
}

Sync的实现如下:

1
2
3
4
5
6
7
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}

可以看到,就是CAS将许可数量置为0。

5.6 总结

Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

原文出自:https://blog.csdn.net/qq_19431333/article/details/70212663