00001 00013 module mango.locks.Semaphore; 00014 00015 private { 00016 import std.thread; 00017 00018 import mango.sys.Atomic; 00019 00020 import mango.locks.Utils; 00021 import mango.locks.LockImpl; 00022 import mango.locks.TimeUnit; 00023 import mango.locks.Exceptions; 00024 } 00025 00137 class Semaphore { 00139 private Sync sync; 00140 00146 abstract class Sync : AbstractLock { 00147 this(int permits) { 00148 state = permits; 00149 } 00150 00151 final int getPermits() { 00152 return state; 00153 } 00154 00155 final int nonfairTryAcquireShared(int acquires) { 00156 for (;;) { 00157 int available = state; 00158 int remaining = available - acquires; 00159 if (remaining < 0 || 00160 Atomic.compareAndSet32(&state_, available, remaining)) 00161 return remaining; 00162 } 00163 } 00164 00165 protected final bool tryReleaseShared(int releases) { 00166 for (;;) { 00167 int p = state; 00168 if (Atomic.compareAndSet32(&state_, p, p + releases)) 00169 return true; 00170 } 00171 } 00172 00173 final void reducePermits(int reductions) { 00174 for (;;) { 00175 int current = state; 00176 if (Atomic.compareAndSet32(&state_, current, current - reductions)) 00177 return; 00178 } 00179 } 00180 00181 final int drainPermits() { 00182 for (;;) { 00183 int current = state; 00184 if (current == 0 || Atomic.compareAndSet32(&state_, current, 0)) 00185 return current; 00186 } 00187 } 00188 } 00189 00193 final class NonfairSync : Sync { 00194 this(int permits) { 00195 super(permits); 00196 } 00197 00198 protected int tryAcquireShared(int acquires) { 00199 return nonfairTryAcquireShared(acquires); 00200 } 00201 } 00202 00206 final class FairSync : Sync { 00207 this(int permits) { 00208 super(permits); 00209 } 00210 00211 protected int tryAcquireShared(int acquires) { 00212 Thread current = Thread.getThis(); 00213 for (;;) { 00214 Thread first = getFirstQueuedThread(); 00215 if (first !is null && first !is current) 00216 return -1; 00217 int available = state; 00218 int remaining = available - acquires; 00219 if (remaining < 0 || 00220 Atomic.compareAndSet32(&state_, available, remaining)) 00221 return remaining; 00222 } 00223 } 00224 } 00225 00237 this(int permits, bool fair = false) { 00238 if (fair) 00239 sync = new FairSync(permits); 00240 else 00241 sync = new NonfairSync(permits); 00242 } 00243 00260 void acquire(int permits = 1) { 00261 if (permits < 0) 00262 throw new IllegalArgumentException(); 00263 sync.acquireShared(permits); 00264 } 00265 00291 bool tryAcquire(int permits = 1) { 00292 if (permits < 0) 00293 throw new IllegalArgumentException(); 00294 return sync.nonfairTryAcquireShared(permits) >= 0; 00295 } 00296 00329 bool tryAcquire(long timeout, TimeUnit unit, int permits = 1) { 00330 if (permits < 0) 00331 throw new IllegalArgumentException(); 00332 return sync.tryAcquireSharedNanos(permits, toNanos(timeout,unit)); 00333 } 00334 00359 void release(int permits = 1) { 00360 if (permits < 0) 00361 throw new IllegalArgumentException(); 00362 sync.releaseShared(permits); 00363 } 00364 00370 int availablePermits() { 00371 return sync.getPermits(); 00372 } 00373 00378 int drainPermits() { 00379 return sync.drainPermits(); 00380 } 00381 00390 protected void reducePermits(int reduction) { 00391 if (reduction < 0) 00392 throw new IllegalArgumentException(); 00393 sync.reducePermits(reduction); 00394 } 00395 00400 bool isFair() { 00401 return (cast(FairSync)sync) !is null; 00402 } 00403 00414 final bool hasQueuedThreads() { 00415 return sync.hasQueuedThreads(); 00416 } 00417 00427 final int getQueueLength() { 00428 return sync.getQueueLength(); 00429 } 00430 00441 protected Thread[] getQueuedThreads() { 00442 return sync.getQueuedThreads(); 00443 } 00444 00452 char[] toString() { 00453 char[16] buf; 00454 return super.toString() ~ "[Permits = " ~ 00455 itoa(buf, sync.getPermits()) ~ "]"; 00456 } 00457 00458 unittest { 00459 Semaphore sem = new Semaphore(2); 00460 int done = 0; 00461 Thread[] t = new Thread[6]; 00462 00463 ThreadReturn f() { 00464 int n; 00465 Thread tt = Thread.getThis(); 00466 for (n=0; n < t.length; n++) { 00467 if (tt is t[n]) 00468 break; 00469 } 00470 sleepNanos(toNanos(10,TimeUnit.MilliSeconds)); 00471 version (LocksVerboseUnittest) 00472 printf(" thread %d started\n",n); 00473 sem.acquire(); 00474 sleepNanos(toNanos(10,TimeUnit.MilliSeconds)); 00475 version (LocksVerboseUnittest) { 00476 printf(" thread %d aquired\n",n); 00477 printf(" thread %d terminating\n",n); 00478 } 00479 return 0; 00480 } 00481 00482 ThreadReturn f2() { 00483 int n; 00484 Thread tt = Thread.getThis(); 00485 for (n=0; n < t.length; n++) { 00486 if (tt is t[n]) 00487 break; 00488 } 00489 version (LocksVerboseUnittest) 00490 printf(" thread %d releasing\n",n); 00491 sleepNanos(toNanos(10,TimeUnit.MilliSeconds)); 00492 sem.release(); 00493 version (LocksVerboseUnittest) 00494 printf(" thread %d terminating\n",n); 00495 sleepNanos(toNanos(10,TimeUnit.MilliSeconds)); 00496 return 0; 00497 } 00498 int n; 00499 for (n=0; n<t.length/2; n++) { 00500 t[n] = new Thread(&f); 00501 } 00502 for (; n<t.length; n++) { 00503 t[n] = new Thread(&f2); 00504 } 00505 version (LocksVerboseUnittest) 00506 printf("starting locks.semaphore unittest\n"); 00507 for (n=0; n<t.length/2; n++) { 00508 t[n].start(); 00509 } 00510 Thread.yield(); 00511 for (; n<t.length; n++) { 00512 t[n].start(); 00513 Thread.yield(); 00514 } 00515 00516 foreach(int n, Thread thread; t) 00517 { 00518 version (LocksVerboseUnittest) 00519 printf(" waiting on %d\n", n); 00520 version (Ares) 00521 thread.join(); 00522 else 00523 thread.wait(); 00524 } 00525 00526 version (LocksVerboseUnittest) 00527 printf("finished locks.semaphore unittest\n"); 00528 delete sem; 00529 t[] = null; 00530 } 00531 }