00001
00015 module mango.locks.CyclicBarrier;
00016
/**
 * Minimal error object that carries a text message.
 *
 * Within this module it is thrown by CyclicBarrier.wait() when the untimed
 * wait path reports a timeout.
 */
class Error : Object
{
    /// Message supplied at construction time.
    private char[] msg;

    /**
     * Creates an error carrying the given message.
     *
     * Params:
     *     msg = text later returned by toString(); the array is stored
     *           as-is (no copy is made)
     */
    this (char[] msg)
    {
        this.msg = msg;
    }

    /**
     * Returns: the message passed to the constructor.
     */
    char[] toString()
    {
        return this.msg;
    }
}
00031
00032 private {
00033 import std.thread;
00034 import mango.locks.Utils;
00035 import mango.locks.Exceptions;
00036 import mango.locks.ReentrantLock;
00037 import mango.locks.TimeUnit;
00038 }
00039
/**
 * A reusable synchronization point for a fixed number of threads ("parties").
 *
 * Each party calls wait(); callers block until the last party arrives, at
 * which point the optional barrier command is run by the arriving thread and
 * a new generation starts so the barrier can be used again.
 *
 * All mutable state (count, generation, broken) is read and written while
 * holding `lock`.
 */
class CyclicBarrier {

    /// Lock guarding every field below.
    private ReentrantLock lock;

    /// Condition (created from `lock`) that waiting parties block on.
    private Condition trip;

    /// Number of parties required to trip the barrier.
    private int parties_;

    /// Optional command run by the last party to arrive; may be null.
    private int delegate() barrierCommand;

    /**
     * Generation counter. Incremented each time the barrier trips and moved
     * backwards by reset(); dowait() compares the value it saw on entry with
     * the current value to tell "tripped" from "reset".
     */
    private long generation;

    /// True once the barrier has been broken; cleared by reset().
    private bool broken;

    /// Parties still to arrive in the current generation (parties_ down to 0).
    private int count;

    /**
     * Starts a new generation: restores the arrival count, advances the
     * generation number and wakes all waiters. Called with `lock` held.
     */
    private void nextGeneration() {
        count = parties_;
        ++generation;
        trip.notifyAll();
    }

    /**
     * Marks the barrier as broken and wakes all waiters so they can observe
     * the broken flag. Called with `lock` held.
     */
    private void breakBarrier() {
        broken = true;
        trip.notifyAll();
    }

