最原始的自旋锁就是多个线程不断自旋,大家都不断尝试获取锁。看下面例子,主要看lock和unlock两个方法,Unsafe仅仅是为操作提供了硬件级别的原子CAS操作。对于lock方法,假如有若干线程竞争,能成功通过CAS将value值修改为newV的线程即是成功获取锁的线程。成功获取锁的线程将顺利通过,而其它线程则不断在循环检测value值是否改回0,将value改为0的操作就是获取锁的线程释放锁的操作。对于unlock方法,用于释放锁,释放后其它线程又继续对该锁竞争。如此一来,没获得锁的线程也不会被挂起或阻塞,而是不断循环检查状态。
1. public class SpinLock {
2. private static Unsafe unsafe = null;
3. private static final long valueOffset;
4. private volatile int value = 0;
5. static {
6. try {
7. unsafe = getUnsafeInstance();
8. valueOffset = unsafe.objectFieldOffset(SpinLock.class.getDeclaredField("value"));
9. } catch (Exception ex) {
10. throw new Error(ex);
11. }
12. }
13.
14. private static Unsafe getUnsafeInstance() throws Exception {
15. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
16. theUnsafeInstance.setAccessible(true);
17. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
18. }
19.
20. public void lock() {
21. for (;;) {
22. int newV = value + 1;
23. if (newV == 1)
24. if (unsafe.compareAndSwapInt(this, valueOffset, 0, newV)) {
25. return;
26. }
27. }
28. }
29.
30. public void unlock() {
31. unsafe.compareAndSwapInt(this, valueOffset, 1, 0);
32. }
33. }
鉴于原始自旋锁存在公平性问题,于是引入一种排队机制来解决它,这就是排队自旋锁。所有线程在尝试获取锁之前得先拿到一个排队号,然后再不断轮询当前是不是已经轮到自己了,判断的依据就是当前处理号是否等于自己的排队号。如果两者相等,则表示已经轮到自己了,于是得到锁并往下执行。
看下面例子,主要看lock和unlock两个方法,Unsafe仅仅是为操作提供了硬件级别的原子CAS操作。对于lock方法,首先通过不断循环去尝试拿到一个排队号,一旦成功拿到排队号后就开始通过while(processingNum != nowNum)轮询看自己是否已经轮到了。而unlock方法则是直接修改当前处理号,直接加1,表示自己已经不需要锁了,可以让给下一位了。
1. public class TicketLock {
2. private static Unsafe unsafe = null;
3. private static final long ticketNumOffset;
4. private static final long processingNumOffset;
5. private volatile int ticketNum = 0;
6. private volatile int processingNum = 0;
7. static {
8. try {
9. unsafe = getUnsafeInstance();
10. ticketNumOffset = unsafe
11. .objectFieldOffset(TicketLock.class.getDeclaredField("ticketNum"));
12. processingNumOffset = unsafe
13. .objectFieldOffset(TicketLock.class.getDeclaredField("processingNum"));
14. } catch (Exception ex) {
15. throw new Error(ex);
16. }
17. }
18.
19. private static Unsafe getUnsafeInstance() throws Exception {
20. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
21. theUnsafeInstance.setAccessible(true);
22. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
23. }
24.
25. public int lock() {
26. int nowNum;
27. for (;;) {
28. nowNum = ticketNum;
29. if (unsafe.compareAndSwapInt(this, ticketNumOffset, ticketNum, ticketNum + 1)) {
30. break;
31. }
32. }
33. while (processingNum != nowNum) {
34. }
35.
36. return nowNum;
37. }
38.
39. public void unlock(int ticket) {
40. int next = ticket + 1;
41. unsafe.compareAndSwapInt(this, processingNumOffset, ticket, next);
42. }
43.
44. }
为了优化同步带来的花销,Craig、Landin、Hagersten三个人发明了CLH锁。其核心思想是:通过一定手段将所有线程对某一共享变量的轮询竞争转化为一个线程队列,且队列中的线程各自轮询自己的本地变量。
这个转化过程有两个要点:一是应该构建怎样的队列以及如何构建队列?为了保证公平性,我们构建的将是一个FIFO队列。构建的时候主要通过移动尾部节点tail来实现队列的排队,每个想获取锁的线程创建一个新节点并通过CAS原子操作将新节点赋给tail,然后让当前线程轮询前一节点的某个状态位。如图可以清晰看到队列结构及自旋操作,这样就成功构建了线程排队队列。二是如何释放队列?执行完线程后只需将当前线程对应的节点状态位置为解锁状态即可,由于下一节点一直在轮询,所以可获取到锁。
所以,CLH锁的核心思想是将众多线程长时间对某资源的竞争,通过有序化这些线程将其转化为只需对本地变量检测。而唯一存在竞争的地方就是在入队列之前对尾节点tail的竞争,但此时竞争的线程数量已经少了很多了。比起所有线程直接对某资源竞争的轮询次数也减少了很多,这也大大节省了CPU缓存同步的消耗,从而大大提升系统性能。
下面我们提供一个简单的CLH锁实现代码,以便更好理解CLH锁的原理。其中lock与unlock两方法提供加锁和解锁操作,每次加锁解锁必须将一个CLHNode对象作为参数传入。lock方法的for循环是通过CAS操作将新节点插入队列,而while循环则是检测前驱节点的锁状态位。一旦前驱节点锁状态位允许则结束检测,让线程往下执行。解锁操作先判断当前节点是否为尾节点,如是则直接将尾节点设置为空,此时说明仅仅只有一条线程在执行,否则将当前节点的锁状态位设置为解锁状态。
1. public class CLHLock {
2. private static Unsafe unsafe = null;
3. private static final long valueOffset;
4. private volatile CLHNode tail;
5.
6. public class CLHNode {
7. private volatile boolean isLocked = true;
8. }
9.
10. static {
11. try {
12. unsafe = getUnsafeInstance();
13. valueOffset = unsafe.objectFieldOffset(CLHLock.class.getDeclaredField("tail"));
14. } catch (Exception ex) {
15. throw new Error(ex);
16. }
17. }
18.
19. public void lock(CLHNode currentThreadNode) {
20. CLHNode preNode = null;
21. for (;;) {
22. preNode = tail;
23. if (unsafe.compareAndSwapObject(this, valueOffset, preNode, currentThreadNode))
24. break;
25. }
26. if (preNode != null)
27. while (preNode.isLocked) {
28. }
29. }
30.
31. public void unlock(CLHNode currentThreadNode) {
32. if (!unsafe.compareAndSwapObject(this, valueOffset, currentThreadNode, null))
33. currentThreadNode.isLocked = false;
34. }
35.
36. private static Unsafe getUnsafeInstance() throws Exception {
37. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
38. theUnsafeInstance.setAccessible(true);
39. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
40. }
41. }
MCS锁由John Mellor-Crummey和Michael Scott两人发明,它的出现旨在解决CLH锁存在的问题。它也是基于FIFO队列,与CLH锁相似,不同的地方在于轮询的对象不同。MCS锁中线程只对本地变量自旋,而前驱节点则负责通知其结束自旋操作。这样的话就减少了CPU缓存与主存储之间的不必要的同步操作,减少了同步带来的性能损耗。
如下图,每个线程对应着队列中的一个节点。节点内有一个spin变量,表示是否需要旋转。一旦前驱节点使用完锁后便修改后继节点的spin变量,通知其不必继续做自旋操作,已成功获取锁。
下面我们提供一个简单的MCS锁实现代码,以便更好理解MCS锁的原理。其中lock与unlock两方法提供加锁和解锁操作,每次加锁解锁必须将一个MCSNode对象作为参数传入。lock方法的for循环是通过CAS操作将新节点赋给队列尾部节点tail,如果存在前驱节点的话则自己要开始自旋操作,等待前驱节点解锁时通知自己。一旦前驱节点执行解锁后,则会将本节点的spin变量修改为false,本节点则获取到锁并停止自旋,让线程往下执行。解锁操作先判断当前节点是否为尾节点,如果是则什么都不用处理。此时说明仅仅只有一条线程在执行,否则将后继节点的spin变量设置为false。此时要考虑特殊情况,如果不存在后继节点则将尾部节点tail设为null。在此期间可能又有线程进来,这时tail的CAS修改会失败,所以就只能自旋等后继节点不为空再往下执行。
1. public class MCSLock {
2. private static Unsafe unsafe = null;
3. volatile MCSNode tail;
4. private static final long valueOffset;
5.
6. public static class MCSNode {
7. MCSNode next;
8. volatile boolean spin = true;
9. }
10.
11. static {
12. try {
13. unsafe = getUnsafeInstance();
14. valueOffset = unsafe.objectFieldOffset(MCSLock.class.getDeclaredField("tail"));
15. } catch (Exception ex) {
16. throw new Error(ex);
17. }
18. }
19.
20. private static Unsafe getUnsafeInstance() throws Exception {
21. Field theUnsafeInstance = Unsafe.class.getDeclaredField("theUnsafe");
22. theUnsafeInstance.setAccessible(true);
23. return (Unsafe) theUnsafeInstance.get(Unsafe.class);
24. }
25.
26. public void lock(MCSNode currentThreadMcsNode) {
27. MCSNode predecessor = null;
28. for (;;) {
29. predecessor = tail;
30. if (unsafe.compareAndSwapObject(this, valueOffset, tail, currentThreadMcsNode))
31. break;
32. }
33. if (predecessor != null) {
34. predecessor.next = currentThreadMcsNode;
35. while (currentThreadMcsNode.spin) {
36. }
37. }
38. }
39.
40. public void unlock(MCSNode currentThreadMcsNode) {
41. if (tail != currentThreadMcsNode) {
42. if (currentThreadMcsNode.next == null) {
43. if (unsafe.compareAndSwapObject(this, valueOffset, currentThreadMcsNode, null)) {
44. return;
45. } else {
46. while (currentThreadMcsNode.next == null) {
47. }
48. }
49. }
50. currentThreadMcsNode.next.spin = false;
51. }
52. }
}
更多JAVA并发原理剖析请关注下面的专栏。
作者简介:笔名seaboat,擅长人工智能、计算机科学、数学原理、基础算法。出版书籍:《图解数据结构与算法》、《Tomcat内核设计剖析》、《人工智能原理科普》。