Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

Semaphore.d

Go to the documentation of this file.
00001 
00013 module mango.locks.Semaphore;
00014 
00015 private {
00016   import std.thread;
00017 
00018   import mango.sys.Atomic;
00019 
00020   import mango.locks.Utils;
00021   import mango.locks.LockImpl;
00022   import mango.locks.TimeUnit;
00023   import mango.locks.Exceptions;
00024 }
00025 
00137 class Semaphore {
00139   private Sync sync;
00140 
00146   abstract class Sync : AbstractLock {
00147     this(int permits) {
00148       state = permits;
00149     }
00150         
00151     final int getPermits() {
00152       return state;
00153     }
00154 
00155     final int nonfairTryAcquireShared(int acquires) {
00156       for (;;) {
00157         int available = state;
00158         int remaining = available - acquires;
00159         if (remaining < 0 ||
00160             Atomic.compareAndSet32(&state_, available, remaining))
00161           return remaining;
00162       }
00163     }
00164         
00165     protected final bool tryReleaseShared(int releases) {
00166       for (;;) {
00167         int p = state;
00168         if (Atomic.compareAndSet32(&state_, p, p + releases)) 
00169           return true;
00170       }
00171     }
00172 
00173     final void reducePermits(int reductions) {
00174       for (;;) {
00175         int current = state;
00176         if (Atomic.compareAndSet32(&state_, current, current - reductions))
00177           return;
00178       }
00179     }
00180 
00181     final int drainPermits() {
00182       for (;;) {
00183         int current = state;
00184         if (current == 0 || Atomic.compareAndSet32(&state_, current, 0))
00185           return current;
00186       }
00187     }
00188   }
00189 
00193   final class NonfairSync : Sync {
00194     this(int permits) {
00195       super(permits);
00196     }
00197        
00198     protected int tryAcquireShared(int acquires) {
00199       return nonfairTryAcquireShared(acquires);
00200     }
00201   }
00202 
00206   final class FairSync : Sync {
00207     this(int permits) {
00208       super(permits);
00209     }
00210         
00211     protected int tryAcquireShared(int acquires) {
00212       Thread current = Thread.getThis();
00213       for (;;) {
00214         Thread first = getFirstQueuedThread();
00215         if (first !is null && first !is current)
00216           return -1;
00217         int available = state;
00218         int remaining = available - acquires;
00219         if (remaining < 0 ||
00220             Atomic.compareAndSet32(&state_, available, remaining))
00221           return remaining;
00222       }
00223     }
00224   }
00225 
00237   this(int permits, bool fair = false) { 
00238     if (fair)
00239       sync = new FairSync(permits);
00240     else
00241       sync = new NonfairSync(permits);
00242   }
00243 
00260   void acquire(int permits = 1) {
00261     if (permits < 0)
00262       throw new IllegalArgumentException();
00263     sync.acquireShared(permits);
00264   }
00265 
00291   bool tryAcquire(int permits = 1) {
00292     if (permits < 0)
00293       throw new IllegalArgumentException();
00294     return sync.nonfairTryAcquireShared(permits) >= 0;
00295   }
00296 
00329   bool tryAcquire(long timeout, TimeUnit unit, int permits = 1) {
00330     if (permits < 0)
00331       throw new IllegalArgumentException();
00332     return sync.tryAcquireSharedNanos(permits, toNanos(timeout,unit));
00333   }
00334 
00359   void release(int permits = 1) {
00360     if (permits < 0)
00361       throw new IllegalArgumentException();
00362     sync.releaseShared(permits);
00363   }
00364 
00370   int availablePermits() {
00371     return sync.getPermits();
00372   }
00373 
00378   int drainPermits() {
00379     return sync.drainPermits();
00380   }
00381 
00390   protected void reducePermits(int reduction) {
00391     if (reduction < 0)
00392       throw new IllegalArgumentException();
00393     sync.reducePermits(reduction);
00394   }
00395 
00400   bool isFair() {
00401     return (cast(FairSync)sync) !is null;
00402   }
00403 
00414   final bool hasQueuedThreads() { 
00415     return sync.hasQueuedThreads();
00416   }
00417 
00427   final int getQueueLength() {
00428     return sync.getQueueLength();
00429   }
00430 
00441   protected Thread[] getQueuedThreads() {
00442     return sync.getQueuedThreads();
00443   }
00444 
00452   char[] toString() {
00453     char[16] buf;
00454     return super.toString() ~ "[Permits = " ~ 
00455       itoa(buf, sync.getPermits()) ~ "]";
00456   }
00457 
00458   unittest {
00459     Semaphore sem = new Semaphore(2);
00460     int done = 0;
00461     Thread[] t = new Thread[6];
00462 
00463     ThreadReturn f() {
00464       int n;
00465       Thread tt = Thread.getThis();
00466       for (n=0; n < t.length; n++) {
00467         if (tt is t[n])
00468           break;
00469       }
00470       sleepNanos(toNanos(10,TimeUnit.MilliSeconds));
00471       version (LocksVerboseUnittest)
00472         printf(" thread %d started\n",n);
00473       sem.acquire();
00474       sleepNanos(toNanos(10,TimeUnit.MilliSeconds));
00475       version (LocksVerboseUnittest) {
00476         printf(" thread %d aquired\n",n);
00477         printf(" thread %d terminating\n",n);
00478       }
00479       return 0;
00480     }
00481 
00482     ThreadReturn f2() {
00483       int n;
00484       Thread tt = Thread.getThis();
00485       for (n=0; n < t.length; n++) {
00486         if (tt is t[n])
00487           break;
00488       }
00489       version (LocksVerboseUnittest)
00490         printf(" thread %d releasing\n",n);
00491       sleepNanos(toNanos(10,TimeUnit.MilliSeconds));
00492       sem.release();
00493       version (LocksVerboseUnittest)
00494         printf(" thread %d terminating\n",n);
00495       sleepNanos(toNanos(10,TimeUnit.MilliSeconds));
00496       return 0;
00497     }
00498     int n;
00499     for (n=0; n<t.length/2; n++) {
00500       t[n] = new Thread(&f);
00501     }
00502     for (; n<t.length; n++) {
00503       t[n] = new Thread(&f2);
00504     }
00505     version (LocksVerboseUnittest)
00506       printf("starting locks.semaphore unittest\n");
00507     for (n=0; n<t.length/2; n++) {
00508       t[n].start();
00509     }
00510     Thread.yield();
00511     for (; n<t.length; n++) {
00512       t[n].start();
00513       Thread.yield();
00514     }
00515 
00516     foreach(int n, Thread thread; t)
00517       {
00518         version (LocksVerboseUnittest)
00519           printf(" waiting on %d\n", n);
00520         version (Ares)
00521                  thread.join();
00522               else
00523                  thread.wait();
00524       }
00525 
00526     version (LocksVerboseUnittest)
00527       printf("finished locks.semaphore unittest\n");
00528     delete sem;
00529     t[] = null;
00530   }
00531 }

Generated on Mon Nov 14 10:59:40 2005 for Mango by  doxygen 1.4.0