Skip to content

分布式系统互斥性与幂等性问题的分析与解决

参考: https://tech.meituan.com/2016/09/29/distributed-system-mutually-exclusive-idempotence-cerberus-gtis.html

  • 互斥性问题。

  • 幂等性问题。

  • 互斥性问题用通俗的话来讲,就是对共享资源的抢占问题。

  • 操作的互斥性问题,也可以理解为一个需要保证时序性、原子性的问题。

  • 传统的基于数据库的架构中,对于数据的抢占问题往往是通过数据库事务(ACID)来保证的。

  • 在分布式环境中,出于对性能以及一致性敏感度的要求,使得分布式锁成为了一种比较常见而高效的解决方案。

image.png

多线程解决方案

  • Java JDK中提供了两种互斥锁Lock和synchronized。不同的线程之间对同一资源进行抢占,该资源通常表现为某个类的普通成员变量。因此,利用ReentrantLock或者synchronized将共享的变量及其操作锁住,即可基本解决资源抢占的问题。
  • 原理
    • ReentrantLock
      • ReentrantLock 主要利用CAS+CLH队列来实现。它支持公平锁和非公平锁,两者的实现类似
        • CAS:Compare and Swap,比较并交换。CAS有3个操作数:内存值V、预期值A、要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则什么都不做。该操作是一个原子操作,被广泛的应用在Java的底层实现中。在Java中,CAS主要是由sun.misc.Unsafe这个类通过JNI 调用 CPU 底层指令实现。
        • CLH队列:带头结点的双向非循环链表(如下图所示):image.png
      • ReentrantLock的基本实现可以概括为:先通过CAS尝试获取锁。如果此时已经有线程占据了锁,那就加入CLH队列并且被挂起。当锁被释放之后,排在CLH队列队首的线程会被唤醒,然后CAS再次尝试获取锁。
        • 非公平锁:如果同时还有另一个线程进来尝试获取,那么有可能会让这个线程抢先获取
        • 公平锁:如果同时还有另一个线程进来尝试获取,当它发现自己不是在队首的话,就会排到队尾,由队首的线程获取到锁。
      • 可重入锁
        • 在尝试获取锁的时候,会先调用上面的方法。如果状态为0,则表明此时无人占有锁。此时尝试进行set,一旦成功,则成功占有锁。如果状态不为0,再判断是否是当前线程获取到锁。如果是的话,将状态+1,因为此时就是当前线程,所以不用CAS。这也就是可重入锁的实现原理
    • synchronized
      • monitor
        • 每个对象都有一个锁,也就是监视器(monitor)。当monitor被占有时就表示它被锁定。线程执行monitorenter指令时尝试获取对象所对应的monitor的所有权,过程如下:
          • 如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者;
          • 如果线程已经拥有了该monitor,只是重新进入,则进入monitor的进入数加1;
          • 如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权

利用操作系统层面的进程间通信原理来解决临界资源的抢占问题。

比较常见的一种方法便是使用信号量(Semaphores)。

信号量在POSIX标准下有两种,分别为有名信号量和无名信号量。无名信号量通常保存在共享内存中,而有名信号量是与一个特定的文件名称相关联。信号量是一个整数变量,有计数信号量和二值信号量两种。对信号量的操作,主要是P操作(wait)和V操作(signal)。

  • P操作:先检查信号量的大小,若值大于零,则将信号量减1,同时进程获得共享资源的访问权限,继续执行;若小于或者等于零,则该进程被阻塞后,进入等待队列。
  • V操作:该操作将信号量的值加1,如果有进程阻塞着等待该信号量,那么其中一个进程将被唤醒。

后面自己看一下这篇文章,感觉有价值内容一般

下面内容是 chatgpt 生成

Semaphore

sha mo fo er

Semaphore 是 Java 中用于控制对资源访问的并发控制工具。它类似于计数器,用于控制同时访问某一特定资源的线程数量。Semaphore 提供了两种操作:获取(acquire)和释放(release)。获取操作会阻塞线程直到许可可用,而释放操作会增加可用许可的数量。

Semaphore 的基本概念

  • 计数器Semaphore 内部有一个计数器,表示当前可用的许可数量。
  • 获取许可:线程通过调用 acquire() 方法获取许可,如果当前许可数为 0,线程会被阻塞直到有许可可用。
  • 释放许可:线程通过调用 release() 方法释放许可,增加可用许可数量。

Semaphore 的构造方法

Semaphore 提供了两种主要的构造方法:

  • Semaphore(int permits):创建一个具有给定许可数的 Semaphore
  • Semaphore(int permits, boolean fair):创建一个具有给定许可数且公平的 Semaphore。如果 fairtrue,则线程将以 FIFO(先进先出)的顺序获取许可。

使用示例

以下是一些使用 Semaphore 的示例:

1. 控制访问某一资源

假设我们有一个资源,只允许最多三个线程同时访问。我们可以使用 Semaphore 来控制对该资源的访问:

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    private static final int MAX_PERMITS = 3;
    private static final Semaphore semaphore = new Semaphore(MAX_PERMITS);

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread(new Worker(semaphore)).start();
        }
    }
}

class Worker implements Runnable {
    private final Semaphore semaphore;

    Worker(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            // 获取许可
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " acquired a permit.");
            // 模拟资源访问
            Thread.sleep(2000);
            System.out.println(Thread.currentThread().getName() + " released a permit.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放许可
            semaphore.release();
        }
    }
}

2. 限制同时访问的线程数量

在某些情况下,我们可能希望限制对某一块代码的同时访问的线程数量。例如,限制一次只能有两个线程访问某一段代码:

import java.util.concurrent.Semaphore;

public class LimitedAccessExample {
    private static final int MAX_CONCURRENT_THREADS = 2;
    private static final Semaphore semaphore = new Semaphore(MAX_CONCURRENT_THREADS);

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new Thread(new Task(semaphore)).start();
        }
    }
}

class Task implements Runnable {
    private final Semaphore semaphore;

    Task(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            // 获取许可
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " is performing a task.");
            // 模拟任务执行
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " has finished the task.");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放许可
            semaphore.release();
        }
    }
}

公平性

默认情况下,Semaphore 是非公平的,即无法保证先调用 acquire 的线程会先获取到许可。如果需要保证公平性,可以在创建 Semaphore 对象时指定:

Semaphore semaphore = new Semaphore(MAX_PERMITS, true);

在这种情况下,Semaphore 会按 FIFO 的顺序分配许可,先请求许可的线程会先得到许可。

总结

  • 控制并发访问Semaphore 用于控制对共享资源的并发访问,通过设置许可数来限制同时访问的线程数量。
  • 获取和释放许可:线程通过 acquire() 方法获取许可,通过 release() 方法释放许可。
  • 公平性:可以通过构造方法设置 Semaphore 的公平性,以确保先请求许可的线程先获得许可。

Semaphore 在并发编程中非常有用,适用于限制对资源的访问、实现限流等场景。

理解和掌握 Semaphore 的使用,对于编写高效、安全的并发程序至关重要。