    /**
     * Shared implementation of both wait() overloads.
     *
     * Params:
     *     timed = true when the caller supplied a timeout
     *     nanos = remaining timeout in nanoseconds (only read when timed)
     *
     * Returns: the arrival index of the calling thread: parties_ - 1 for the
     *          first arrival down to 0 for the last one.
     *
     * Throws: BrokenBarrierException if the barrier is broken on entry,
     *         becomes broken while waiting, or is reset while waiting;
     *         TimeoutException if timed and nanos <= 0 after a wake-up check;
     *         WaitException rethrown from the condition wait (the barrier is
     *         broken first).
     */
    private int dowait(bool timed, long nanos) {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int index = --count;
            long g = generation;

            if (broken)
                throw new BrokenBarrierException();

            if (index == 0) {
                // Last party: open the next generation, then run the command.
                nextGeneration();
                bool ranAction = false;
                try {
                    int delegate() command = barrierCommand;
                    if (command !is null)
                        command();
                    ranAction = true;
                    return 0;
                } finally {
                    // The command threw: break the barrier for everyone else.
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {
                    if (!timed)
                        trip.wait();
                    // NOTE(review): when `timed` is true no wait primitive is
                    // invoked here and `nanos` is never decremented in this
                    // block, so a timed call with nanos > 0 keeps re-checking
                    // the state below without ever releasing `lock`. A timed
                    // condition wait appears to be missing; the Condition API
                    // is not visible from this file - confirm and add it.
                } catch (WaitException ie) {
                    breakBarrier();
                    throw ie;
                }

                // g > generation means reset() moved the generation backwards
                // while this thread was waiting.
                if (broken ||
                    g > generation)
                    throw new BrokenBarrierException();

                // The barrier tripped for the generation this thread joined.
                if (g < generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }

        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates a barrier for the given number of parties.
     *
     * Params:
     *     parties       = number of threads that must call wait() before the
     *                     barrier trips; must be positive
     *     barrierAction = optional command executed by the last thread to
     *                     arrive, each time the barrier trips
     *
     * Throws: IllegalArgumentException if parties <= 0.
     */
    this(int parties, int delegate() barrierAction = null) {
        if (parties <= 0)
            throw new IllegalArgumentException();
        this.lock = new ReentrantLock();
        this.trip = lock.newCondition();
        this.parties_ = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    /**
     * Returns: the number of parties required to trip this barrier.
     */
    int parties() {
        return parties_;
    }

    /**
     * Waits, without a timeout, until all parties have called wait().
     *
     * Returns: the arrival index of the calling thread (parties() - 1 for the
     *          first arrival, 0 for the last).
     *
     * Throws: BrokenBarrierException or WaitException as described on
     *         dowait(); an Error("Timeout") if dowait() unexpectedly reports
     *         a timeout on this untimed path.
     */
    int wait() {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            // The untimed path never takes the timeout branch in dowait().
            throw new Error("Timeout");
        }
    }

    /**
     * Timed variant of wait(): the timeout is converted to nanoseconds with
     * toNanos() and passed to dowait().
     *
     * Params:
     *     timeout = maximum time to wait, expressed in `unit`
     *     unit    = unit of `timeout`
     *
     * Returns: the arrival index of the calling thread.
     *
     * Throws: TimeoutException, BrokenBarrierException or WaitException as
     *         described on dowait().
     */
    int wait(long timeout, TimeUnit unit) {
        return dowait(true, toNanos(timeout,unit));
    }

    /**
     * Returns: true if the barrier is currently marked broken.
     */
    bool isBroken() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return broken;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Resets the barrier to its initial state. Threads currently blocked in
     * wait() are woken and leave with a BrokenBarrierException.
     */
    void reset() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // Move the generation backwards so that every thread that
            // recorded an earlier value sees g > generation in dowait() and
            // bails out. NOTE(review): the margin of 4 is inherited; its
            // exact justification is not derivable from this file.
            generation -= 4;
            // Restore the arrival count. Without this, parties that had
            // already arrived (and are now being released with an exception)
            // would still be counted, so the next cycle would trip early and
            // getNumberWaiting() would report waiters that no longer exist.
            count = parties_;
            broken = false;
            trip.notifyAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Returns: the number of parties that have arrived in the current
     *          generation, computed as parties_ - count.
     */
    int getNumberWaiting() {
        ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties_ - count;
        } finally {
            lock.unlock();
        }
    }

    unittest {
        // Sums every row of a square matrix on its own thread; the barrier
        // command adds the row totals together once all rows are done.
        double Solver(float[][] matrix) {
            int N = matrix.length;
            float[] totals;
            totals.length = N;
            // float elements default to NaN in D; the workers accumulate
            // with +=, so the totals must start from zero.
            totals[] = 0;
            float grand_total = 0;
            bool done = false;

            int mergeRows() {
                for (int k=0;k<N;k++)
                    grand_total += totals[k];
                done = true;
                return 0;
            }
            CyclicBarrier barrier = new CyclicBarrier(N,&mergeRows);
            bool gotRow = false;
            int i;

            ThreadReturn workerFcn() {
                int myRow;
                // Capture the row index before the spawning loop advances i.
                myRow = i;
                gotRow = true;
                for (int k=0;k<N;k++) {
                    totals[myRow] += matrix[myRow][k];
                }

                try {
                    barrier.wait();
                }
                catch (WaitException ex) { return 0; }
                catch (BrokenBarrierException ex) { return 0; }
                return 0;
            }
            for (i = 0; i < N; ++i) {
                gotRow = false;
                (new Thread(&workerFcn)).start();
                // Hand-off: wait until the worker has copied i into myRow.
                while (!gotRow) Thread.yield();
            }
            while (!done) Thread.yield();
            return grand_total;
        }

        version (LocksVerboseUnittest)
            printf("started locks.cyclicbarrier unittest\n");
        float[][] mat;
        mat.length = 3;
        for (int k=0;k<3;k++)
            mat[k].length = 3;
        for (int j=0;j<3;j++) {
            for (int k=0;k<3;k++)
                mat[j][k] = j+k;
        }
        assert(18 == Solver(mat));
        version (LocksVerboseUnittest)
            printf("finished locks.cyclicbarrier unittest\n");
    }
}
00438
00439