00001
00015 module mango.locks.LockImpl;
00016
00017 private {
00018 import std.thread;
00019
00020 import mango.sys.Atomic;
00021
00022 import mango.locks.Utils;
00023 import mango.locks.Condition;
00024 import mango.locks.TimeUnit;
00025 import mango.locks.Exceptions;
00026 }
00027
00232 abstract class AbstractLock {
00233
/**
 * Constructs the lock base. No fields are assigned here, so the queue
 * references and the state keep their default initial values.
 * Only subclasses can call this constructor.
 */
protected this() {
}
00239
/**
 * Entry of the lock's wait queue; also used as an entry of a condition's
 * waiter list (linked through nextWaiter).
 *
 * The v_* members read or write the matching field inside a volatile
 * statement; the plain fields are accessed directly by code that already
 * sits in its own volatile block or hands the address to Atomic.
 */
final static class Node {
    /** waitStatus of a node whose acquire was abandoned (see cancelAcquire). */
    const int CANCELLED = 1;
    /** waitStatus installed on a predecessor before its successor parks. */
    const int SIGNAL = -1;
    /** waitStatus of a node that sits on a condition's waiter list. */
    const int CONDITION = -2;

    /** Marker stored in nextWaiter for nodes queued in shared mode. */
    static Node SHARED;
    /** Marker for exclusive mode; it is null. */
    static Node EXCLUSIVE;

    static this() {
        EXCLUSIVE = null;
        SHARED = new Node();
    }

    /** One of 0, CANCELLED, SIGNAL or CONDITION. */
    int waitStatus;

    int v_waitStatus() {
        volatile {
            return waitStatus;
        }
    }

    void v_waitStatus(int val) {
        volatile {
            waitStatus = val;
        }
    }

    /** Link towards the head of the queue. */
    Node prev;

    Node v_prev() {
        volatile {
            return prev;
        }
    }

    void v_prev(Node val) {
        volatile {
            prev = val;
        }
    }

    /** Link towards the tail of the queue. */
    Node next;

    Node v_next() {
        volatile {
            return next;
        }
    }

    void v_next(Node val) {
        volatile {
            next = val;
        }
    }

    /** Thread that enqueued this node; set to null by setHead and cancelAcquire. */
    Thread thread;

    Thread v_thread() {
        volatile {
            return thread;
        }
    }

    void v_thread(Thread val) {
        volatile {
            thread = val;
        }
    }

    /**
     * Either the mode marker (SHARED / EXCLUSIVE) for a queue node, or the
     * following node on a condition's waiter list.
     */
    Node nextWaiter;

    /** Returns: true when this node was created with the SHARED marker. */
    final bool isShared() {
        return SHARED is nextWaiter;
    }

    /** Returns: the current value of prev, read volatilely (may be null). */
    final Node predecessor() {
        return v_prev();
    }

    /** Creates a blank node; used for SHARED and for the dummy head in enq. */
    this() {
    }

    /** Creates a queue node for thread in the given mode (SHARED or EXCLUSIVE). */
    this(Thread thread, Node mode) {
        this.thread = thread;
        this.nextWaiter = mode;
    }

    /** Creates a node for thread with an explicit initial waitStatus. */
    this(Thread thread, int waitStatus) {
        this.thread = thread;
        this.waitStatus = waitStatus;
    }
}
00441
/**
 * Front of the wait queue. Stays null until enq installs a dummy node;
 * afterwards it is replaced through setHead.
 */
private Node head;

/** Returns: head, read inside a volatile statement. */
Node v_head() {
    volatile {
        return head;
    }
}

/** Stores val into head inside a volatile statement. */
void v_head(Node val) {
    volatile {
        head = val;
    }
}
00450
/**
 * Back of the wait queue. Stays null until the first enq; afterwards it is
 * advanced by compare-and-set in enq and addWaiter.
 */
private Node tail;

/** Returns: tail, read inside a volatile statement. */
Node v_tail() {
    volatile {
        return tail;
    }
}

/** Stores val into tail inside a volatile statement. */
void v_tail(Node val) {
    volatile {
        tail = val;
    }
}
00458
/** Synchronization state; its meaning is defined by the subclass. */
protected int state_;

/** Returns: the current synchronization state, read volatilely. */
protected final int state() {
    volatile {
        return state_;
    }
}
00472
/**
 * Replaces the synchronization state with a volatile store.
 * Params: newState = value to store into state_.
 */
protected final void state(int newState) {
    volatile {
        state_ = newState;
    }
}
00481
00482
00483
/**
 * Appends node to the wait queue, creating the queue (with a dummy head)
 * when it does not exist yet. Retries until one of the compare-and-set
 * operations succeeds.
 *
 * Params:  node = the node to append.
 * Returns: the node that precedes node in the queue after insertion
 *          (the freshly created dummy head when the queue was empty).
 */
private Node enq(Node node) {
    while (true) {
        Node last = v_tail();
        if (last !is null) {
            // Regular case: link backwards first, then try to swing tail.
            node.v_prev(last);
            if (Atomic.compareAndSet32(&tail, last, node)) {
                last.v_next(node);
                return last;
            }
            continue;
        }

        // Queue not initialised: pre-link a dummy head to node, then try to
        // publish the dummy as head.
        Node dummy = new Node();
        volatile {
            dummy.next = node;
            node.prev = dummy;
        }
        if (Atomic.compareAndSet32(&head, null, dummy)) {
            v_tail(node);
            return dummy;
        }
    }
}
00514
/**
 * Creates a node for the calling thread and puts it on the wait queue.
 * One direct append at the tail is attempted first; if the queue is not
 * initialised or the compare-and-set loses, enq does the insertion.
 *
 * Params:  mode = Node.EXCLUSIVE or Node.SHARED.
 * Returns: the newly queued node.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.getThis(), mode);

    Node last = v_tail();
    if (last is null) {
        enq(node);
        return node;
    }

    node.v_prev(last);
    if (!Atomic.compareAndSet32(&tail, last, node)) {
        enq(node);
    } else {
        last.v_next(node);
    }
    return node;
}
00535
/**
 * Makes node the head of the queue and clears its thread and prev
 * references, so it plays the role of the dummy head from then on.
 *
 * Params: node = the node that becomes the new head.
 */
private void setHead(Node node) {
    volatile {
        head = node;
        node.prev = null;
        node.thread = null;
    }
}
00549
/**
 * Resumes the thread of node's successor, if there is one to resume.
 *
 * node's SIGNAL status is first reset to 0 (if it was SIGNAL). The thread
 * to resume is taken from node.next when that node exists and is not
 * cancelled; otherwise the queue is walked backwards from tail up to node
 * and the non-cancelled node closest to node supplies the thread.
 *
 * Params: node = the node whose successor should be woken.
 */
private void unparkSuccessor(Node node) {
    Atomic.compareAndSet32(&node.waitStatus, Node.SIGNAL, 0);

    Thread wakee;
    volatile {
        Node cursor = node.next;
        if (cursor !is null && cursor.waitStatus <= 0) {
            wakee = cursor.thread;
        } else {
            wakee = null;
            cursor = tail;
            while (cursor !is null && cursor !is node) {
                // Separate volatile statement per step, as in the original loop.
                volatile {
                    if (cursor.waitStatus <= 0)
                        wakee = cursor.thread;
                    cursor = cursor.prev;
                }
            }
        }
        if (wakee !is null)
            wakee.resume();
    }
}
00585
/**
 * Installs node as head and, when propagate is positive and node's
 * waitStatus is non-zero, wakes the successor if that successor is absent
 * from node.next or is queued in shared mode.
 *
 * Params:
 *   node      = the node that just acquired in shared mode.
 *   propagate = the value returned by tryAcquireShared.
 */
private void setHeadAndPropagate(Node node, int propagate) {
    setHead(node);
    if (propagate <= 0 || node.v_waitStatus() == 0)
        return;

    Node succ = node.v_next();
    if (succ is null || succ.isShared())
        unparkSuccessor(node);
}
00605
00606
00607
/**
 * Abandons an acquire attempt: clears the node's thread, marks the node
 * CANCELLED and wakes its successor. A null node is ignored.
 *
 * Params: node = the node to cancel (may be null).
 */
private void cancelAcquire(Node node) {
    if (node is null)
        return;

    volatile {
        node.thread = null;
        node.waitStatus = Node.CANCELLED;
    }
    unparkSuccessor(node);
}
00622
/**
 * Decides whether a thread whose acquire attempt just failed may block.
 *
 * Params:
 *   pred = the predecessor of node that was observed by the caller.
 *   node = the caller's own queue node.
 * Returns: true only when pred's waitStatus is already negative; in every
 *          other case the queue is adjusted and false is returned so the
 *          caller retries.
 */
private static bool shouldParkAfterFailedAcquire(Node pred, Node node) {
    int s = pred.v_waitStatus;
    if (s < 0)
        // Predecessor already carries a negative status: blocking is allowed.
        return true;
    if (s > 0)
        // Predecessor is cancelled: skip over it. The store goes through the
        // volatile accessor, matching every other writer of prev in this
        // class (enq, addWaiter, setHead), because other threads walk the
        // queue backwards through prev.
        node.v_prev = pred.v_prev;
    else
        // Status is 0: ask the predecessor for a wake-up. The result is
        // ignored; the caller loops and re-checks before blocking.
        Atomic.compareAndSet32(&pred.waitStatus, 0, Node.SIGNAL);
    return false;
}
00655
00656
00657
00658
00659
00660
00661
00662
00663
00664
/**
 * Acquires in exclusive mode for a node that is already on the wait queue.
 * Loops until the node's predecessor is head and tryAcquire succeeds,
 * suspending the calling thread between attempts whenever
 * shouldParkAfterFailedAcquire allows it.
 *
 * Any Exception cancels the node and is rethrown.
 *
 * Params:
 *   node = the queued node of the calling thread.
 *   arg  = value passed through to tryAcquire.
 */
final void acquireQueued(Node node, int arg) {
    try {
        while (true) {
            Node pred = node.predecessor();
            if (pred is v_head() && tryAcquire(arg)) {
                setHead(node);
                pred.v_next(null);   // unlink the old head
                return;
            }
            if (!shouldParkAfterFailedAcquire(pred, node))
                continue;
            version (Ares) {
                Thread.getThis.suspend();
            } else {
                Thread.getThis().pause();
            }
        }
    } catch (Exception ex) {
        cancelAcquire(node);
        throw ex;
    }
    assert (false);
}
00692
/**
 * Queues the calling thread and acquires in exclusive mode, giving up once
 * the remaining timeout is no longer positive.
 *
 * The remaining time is only reduced after a sleepNanos call, by the time
 * elapsed since the previous measurement.
 *
 * Params:
 *   arg          = value passed through to tryAcquire.
 *   nanosTimeout = maximum time to wait, in nanoseconds.
 * Returns: true if acquired; false if the timeout ran out (the node is
 *          cancelled in that case).
 */
private bool doAcquireNanos(int arg, long nanosTimeout) {
    long lastTime = currentTimeNanos();
    Node node = addWaiter(Node.EXCLUSIVE);
    try {
        while (true) {
            Node pred = node.predecessor();
            if (pred is v_head() && tryAcquire(arg)) {
                setHead(node);
                pred.v_next(null);
                return true;
            }
            if (nanosTimeout <= 0) {
                cancelAcquire(node);
                return false;
            }
            if (!shouldParkAfterFailedAcquire(pred, node))
                continue;

            sleepNanos(nanosTimeout);
            long wokeAt = currentTimeNanos();
            nanosTimeout -= wokeAt - lastTime;
            lastTime = wokeAt;
        }
    } catch (Exception ex) {
        cancelAcquire(node);
        throw ex;
    }
    assert (false);
}
00727
/**
 * Queues the calling thread in shared mode and loops until its predecessor
 * is head and tryAcquireShared returns a non-negative value, suspending the
 * thread between attempts when shouldParkAfterFailedAcquire allows it.
 *
 * Any Exception cancels the node and is rethrown.
 *
 * Params: arg = value passed through to tryAcquireShared.
 */
private void doAcquireShared(int arg) {
    Node node = addWaiter(Node.SHARED);
    try {
        while (true) {
            Node pred = node.predecessor();
            if (pred is v_head()) {
                int outcome = tryAcquireShared(arg);
                if (outcome >= 0) {
                    setHeadAndPropagate(node, outcome);
                    pred.v_next(null);
                    return;
                }
            }
            if (!shouldParkAfterFailedAcquire(pred, node))
                continue;
            version (Ares) {
                Thread.getThis.suspend();
            } else {
                Thread.getThis().pause();
            }
        }
    } catch (Exception ex) {
        cancelAcquire(node);
        throw ex;
    }
    assert (false);
}
00757
/**
 * Timed variant of doAcquireShared: queues the calling thread in shared
 * mode and gives up once the remaining timeout is no longer positive.
 *
 * Params:
 *   arg          = value passed through to tryAcquireShared.
 *   nanosTimeout = maximum time to wait, in nanoseconds.
 * Returns: true if acquired; false if the timeout ran out (the node is
 *          cancelled in that case).
 */
private bool doAcquireSharedNanos(int arg, long nanosTimeout) {
    long lastTime = currentTimeNanos();
    Node node = addWaiter(Node.SHARED);
    try {
        while (true) {
            Node pred = node.predecessor();
            if (pred is v_head()) {
                int outcome = tryAcquireShared(arg);
                if (outcome >= 0) {
                    setHeadAndPropagate(node, outcome);
                    pred.v_next(null);
                    return true;
                }
            }
            if (nanosTimeout <= 0) {
                cancelAcquire(node);
                return false;
            }
            if (!shouldParkAfterFailedAcquire(pred, node))
                continue;

            sleepNanos(nanosTimeout);
            long wokeAt = currentTimeNanos();
            nanosTimeout -= wokeAt - lastTime;
            lastTime = wokeAt;
        }
    } catch (Exception ex) {
        cancelAcquire(node);
        throw ex;
    }
    assert (false);
}
00795
00796
00797
/**
 * Hook for an exclusive-mode acquire attempt; called by acquire,
 * tryAcquireNanos and the queued acquire loops. This base version always
 * throws, so subclasses that support exclusive mode must override it.
 *
 * Params:  arg = the value given to the acquire method.
 * Returns: true when the attempt succeeded (never returns in this base version).
 * Throws:  UnsupportedOperationException always, in this base version.
 */
protected bool tryAcquire(int arg)
{
    throw new UnsupportedOperationException();
    return false;
}
00823
/**
 * Hook for an exclusive-mode release; called by release. This base version
 * always throws, so subclasses that support exclusive mode must override it.
 *
 * Params:  arg = the value given to release.
 * Returns: true when release should go on to wake a queued thread
 *          (never returns in this base version).
 * Throws:  UnsupportedOperationException always, in this base version.
 */
protected bool tryRelease(int arg)
{
    throw new UnsupportedOperationException();
    return false;
}
00844
/**
 * Hook for a shared-mode acquire attempt. Callers in this class treat a
 * negative result as failure, a result >= 0 as success, and pass the result
 * on to setHeadAndPropagate, which only wakes a successor when it is > 0.
 * This base version always throws.
 *
 * Params:  arg = the value given to the acquire method.
 * Returns: see above (never returns in this base version).
 * Throws:  UnsupportedOperationException always, in this base version.
 */
protected int tryAcquireShared(int arg)
{
    throw new UnsupportedOperationException();
    return 0;
}
00874
/**
 * Hook for a shared-mode release; called by releaseShared. This base
 * version always throws.
 *
 * Params:  arg = the value given to releaseShared.
 * Returns: true when releaseShared should go on to wake a queued thread
 *          (never returns in this base version).
 * Throws:  UnsupportedOperationException always, in this base version.
 */
protected bool tryReleaseShared(int arg)
{
    throw new UnsupportedOperationException();
    return false;
}
00894
/**
 * Hook consulted by ConditionObject before it touches its waiter list.
 * This base version always throws; subclasses that hand out conditions
 * must override it.
 *
 * Returns: true when the lock is held exclusively, as judged by the
 *          subclass (never returns in this base version).
 * Throws:  UnsupportedOperationException always, in this base version.
 */
bool isHeldExclusively()
{
    throw new UnsupportedOperationException();
    return false;
}
00912
/**
 * Acquires in exclusive mode: one direct tryAcquire, and if that fails the
 * calling thread is queued and acquireQueued takes over.
 *
 * Params: arg = value passed through to tryAcquire.
 */
final void acquire(int arg) {
    if (tryAcquire(arg))
        return;
    Node queued = addWaiter(Node.EXCLUSIVE);
    acquireQueued(queued, arg);
}
00928
/**
 * Acquires in exclusive mode with a timeout: one direct tryAcquire, then
 * the timed queued path.
 *
 * Params:
 *   arg          = value passed through to tryAcquire.
 *   nanosTimeout = maximum time to wait, in nanoseconds.
 * Returns: true if acquired, false if the timeout elapsed first.
 */
final bool tryAcquireNanos(int arg, long nanosTimeout) {
    if (tryAcquire(arg))
        return true;
    return doAcquireNanos(arg, nanosTimeout);
}
00948
/**
 * Releases in exclusive mode. When tryRelease reports success and the head
 * node exists with a non-zero waitStatus, the head's successor is woken.
 *
 * Params:  arg = value passed through to tryRelease.
 * Returns: the result of tryRelease.
 */
final bool release(int arg) {
    if (!tryRelease(arg))
        return false;

    volatile {
        Node first = head;
        if (first !is null && first.waitStatus != 0)
            unparkSuccessor(first);
    }
    return true;
}
00970
/**
 * Acquires in shared mode: one direct tryAcquireShared, and if that returns
 * a negative value the calling thread goes through doAcquireShared.
 *
 * Params: arg = value passed through to tryAcquireShared.
 */
final void acquireShared(int arg) {
    if (tryAcquireShared(arg) >= 0)
        return;
    doAcquireShared(arg);
}
00985
/**
 * Acquires in shared mode with a timeout: one direct tryAcquireShared, then
 * the timed queued path.
 *
 * Params:
 *   arg          = value passed through to tryAcquireShared.
 *   nanosTimeout = maximum time to wait, in nanoseconds.
 * Returns: true if acquired, false if the timeout elapsed first.
 */
final bool tryAcquireSharedNanos(int arg, long nanosTimeout) {
    if (tryAcquireShared(arg) >= 0)
        return true;
    return doAcquireSharedNanos(arg, nanosTimeout);
}
01003
/**
 * Releases in shared mode. When tryReleaseShared reports success and the
 * head node exists with a non-zero waitStatus, the head's successor is woken.
 *
 * Params:  arg = value passed through to tryReleaseShared.
 * Returns: the result of tryReleaseShared.
 */
final bool releaseShared(int arg) {
    if (!tryReleaseShared(arg))
        return false;

    volatile {
        Node first = head;
        if (first !is null && first.waitStatus != 0)
            unparkSuccessor(first);
    }
    return true;
}
01023
01024
01025
/**
 * Returns: true when head and tail currently refer to different nodes,
 *          i.e. at least one node sits behind the head. The answer is a
 *          snapshot and may be stale by the time the caller uses it.
 */
final bool hasQueuedThreads() {
    volatile {
        return !(head is tail);
    }
}
01041
/**
 * Returns: true once the wait queue has been initialised, i.e. head is no
 *          longer null (enq only sets head when a thread had to queue).
 */
final bool hasContended() {
    Node first = v_head();
    return first !is null;
}
01054
/**
 * Returns: the thread of the first node behind the head, or null when head
 *          and tail are the same node. The common empty case is answered
 *          here; everything else is delegated to fullGetFirstQueuedThread.
 */
final Thread getFirstQueuedThread() {
    volatile {
        if (head is tail)
            return null;
        return fullGetFirstQueuedThread();
    }
}
01070
/**
 * Slow path of getFirstQueuedThread. Repeats until it gets a consistent
 * view of the front of the queue.
 *
 * Each pass first inspects head.next and then tail; a candidate is only
 * accepted when its thread is non-null and its prev still equals the
 * current head, which is re-read for that comparison.
 *
 * Returns: the first queued thread, or null when the queue is uninitialised
 *          or holds only the head node.
 */
private Thread fullGetFirstQueuedThread() {
    while (true) {
        volatile {
            Node first = head;
            if (first is null)
                return null;

            // Candidate 1: the head's direct successor.
            Node succ = first.next;
            if (succ !is null) {
                Thread succThread = succ.thread;
                Node succPrev = succ.prev;
                if (succThread !is null && succPrev is head)
                    return succThread;
            }

            // Candidate 2: the tail, in case head.next was not yet visible.
            Node last = tail;
            if (last is first)
                return null;

            if (last !is null) {
                Thread lastThread = last.thread;
                Node lastPrev = last.prev;
                if (lastThread !is null && lastPrev is head)
                    return lastThread;
            }
        }
    }
}
01119
/**
 * Tells whether thread currently owns a node on the wait queue. The queue
 * is scanned backwards from tail through the prev links.
 *
 * Params:  thread = the thread to look for; must not be null.
 * Returns: true if a node referring to thread was found.
 * Throws:  IllegalArgumentException if thread is null.
 */
final bool isQueued(Thread thread) {
    if (thread is null)
        throw new IllegalArgumentException();

    Node cursor = v_tail();
    while (cursor !is null) {
        volatile {
            if (cursor.thread is thread)
                return true;
            cursor = cursor.prev;
        }
    }
    return false;
}
01141
01142
01143
/**
 * Counts the queue nodes that still refer to a thread, walking backwards
 * from tail. The queue may change during the walk, so the result is an
 * estimate.
 *
 * Returns: the number of nodes seen with a non-null thread.
 */
final int getQueueLength() {
    int count = 0;
    Node cursor = v_tail();
    while (cursor !is null) {
        volatile {
            if (cursor.thread !is null)
                ++count;
            cursor = cursor.prev;
        }
    }
    return count;
}
01164
/**
 * Collects the threads referenced by the queue nodes, walking backwards
 * from tail (so the most recently queued thread comes first).
 *
 * Returns: the threads found; an empty (null) array when there are none.
 */
final Thread[] getQueuedThreads() {
    Thread[] found;
    Node cursor = v_tail();
    while (cursor !is null) {
        volatile {
            Thread owner = cursor.thread;
            if (owner !is null)
                found ~= owner;
            cursor = cursor.prev;
        }
    }
    return found;
}
01187
/**
 * Like getQueuedThreads, but only nodes that are not in shared mode
 * contribute their thread.
 *
 * Returns: the threads found; an empty (null) array when there are none.
 */
final Thread[] getExclusiveQueuedThreads() {
    Thread[] found;
    Node cursor = v_tail();
    while (cursor !is null) {
        volatile {
            if (!cursor.isShared()) {
                Thread owner = cursor.thread;
                if (owner !is null)
                    found ~= owner;
            }
            cursor = cursor.prev;
        }
    }
    return found;
}
01209
/**
 * Like getQueuedThreads, but only nodes in shared mode contribute their
 * thread.
 *
 * Returns: the threads found; an empty (null) array when there are none.
 */
final Thread[] getSharedQueuedThreads() {
    Thread[] found;
    Node cursor = v_tail();
    while (cursor !is null) {
        volatile {
            if (cursor.isShared()) {
                Thread owner = cursor.thread;
                if (owner !is null)
                    found ~= owner;
            }
            cursor = cursor.prev;
        }
    }
    return found;
}
01231
/**
 * Describes whether threads are currently queued on this lock.
 *
 * The previous body returned only the prefix fragment ("non" or the empty
 * string) and read the state into a local that was never used; the result
 * is now a complete phrase. The numeric state is not included because no
 * integer-formatting helper is in scope in this module.
 *
 * Returns: "nonempty queue" when hasQueuedThreads() is true,
 *          "empty queue" otherwise.
 */
char[] toString() {
    char[] prefix = hasQueuedThreads() ? "non" : "";
    return prefix ~ "empty queue";
}
01248
01249
01250
01251
/**
 * Tells whether a node that started on a condition's waiter list has been
 * moved onto the lock's wait queue.
 *
 * Quick answers: a node still in CONDITION status or without a prev link is
 * not on the queue; a node with a next link is. Otherwise the queue is
 * searched backwards from tail.
 *
 * Params:  node = the node to check.
 * Returns: true if node is on the wait queue.
 */
final bool isOnSyncQueue(Node node) {
    volatile {
        if (node.waitStatus == Node.CONDITION)
            return false;
        if (node.prev is null)
            return false;
        if (node.next !is null)
            return true;
    }
    // prev is set but next is not: the node may be mid-insertion at the
    // tail, so look for it from there.
    return findNodeFromTail(node);
}
01275
/**
 * Searches the wait queue backwards from tail for node.
 *
 * Params:  node = the node to look for.
 * Returns: true as soon as the walk reaches node; false when the walk runs
 *          off the front of the queue first.
 */
private bool findNodeFromTail(Node node) {
    Node cursor = v_tail();
    while (cursor !is node) {
        if (cursor is null)
            return false;
        cursor = cursor.v_prev();
    }
    return true;
}
01298
/**
 * Moves a node from a condition's waiter list to the lock's wait queue.
 *
 * Params:  node = the condition node to transfer.
 * Returns: false when node was no longer in CONDITION status (nothing is
 *          transferred); true after the node was enqueued.
 */
final bool transferForNotify(Node node) {
    // Losing this compare-and-set means the node left CONDITION status already.
    if (!Atomic.compareAndSet32(&node.waitStatus, Node.CONDITION, 0))
        return false;

    // Enqueue, then ask the predecessor to signal us. If the predecessor is
    // cancelled or its status cannot be switched to SIGNAL, wake the node's
    // thread right away.
    Node pred = enq(node);
    int predStatus = pred.v_waitStatus();
    if (predStatus > 0 || !Atomic.compareAndSet32(&pred.waitStatus, predStatus, Node.SIGNAL)) {
        Thread waiter = node.v_thread();
        if (waiter !is null)
            waiter.resume();
    }
    return true;
}
01327
/**
 * Moves a node onto the wait queue after its condition wait ended without
 * a notification (the timed waits call this when the timeout has run out).
 *
 * Params:  node = the condition node of the waiting thread.
 * Returns: true if this call took the node out of CONDITION status and
 *          enqueued it; false if the node had already left CONDITION
 *          status, in which case the call first waits until the node shows
 *          up on the wait queue.
 */
final bool transferAfterCancelledWait(Node node) {
    if (!Atomic.compareAndSet32(&node.waitStatus, Node.CONDITION, 0)) {
        // Someone else is transferring the node; yield until it is queued.
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    enq(node);
    return true;
}
01351
/**
 * Releases the lock with the complete current state as argument. On any
 * failure the given node is marked CANCELLED before the exception leaves.
 *
 * Params:  node = the condition node of the calling thread.
 * Returns: the state value that was released.
 * Throws:  whatever release throws, or an Exception with the message
 *          "illegal monitor state" when release returns false.
 */
final int fullyRelease(Node node) {
    try {
        int held = state();
        if (release(held))
            return held;
    } catch (Exception ex) {
        node.v_waitStatus(Node.CANCELLED);
        throw ex;
    }

    // release() declined.
    node.v_waitStatus(Node.CANCELLED);
    throw new Exception("illegal monitor state");
}
01371
01372
01373
/**
 * Tells whether condition was created for this lock.
 *
 * Params:  condition = the condition to check; must not be null.
 * Returns: the result of condition.isOwnedBy(this).
 * Throws:  IllegalArgumentException if condition is null.
 */
final bool owns(ConditionObject condition) {
    if (condition !is null)
        return condition.isOwnedBy(this);
    throw new IllegalArgumentException();
}
01385
/**
 * Asks condition whether it has waiting threads.
 *
 * Params:  condition = a condition belonging to this lock.
 * Returns: the result of condition.hasWaiters().
 * Throws:  IllegalArgumentException if condition is null or belongs to
 *          another lock.
 */
final bool hasWaiters(ConditionObject condition) {
    if (owns(condition))
        return condition.hasWaiters();
    throw new IllegalArgumentException();
}
01401
/**
 * Asks condition how many threads are waiting on it.
 *
 * Params:  condition = a condition belonging to this lock.
 * Returns: the result of condition.getWaitQueueLength().
 * Throws:  IllegalArgumentException if condition is null or belongs to
 *          another lock.
 */
final int getWaitQueueLength(ConditionObject condition) {
    if (owns(condition))
        return condition.getWaitQueueLength();
    throw new IllegalArgumentException();
}
01417
/**
 * Asks condition for the threads waiting on it.
 *
 * Params:  condition = a condition belonging to this lock.
 * Returns: the result of condition.getWaitingThreads().
 * Throws:  IllegalArgumentException if condition is null or belongs to
 *          another lock.
 */
final Thread[] getWaitingThreads(ConditionObject condition) {
    if (owns(condition))
        return condition.getWaitingThreads();
    throw new IllegalArgumentException();
}
01433
/**
 * Condition implementation bound to one AbstractLock.
 *
 * Waiting threads are kept on a singly linked list of Nodes (linked through
 * nextWaiter). A notification moves nodes from that list onto the lock's
 * wait queue, where the waiter re-acquires the lock before returning.
 */
static class ConditionObject : Condition {
    /** Oldest node of the waiter list, or null. */
    private Node firstWaiter;
    /** Newest node of the waiter list, or null. */
    private Node lastWaiter;
    /** The lock this condition belongs to. */
    private AbstractLock parent;

    /** Params: parent = the lock this condition is bound to. */
    this(AbstractLock parent) {
        this.parent = parent;
    }

    /**
     * Appends a CONDITION node for the calling thread to the waiter list.
     * Returns: the new node.
     */
    private Node addConditionWaiter() {
        Node node = new Node(Thread.getThis(), Node.CONDITION);
        Node previousLast = lastWaiter;
        if (previousLast !is null)
            previousLast.nextWaiter = node;
        else
            firstWaiter = node;
        lastWaiter = node;
        return node;
    }

    /**
     * Detaches waiters from the front of the list, one at a time, until one
     * of them is transferred to the lock's queue or the list is exhausted.
     * Params: first = current first waiter; must not be null.
     */
    private void doNotify(Node first) {
        for (;;) {
            Node following = first.nextWaiter;
            firstWaiter = following;
            if (following is null)
                lastWaiter = null;
            first.nextWaiter = null;

            if (parent.transferForNotify(first))
                break;
            first = firstWaiter;
            if (first is null)
                break;
        }
    }

    /**
     * Empties the waiter list and transfers every node that was on it.
     * Params: first = current first waiter; must not be null.
     */
    private void doNotifyAll(Node first) {
        firstWaiter = null;
        lastWaiter = null;

        Node current = first;
        do {
            Node following = current.nextWaiter;
            current.nextWaiter = null;
            parent.transferForNotify(current);
            current = following;
        } while (current !is null);
    }

    /**
     * Moves the longest-waiting thread, if any, to the lock's wait queue.
     * Throws: IllegalArgumentException if parent.isHeldExclusively() is false.
     */
    final void notify() {
        if (!parent.isHeldExclusively())
            throw new IllegalArgumentException();
        Node oldest = firstWaiter;
        if (oldest is null)
            return;
        doNotify(oldest);
    }

    /**
     * Moves all waiting threads to the lock's wait queue.
     * Throws: IllegalArgumentException if parent.isHeldExclusively() is false.
     */
    final void notifyAll() {
        if (!parent.isHeldExclusively())
            throw new IllegalArgumentException();
        Node oldest = firstWaiter;
        if (oldest is null)
            return;
        doNotifyAll(oldest);
    }

    /**
     * Waits for a notification: joins the waiter list, releases the lock
     * completely, suspends until the node is on the lock's wait queue, then
     * re-acquires with the saved state.
     */
    final void wait() {
        Node node = addConditionWaiter();
        int savedState = parent.fullyRelease(node);
        while (!parent.isOnSyncQueue(node)) {
            version (Ares) {
                Thread.getThis.suspend();
            } else {
                Thread.getThis().pause();
            }
        }
        parent.acquireQueued(node, savedState);
    }

    /**
     * Timed wait. Sleeps in slices until the node is on the lock's wait
     * queue or the timeout is used up, then re-acquires the lock.
     *
     * Params:  nanosTimeout = maximum time to wait, in nanoseconds.
     * Returns: the remaining timeout minus the time spent since the last
     *          measurement; a value <= 0 therefore means the time ran out.
     */
    final long waitNanos(long nanosTimeout) {
        long lastTime = currentTimeNanos();
        Node node = addConditionWaiter();
        int savedState = parent.fullyRelease(node);

        while (!parent.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                parent.transferAfterCancelledWait(node);
                break;
            }
            sleepNanos(nanosTimeout);
            long wokeAt = currentTimeNanos();
            nanosTimeout -= wokeAt - lastTime;
            lastTime = wokeAt;
        }

        parent.acquireQueued(node, savedState);
        long sinceLast = currentTimeNanos() - lastTime;
        return nanosTimeout - sinceLast;
    }

    /**
     * Timed wait with an explicit unit.
     *
     * Params:
     *   time = maximum time to wait.
     *   unit = unit of time; converted with toNanos.
     * Returns: false if the timeout ran out and this thread itself moved
     *          its node to the lock's queue; true otherwise.
     */
    final bool wait(long time, TimeUnit unit) {
        long nanosTimeout = toNanos(time, unit);
        Node node = addConditionWaiter();
        int savedState = parent.fullyRelease(node);
        long lastTime = currentTimeNanos();

        bool timedout = false;
        while (!parent.isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = parent.transferAfterCancelledWait(node);
                break;
            }
            sleepNanos(nanosTimeout);
            long wokeAt = currentTimeNanos();
            nanosTimeout -= wokeAt - lastTime;
            lastTime = wokeAt;
        }

        parent.acquireQueued(node, savedState);
        return !timedout;
    }

    /** Returns: true if sync is the lock this condition was created with. */
    final bool isOwnedBy(AbstractLock sync) {
        return parent is sync;
    }

    /**
     * Returns: true if at least one node on the waiter list is still in
     *          CONDITION status.
     * Throws:  IllegalArgumentException if parent.isHeldExclusively() is false.
     */
    final bool hasWaiters() {
        if (!parent.isHeldExclusively())
            throw new IllegalArgumentException();

        Node waiter = firstWaiter;
        while (waiter !is null) {
            if (waiter.waitStatus == Node.CONDITION)
                return true;
            waiter = waiter.nextWaiter;
        }
        return false;
    }

    /**
     * Returns: the number of nodes on the waiter list that are still in
     *          CONDITION status.
     * Throws:  IllegalArgumentException if parent.isHeldExclusively() is false.
     */
    final int getWaitQueueLength() {
        if (!parent.isHeldExclusively())
            throw new IllegalArgumentException();

        int count = 0;
        Node waiter = firstWaiter;
        while (waiter !is null) {
            if (waiter.waitStatus == Node.CONDITION)
                ++count;
            waiter = waiter.nextWaiter;
        }
        return count;
    }

    /**
     * Returns: the threads of all nodes on the waiter list that are still
     *          in CONDITION status and refer to a thread, oldest first.
     * Throws:  IllegalArgumentException if parent.isHeldExclusively() is false.
     */
    final Thread[] getWaitingThreads() {
        if (!parent.isHeldExclusively())
            throw new IllegalArgumentException();

        Thread[] found;
        Node waiter = firstWaiter;
        while (waiter !is null) {
            if (waiter.waitStatus == Node.CONDITION) {
                Thread owner = waiter.thread;
                if (owner !is null)
                    found ~= owner;
            }
            waiter = waiter.nextWaiter;
        }
        return found;
    }
}
01680 }