00001
00016 module mango.locks.ReadWriteLock;
00017
00018 import mango.locks.Lock;
00019
/**
 * A pair of related Lock objects: one obtained through readLock() and one
 * obtained through writeLock().
 */
interface ReadWriteLock {

    /** Returns the lock of the pair that is meant for readers. */
    Lock readLock();

    /** Returns the lock of the pair that is meant for writers. */
    Lock writeLock();
}
00075
00076 private {
00077 import std.thread;
00078
00079 import mango.sys.Atomic;
00080
00081 import mango.locks.Utils;
00082 import mango.locks.LockImpl;
00083 import mango.locks.TimeUnit;
00084 import mango.locks.Exceptions;
00085 }
00086
00194 class ReentrantReadWriteLock : ReadWriteLock {
00195
/// Object handed out by readLock(); assigned once, in the constructor.
private final ReadLock readerLock;

/// Object handed out by writeLock(); assigned once, in the constructor.
private final WriteLock writerLock;

/// Synchronizer chosen in the constructor (a FairSync or a NonfairSync).
private final Sync sync;
00201
/**
 * Creates the synchronizer and the two lock objects that share it.
 *
 * Params:
 *   fair = true selects FairSync; false (the default) selects NonfairSync.
 */
this(bool fair = false) {
    if (!fair)
        sync = new NonfairSync();
    else
        sync = new FairSync();

    // ReadLock and WriteLock copy `sync` out of this object in their own
    // constructors, so it has to be assigned before they are created.
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
00216
/** Returns the write-side lock object created in the constructor. */
Lock writeLock() {
    return writerLock;
}
/** Returns the read-side lock object created in the constructor. */
Lock readLock() {
    return readerLock;
}
00219
00220
00221
00222
00223
00224
00225
00226
/*
 * Layout of the 32-bit lock state word: the bits above SHARED_SHIFT carry
 * the shared (read) hold count, the low SHARED_SHIFT bits carry the
 * exclusive (write) hold count.
 */

/// Number of low-order bits reserved for the exclusive count.
const int SHARED_SHIFT = 16;

/// Amount added to the state word for one shared hold.
const int SHARED_UNIT = 1 << SHARED_SHIFT;

/// Mask selecting the exclusive-count bits of the state word.
const int EXCLUSIVE_MASK = SHARED_UNIT - 1;

/** Extracts the shared hold count from state word c (unsigned shift). */
static int sharedCount(int c) {
    return c >>> SHARED_SHIFT;
}

/** Extracts the exclusive hold count from state word c. */
static int exclusiveCount(int c) {
    return c & EXCLUSIVE_MASK;
}
00235
00236
00237
00238
00239
/**
 * Synchronizer behind both lock objects.
 *
 * The inherited state word is read and written through `state` / `state_`
 * and is split by sharedCount() / exclusiveCount() into a shared (read)
 * hold count and an exclusive (write) hold count.
 */
abstract static class Sync : AbstractLock {

    /// Thread recorded as holder of the exclusive lock; reset to null in
    /// tryRelease() when the exclusive count is fully released.
    Thread owner;

    /** Acquires the write lock; each subclass provides its own entry path. */
    abstract void wlock();

    /**
     * Single attempt to add `acquires` exclusive holds without looking at
     * the wait queue.
     *
     * Returns: true if the state word was updated and the caller recorded
     *          as owner; false if the lock is held by readers or by another
     *          writer, or if the compare-and-set lost a race.
     * Throws:  Exception when the exclusive count would reach SHARED_UNIT.
     */
    final bool nonfairTryAcquire(int acquires) {
        // Only the exclusive portion of the request is relevant here.
        acquires = exclusiveCount(acquires);
        Thread me = Thread.getThis();
        int seen = state;
        int writeHolds = exclusiveCount(seen);

        if (writeHolds + acquires >= SHARED_UNIT)
            throw new Exception("Maximum lock count exceeded");

        if (seen != 0) {
            // Non-zero state with no write holds means shared holds exist;
            // otherwise a writer exists and it must be the caller itself.
            if (writeHolds == 0 || me !is owner)
                return false;
        }

        if (Atomic.compareAndSet32(&state_, seen, seen + acquires)) {
            owner = me;
            return true;
        }
        return false;
    }

    /**
     * Adds `acquires` shared holds without looking at the wait queue,
     * retrying the compare-and-set until it succeeds or the attempt is
     * refused.
     *
     * Returns: 1 on success; -1 when an exclusive hold exists and the
     *          recorded owner is not the calling thread.
     * Throws:  Exception when adding the holds makes the state word wrap.
     */
    final int nonfairTryAcquireShared(int acquires) {
        while (true) {
            int seen = state;
            int wanted = seen + (acquires << SHARED_SHIFT);
            if (wanted < seen)
                throw new Exception("Maximum lock count exceeded");

            if (exclusiveCount(seen) != 0) {
                if (owner !is Thread.getThis())
                    return -1;
            }

            if (Atomic.compareAndSet32(&state_, seen, wanted))
                return 1;
            // Lost the race: loop and re-read the state word.
        }
    }

    /**
     * Drops `releases` exclusive holds.
     *
     * Returns: true when `releases` equals the current exclusive count, in
     *          which case the owner is cleared as well.
     * Throws:  Exception when the calling thread is not the recorded owner.
     */
    protected final bool tryRelease(int releases) {
        Thread me = Thread.getThis();
        int seen = state;
        if (owner !is me)
            throw new Exception("Illegal monitor state");

        bool fullyReleased = (exclusiveCount(seen) == releases);
        if (fullyReleased)
            owner = null;   // cleared before the new state is published

        state = seen - releases;
        return fullyReleased;
    }

    /**
     * Drops `releases` shared holds, retrying the compare-and-set until it
     * succeeds.
     *
     * Returns: true when the resulting state word is zero.
     * Throws:  Exception when the subtraction yields a negative state word.
     */
    protected final bool tryReleaseShared(int releases) {
        while (true) {
            int seen = state;
            int remaining = seen - (releases << SHARED_SHIFT);
            if (remaining < 0)
                throw new Exception("Illegal monitor state");
            if (Atomic.compareAndSet32(&state_, seen, remaining))
                return remaining == 0;
        }
    }

    /**
     * Returns true when an exclusive hold exists and the recorded owner is
     * the calling thread.
     */
    final bool isHeldExclusively() {
        if (exclusiveCount(state) == 0)
            return false;
        return owner is Thread.getThis();
    }

    /** Creates a ConditionObject bound to this synchronizer. */
    final ConditionObject newCondition() {
        ConditionObject cond = new ConditionObject(this);
        return cond;
    }

    /**
     * Returns the recorded owner, or null when the exclusive count is zero.
     * The state word is sampled before the owner field.
     */
    final Thread getOwner() {
        int writeHolds = exclusiveCount(state);
        Thread holder = owner;
        if (writeHolds == 0)
            return null;
        return holder;
    }

    /** Returns the shared hold count taken from the current state word. */
    final int getReadLockCount() {
        int seen = state;
        return sharedCount(seen);
    }

    /** Returns true when the exclusive hold count is non-zero. */
    final bool isWriteLocked() {
        int writeHolds = exclusiveCount(state);
        return writeHolds != 0;
    }

    /**
     * Returns the exclusive hold count when the calling thread is the
     * recorded owner, otherwise 0.
     */
    final int getWriteHoldCount() {
        int writeHolds = exclusiveCount(state);
        Thread holder = owner;
        if (holder is Thread.getThis())
            return writeHolds;
        return 0;
    }

    /** Returns the raw state word. */
    final int getCount() {
        return state;
    }
}
00347
00348
00349
00350
/**
 * Sync variant whose acquire hooks forward to the queue-ignoring
 * nonfairTryAcquire / nonfairTryAcquireShared routines.
 */
final static class NonfairSync : Sync {

    /** Forwards to nonfairTryAcquire(acquires). */
    protected final bool tryAcquire(int acquires) {
        bool acquired = nonfairTryAcquire(acquires);
        return acquired;
    }

    /** Forwards to nonfairTryAcquireShared(acquires). */
    protected final int tryAcquireShared(int acquires) {
        int outcome = nonfairTryAcquireShared(acquires);
        return outcome;
    }

    /**
     * Acquires the write lock. First tries to swing the state word from 0
     * (no holds at all) to 1 (one exclusive hold); when that fails it falls
     * back to acquire(1).
     */
    final void wlock() {
        if (!Atomic.compareAndSet32(&state_, 0, 1)) {
            acquire(1);
            return;
        }
        owner = Thread.getThis();
    }
}
00368
00369
00370
00371
/**
 * Sync variant whose acquire hooks consult getFirstQueuedThread() and
 * refuse the caller when a different thread is first in the queue.
 */
final static class FairSync : Sync {

    /**
     * Single attempt to add `acquires` exclusive holds.
     *
     * A thread that already owns the exclusive lock may add holds directly.
     * Any other thread is refused when the state word is non-zero, or when
     * a different thread is first in the queue.
     *
     * Returns: true if the state word was updated and the caller recorded
     *          as owner; false otherwise.
     * Throws:  Exception when the exclusive count would reach SHARED_UNIT.
     */
    protected final bool tryAcquire(int acquires) {
        // Only the exclusive portion of the request is relevant here.
        acquires = exclusiveCount(acquires);
        Thread me = Thread.getThis();
        int seen = state;
        int writeHolds = exclusiveCount(seen);

        if (writeHolds + acquires >= SHARED_UNIT)
            throw new Exception("Maximum lock count exceeded");

        bool reentrant = (writeHolds != 0 && me is owner);
        if (!reentrant) {
            if (seen != 0)
                return false;
            Thread head = getFirstQueuedThread();
            if (head !is null && head !is me)
                return false;
        }

        if (Atomic.compareAndSet32(&state_, seen, seen + acquires)) {
            owner = me;
            return true;
        }
        return false;
    }

    /**
     * Adds `acquires` shared holds, retrying the compare-and-set until it
     * succeeds or the attempt is refused.
     *
     * Returns: 1 on success; -1 when a different thread is first in the
     *          queue, or when an exclusive hold exists and the recorded
     *          owner is not the calling thread.
     * Throws:  Exception when adding the holds makes the state word wrap.
     */
    protected final int tryAcquireShared(int acquires) {
        Thread me = Thread.getThis();
        while (true) {
            Thread head = getFirstQueuedThread();
            if (head !is null && head !is me)
                return -1;

            int seen = state;
            int wanted = seen + (acquires << SHARED_SHIFT);
            if (wanted < seen)
                throw new Exception("Maximum lock count exceeded");

            if (exclusiveCount(seen) != 0) {
                if (owner !is me)
                    return -1;
            }

            if (Atomic.compareAndSet32(&state_, seen, wanted))
                return 1;
            // Lost the race: loop, re-check the queue and re-read the state.
        }
    }

    /** Acquires the write lock through acquire(1); there is no fast path. */
    final void wlock() {
        acquire(1);
    }
}
00416
/**
 * Lock object returned by ReentrantReadWriteLock.readLock(). Every
 * operation is forwarded to the shared-mode entry points of the owning
 * lock's synchronizer.
 */
static class ReadLock : Lock {

    /// Synchronizer of the owning lock. Declared final to match the
    /// identical field in WriteLock; it is only assigned in the constructor.
    private final Sync sync;

    /**
     * Params:
     *   lock = the owning lock, whose synchronizer this object uses.
     */
    this(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /** Acquires one shared hold via sync.acquireShared(1). */
    void lock() {
        sync.acquireShared(1);
    }

    /**
     * Makes one attempt through sync.nonfairTryAcquireShared(1), which does
     * not consult the wait queue, for fair and non-fair locks alike.
     *
     * Returns: true when that call reports a non-negative result.
     */
    bool tryLock() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * Timed attempt via sync.tryAcquireSharedNanos, with the timeout
     * converted by toNanos(timeout, unit).
     */
    bool tryLock(long timeout, TimeUnit unit) {
        return sync.tryAcquireSharedNanos(1, toNanos(timeout, unit));
    }

    /** Releases one shared hold via sync.releaseShared(1). */
    void unlock() {
        sync.releaseShared(1);
    }

    /**
     * Not supported for read locks.
     *
     * Throws: UnsupportedOperationException, always.
     */
    Condition newCondition() {
        throw new UnsupportedOperationException();
        // Never reached. NOTE(review): presumably kept so the compiler sees
        // a trailing return; confirm before removing.
        return null;
    }

    /**
     * Returns the inherited toString() followed by
     * "[Read locks = N]", N being the current shared hold count.
     */
    char[] toString() {
        char[16] buf;   // scratch space for itoa

        int r = sync.getReadLockCount();
        return super.toString() ~
            "[Read locks = " ~ itoa(buf, r) ~ "]";
    }

}
00550
/**
 * Lock object returned by ReentrantReadWriteLock.writeLock(). Every
 * operation is forwarded to the exclusive-mode entry points of the owning
 * lock's synchronizer.
 */
static class WriteLock : Lock {

    /// Synchronizer of the owning lock; only assigned in the constructor.
    private final Sync sync;

    /**
     * Params:
     *   lock = the owning lock, whose synchronizer this object uses.
     */
    this(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    /** Acquires the write lock via sync.wlock(). */
    void lock() {
        sync.wlock();
    }

    /**
     * Makes one attempt through sync.nonfairTryAcquire(1), which does not
     * consult the wait queue, for fair and non-fair locks alike.
     */
    bool tryLock( ) {
        return sync.nonfairTryAcquire(1);
    }

    /**
     * Timed attempt via sync.tryAcquireNanos, with the timeout converted by
     * toNanos(timeout, unit).
     */
    bool tryLock(long timeout, TimeUnit unit) {
        return sync.tryAcquireNanos(1, toNanos(timeout, unit));
    }

    /** Releases one exclusive hold via sync.release(1). */
    void unlock() {
        sync.release(1);
    }

    /** Returns a new condition created by sync.newCondition(). */
    Condition newCondition() {
        return sync.newCondition();
    }

    /**
     * Returns the inherited toString() followed by "[Unlocked]" when there
     * is no owner, or "[Locked by thread X]" where X is the owner's
     * toString().
     */
    char[] toString() {
        Thread owner = sync.getOwner();
        // Fix: a space now separates the word "thread" from the owner's
        // name; previously the two were run together.
        return super.toString() ~ ((owner is null) ?
            "[Unlocked]" :
            ("[Locked by thread " ~ owner.toString() ~ "]"));
    }

}
00701
00702
00703
00704
/** Returns true when the synchronizer in use is a FairSync. */
final bool isFair() {
    FairSync fairImpl = cast(FairSync) sync;
    return fairImpl !is null;
}
00712
/** Returns whatever sync.getOwner() reports (null when not write-locked). */
protected Thread getOwner() {
    Thread holder = sync.getOwner();
    return holder;
}
00725
/** Returns the shared hold count reported by the synchronizer. */
int getReadLockCount() {
    int readers = sync.getReadLockCount();
    return readers;
}
00735
/** Returns true when the synchronizer reports a non-zero exclusive count. */
bool isWriteLocked() {
    bool locked = sync.isWriteLocked();
    return locked;
}
00746
/** Returns sync.isHeldExclusively(), evaluated for the calling thread. */
bool isWriteLockedByCurrentThread() {
    bool mine = sync.isHeldExclusively();
    return mine;
}
00755
/**
 * Returns the calling thread's exclusive hold count as reported by the
 * synchronizer (0 when the caller is not the owner).
 */
int getWriteHoldCount() {
    int holds = sync.getWriteHoldCount();
    return holds;
}
00767
/** Returns the result of sync.getExclusiveQueuedThreads(). */
protected Thread[] getQueuedWriterThreads() {
    Thread[] writers = sync.getExclusiveQueuedThreads();
    return writers;
}
00781
/** Returns the result of sync.getSharedQueuedThreads(). */
protected Thread[] getQueuedReaderThreads() {
    Thread[] readers = sync.getSharedQueuedThreads();
    return readers;
}
00795
/** Returns the result of sync.hasQueuedThreads(). */
final bool hasQueuedThreads() {
    bool anyQueued = sync.hasQueuedThreads();
    return anyQueued;
}
00809
/**
 * Returns the result of sync.isQueued(thread).
 *
 * Params:
 *   thread = the thread to look for.
 */
final bool hasQueuedThread(Thread thread) {
    bool queued = sync.isQueued(thread);
    return queued;
}
00823
/** Returns the result of sync.getQueueLength(). */
final int getQueueLength() {
    int length = sync.getQueueLength();
    return length;
}
00836
/** Returns the result of sync.getQueuedThreads(). */
protected Thread[] getQueuedThreads() {
    Thread[] queued = sync.getQueuedThreads();
    return queued;
}
00850
/**
 * Returns the result of sync.hasWaiters for the given condition.
 *
 * Params:
 *   condition = must be an AbstractLock.ConditionObject.
 * Throws: IllegalArgumentException when condition is null or is not an
 *         AbstractLock.ConditionObject.
 */
bool hasWaiters(Condition condition) {
    // The cast yields null both for a null argument and for an object of
    // the wrong class, so one check covers both rejections.
    AbstractLock.ConditionObject target =
        cast(AbstractLock.ConditionObject) condition;
    if (target is null)
        throw new IllegalArgumentException();
    return sync.hasWaiters(target);
}
00867
/**
 * Returns the result of sync.getWaitQueueLength for the given condition.
 *
 * Params:
 *   condition = must be an AbstractLock.ConditionObject.
 * Throws: IllegalArgumentException when condition is null or is not an
 *         AbstractLock.ConditionObject.
 */
int getWaitQueueLength(Condition condition) {
    // The cast yields null both for a null argument and for an object of
    // the wrong class, so one check covers both rejections.
    AbstractLock.ConditionObject target =
        cast(AbstractLock.ConditionObject) condition;
    if (target is null)
        throw new IllegalArgumentException();
    return sync.getWaitQueueLength(target);
}
00884
/**
 * Returns the result of sync.getWaitingThreads for the given condition.
 *
 * Params:
 *   condition = must be an AbstractLock.ConditionObject.
 * Throws: IllegalArgumentException when condition is null or is not an
 *         AbstractLock.ConditionObject.
 */
protected Thread[] getWaitingThreads(Condition condition) {
    // The cast yields null both for a null argument and for an object of
    // the wrong class, so one check covers both rejections.
    AbstractLock.ConditionObject target =
        cast(AbstractLock.ConditionObject) condition;
    if (target is null)
        throw new IllegalArgumentException();
    return sync.getWaitingThreads(target);
}
00903
/**
 * Returns the inherited toString() followed by
 * "[Write locks = W, Read locks = R]", with both counts decoded from one
 * sample of the synchronizer's state word.
 */
char[] toString() {
    // Each count is formatted into its own buffer because both formatted
    // results are used in the same concatenation.
    char[16] writeBuf;
    char[16] readBuf;

    int snapshot = sync.getCount();
    char[] writeText = itoa(writeBuf, exclusiveCount(snapshot));
    char[] readText = itoa(readBuf, sharedCount(snapshot));

    return super.toString() ~
        "[Write locks = " ~ writeText ~ ", Read locks = "
        ~ readText ~ "]";
}
00923
00924 }