Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

CyclicBarrier.d

Go to the documentation of this file.
00001 
00015 module mango.locks.CyclicBarrier;
00016 
00017 class Error : Object
00018 {
00019         private char[] msg;
00020 
00021         this (char[] msg)
00022         {
00023                 this.msg = msg;
00024         }
00025 
00026         char[] toString()
00027         {
00028                 return msg;
00029         }
00030 }
00031 
00032 private {
00033   import std.thread;
00034   import mango.locks.Utils;
00035   import mango.locks.Exceptions;
00036   import mango.locks.ReentrantLock;
00037   import mango.locks.TimeUnit;
00038 }
00039 
00111 class CyclicBarrier {
00112 
00114   private ReentrantLock lock;
00116   private Condition trip;
00118   private int parties_;
00119   /* The command to run when tripped */
00120   private int delegate() barrierCommand;
00121 
00126   private long generation; 
00127 
00131   private bool broken; 
00132 
00137   private int count; 
00138 
00142   private void nextGeneration() {
00143     count = parties_;
00144     ++generation;
00145     trip.notifyAll();
00146   }
00147 
00151   private void breakBarrier() {
00152     broken = true;
00153     trip.notifyAll();
00154   }
00155 
00159   private int dowait(bool timed, long nanos) {
00160     ReentrantLock lock = this.lock;
00161     lock.lock();
00162     try {
00163       int index = --count;
00164       long g = generation;
00165 
00166       if (broken) 
00167         throw new BrokenBarrierException();
00168 
00169       //            if (Thread.interrupted()) {
00170       //                breakBarrier();
00171       //                throw new InterruptedException();
00172       //            }
00173 
00174       if (index == 0) {  // tripped
00175         nextGeneration();
00176         bool ranAction = false;
00177         try {
00178           int delegate() command = barrierCommand;
00179           if (command !is null) 
00180             command();
00181           ranAction = true;
00182           return 0;
00183         } finally {
00184           if (!ranAction)
00185             breakBarrier();
00186         }
00187       }
00188 
00189       for (;;) {
00190         try {
00191           if (!timed) 
00192             trip.wait();
00193           //                    else if (nanos > 0L)
00194           //                        nanos = trip.waitNanos(nanos);
00195         } catch (WaitException ie) {
00196           breakBarrier();
00197           throw ie;
00198         }
00199                 
00200         if (broken || 
00201             g > generation) // true if a reset occurred while waiting
00202           throw new BrokenBarrierException();
00203 
00204         if (g < generation)
00205           return index;
00206 
00207         if (timed && nanos <= 0L) {
00208           breakBarrier();
00209           throw new TimeoutException();
00210         }
00211       }
00212 
00213     } finally {
00214       lock.unlock();
00215     }
00216   }
00217 
00218 
00230   this(int parties, int delegate() barrierAction = null) {
00231     if (parties <= 0)
00232       throw new IllegalArgumentException();
00233     this.lock = new ReentrantLock();
00234     this.trip = lock.newCondition();
00235     this.parties_ = parties; 
00236     this.count = parties;
00237     this.barrierCommand = barrierAction;
00238   }
00239 
00244   int parties() {
00245     return parties_;
00246   }
00247 
00276   int wait() {
00277     try {
00278       return dowait(false, 0L);
00279     } catch (TimeoutException toe) {
00280       throw new Error("Timeout"); // cannot happen;
00281     }
00282   }
00283 
00315   int wait(long timeout, TimeUnit unit) {
00316     return dowait(true, toNanos(timeout,unit));
00317   }
00318 
00326   bool isBroken() {
00327     ReentrantLock lock = this.lock;
00328     lock.lock();
00329     try {
00330       return broken;
00331     } finally {
00332       lock.unlock();
00333     }
00334   }
00335 
00345   void reset() {
00346     ReentrantLock lock = this.lock;
00347     lock.lock();
00348     try {
00349       /*
00350        * Retract generation number enough to cover threads
00351        * currently waiting on current and still resuming from
00352        * previous generation, plus similarly accommodating spans
00353        * after the reset.
00354        */
00355       generation -= 4;
00356       broken = false;
00357       trip.notifyAll();
00358     } finally {
00359       lock.unlock();
00360     }
00361   }
00362 
00369   int getNumberWaiting() {
00370     ReentrantLock lock = this.lock;
00371     lock.lock();
00372     try {
00373       return parties_ - count;
00374     } finally {
00375       lock.unlock();
00376     }
00377   }
00378 
00379   unittest {
00380     double Solver(float[][] matrix) {
00381       int N = matrix.length;
00382       float[] totals;
00383       totals.length = N;
00384       float grand_total = 0;
00385       bool done = false;
00386       // nested function to run after barrier is triggered
00387       int mergeRows() {
00388         for (int k=0;k<N;k++)
00389           grand_total += totals[k];
00390         done = true;
00391         return 0;
00392       }
00393       CyclicBarrier barrier = new CyclicBarrier(N,&mergeRows);
00394       bool gotRow = false;
00395       int i;
00396       // nested function to run in each thread
00397       ThreadReturn workerFcn() {
00398         int myRow;
00399         myRow = i;
00400         gotRow = true;
00401         for (int k=0;k<N;k++) {
00402           totals[myRow] += matrix[myRow][k];
00403         }
00404 
00405         try {
00406           barrier.wait(); 
00407         }
00408         catch (WaitException ex) { return 0; }
00409         catch (BrokenBarrierException ex) { return 0; }
00410         return 0;
00411       }
00412       for (i = 0; i < N; ++i) {
00413         //    for (int k=1;k<1000;k++) Thread.yield();
00414         gotRow = false;
00415         (new Thread(&workerFcn)).start();
00416         while (!gotRow) Thread.yield();
00417         //    for (int k=1;k<1000;k++) Thread.yield();
00418       }
00419       while (!done) Thread.yield();
00420       return grand_total;
00421     }
00422 
00423     version (LocksVerboseUnittest)
00424       printf("started locks.cyclicbarrier unittest\n");
00425     float[][] mat;
00426     mat.length = 3;
00427     for (int k=0;k<3;k++)
00428       mat[k].length = 3;
00429     for (int j=0;j<3;j++) {
00430       for (int k=0;k<3;k++)
00431         mat[j][k] = j+k;
00432     }
00433     assert(18 == Solver(mat));
00434     version (LocksVerboseUnittest)
00435       printf("finished locks.cyclicbarrier unittest\n");
00436   }
00437 }
00438 
00439 

Generated on Fri Nov 11 18:44:19 2005 for Mango by  doxygen 1.4.0