J.U.C之AQS-Semaphore

Semaphore是什么

Semaphore也叫信号量,在JDK1.5被引入,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。

  • 访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。
  • 访问资源后,使用release释放许可。

Semaphore和ReentrantLock类似,获取许可有公平策略和非公平许可策略,默认情况下使用非公平策略。

应用场景

Semaphore可以用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。
假设有这个的需求,读取几万个文件的数据到数据库中,由于文件读取是IO密集型任务,可以启动几十个线程并发读取,但是数据库连接数只有10个,这时就必须控制最多只有10个线程能够拿到数据库连接进行操作。这个时候,就可以使用Semaphore做流量控制。

@Slf4j
public class SemaphoreExample1 {

  private static final int threadCount = 20;

  public static void main(String[] args) throws Exception {

    ExecutorService exec = Executors.newCachedThreadPool();

    final Semaphore semaphore = new Semaphore(10);

    for (int i = 0; i < threadCount; i++) {
      final int threadNum = i;
      exec.execute(
          () -> {
            try {
              semaphore.acquire(); // 获取一个许可
              // 存数据过程
              test(threadNum);
              semaphore.release(); // 释放一个许可
            } catch (Exception e) {
              log.error("exception", e);
            }
          });
    }
    exec.shutdown();
  }

  private static void test(int threadNum) throws Exception {
    log.info("{}", threadNum);
    Thread.sleep(1000);
  }
}
exec.execute(
    () -> {
      try {
        semaphore.acquire(3); // 获取多个许可
        test(threadNum);
        semaphore.release(3); // 释放多个许可
      } catch (Exception e) {
        log.error("exception", e);
      }
    });
exec.execute(
  () -> {
    try {
      if (semaphore.tryAcquire()) { // 尝试获取一个许可
        test(threadNum);
        semaphore.release(); // 释放一个许可
      }
    } catch (Exception e) {
      log.error("exception", e);
    }
  });
exec.execute(() -> {
    try {
        if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试等待获取一个许可
            test(threadNum);
            semaphore.release(); // 释放一个许可
        }
    } catch (Exception e) {
        log.error("exception", e);
    }
});