Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

Exchanger.d

Go to the documentation of this file.
00001 
00014 module mango.locks.Exchanger;
00015 private {
00016   import mango.locks.Utils;
00017   import mango.locks.TimeUnit;
00018   import mango.locks.Exceptions;
00019   import mango.locks.ReentrantLock;
00020 
00021   // for unittest
00022   import std.thread;
00023   import mango.locks.Countdown;
00024 }
00025 
00066 class Exchanger(Value) {
        // A rendezvous point for pairs of threads: each thread hands in a Value
        // through exchange() and gets back the Value handed in by the thread it
        // was paired with.

        // Guards item and arrivalCount; every access happens inside doExchange
        // while this lock is held.
00067   private ReentrantLock lock;
        // Condition created from lock (see the constructor). Two kinds of waiters
        // share it: threads waiting for the previous pair to finish, and the
        // first arriver waiting for its partner.
00068   private Condition taken;
00069 
        // Hand-over slot: holds the first arriver's value until the second
        // arriver swaps its own value in.
00071   private Value item;
00072     
        // Number of threads that have joined the exchange in progress:
        // 0 = idle, 1 = first arriver waiting, 2 = paired but not yet reset.
00077   private int arrivalCount;
00078 
        // Shared implementation of both exchange() overloads.
        //
        // Params:
        //   x     = the value this thread offers
        //   timed = false to wait indefinitely, true to honour nanos
        //   nanos = wait budget in nanoseconds; only consulted when timed is true
        // Returns: the value offered by the partner thread.
        // Throws: TimeoutException when timed is true and the budget is used up
        //         either before the previous pair has finished or before a
        //         partner arrives.
00082   private Value doExchange(Value x, bool timed, long nanos) {
00083     lock.lock();
          // The finally clause below releases the lock on every exit path,
          // including the TimeoutException throws.
00084     try {
00085       Value other;
00086 
00087       // If arrival count already at two, we must wait for
00088       // a previous pair to finish and reset the count;
00089       while (arrivalCount == 2) {
00090         if (!timed)
00091           taken.wait();
00092         else if (nanos > 0) 
                // presumably waitNanos returns the budget still left -- confirm
                // against Condition
00093           nanos = taken.waitNanos(nanos);
00094         else 
00095           throw new TimeoutException();
00096       }
00097 
00098       int count = ++arrivalCount;
00099 
00100       // If item is already waiting, replace it and signal other thread
00101       if (count == 2) { 
00102         other = item;
00103         item = x;
              // Wake every waiter, not just one: threads blocked in the loop
              // above wait on this same condition, and a single-waiter notify
              // could be consumed by one of them (it would see arrivalCount == 2
              // and go back to sleep), leaving the first arriver asleep forever.
              // NOTE(review): relies on Condition providing notifyAll() -- confirm.
00104         taken.notifyAll();
00105         return other;
00106       }
00107 
00108       // Otherwise, set item and wait for another thread to
00109       // replace it and signal us.
00110 
00111       item = x;
00112       while (arrivalCount != 2) {
00113         if (!timed)
00114           taken.wait();
00115         else if (nanos > 0) 
00116           nanos = taken.waitNanos(nanos);
00117         else 
00118           break; // timed out
00119       }
00120 
00121       // Get and reset item and count after the wait.
00122       // (We need to do this even if wait was aborted.)
00123       other = item;
00124       item = Value.init;
00125       count = arrivalCount;
00126       arrivalCount = 0; 
            // The slot is free again. Wake all threads parked in the first loop so
            // that two of them can pair up immediately; waking only one would
            // leave a potential partner asleep until some later reset.
00127       taken.notifyAll();
00128             
00129       // If the other thread replaced item, then we must
00130       // continue even if cancelled.
00131       if (count == 2) {
00132         return other;
00133       }
00134 
00135       // If no one is waiting for us, we can back out
00136       throw new TimeoutException();
00137     } finally {
00138       lock.unlock();
00139     }
00140   }
00141 
        // Creates an idle exchanger: a fresh lock plus the condition bound to it.
00145   this() {
00146     lock = new ReentrantLock();
00147     taken = lock.newCondition();
00148   }
00149 
        // Offers x and waits, without a time limit, until another thread calls
        // exchange() on this object; returns that thread's value.
00166   Value exchange(Value x) {
00167     return doExchange(x, false, 0);
00168   }
00169 
        // Timed variant: offers x and waits at most the given time for a partner.
        // The timeout is passed through toNanos(timeout, unit), presumably a
        // conversion to nanoseconds (helper defined outside this file -- confirm).
        // Throws: TimeoutException when no exchange completes within the budget.
00197   Value exchange(Value x, long timeout, TimeUnit unit) {
00198     return doExchange(x, true, toNanos(timeout,unit));
00199   }
00200 
00201 }
00202 
00203 unittest {
        // Smoke test: two threads meet at one Exchanger!(int) and each must
        // receive the value the other one offered.
00204   version (LocksVerboseUnittest)
00205     printf("starting locks.exchanger unittest\n");
00206   Exchanger!(int) ex = new Exchanger!(int);
        // Counted down once by each worker; the main thread waits on it below.
00207   CountDownLatch done = new CountDownLatch(2);
00208 
        // Worker 1 offers 10 and expects to get 20 back.
00209   Thread t1 = new Thread( delegate ThreadReturn() {
00210     int my_val = 10;
          // The exchange is kept as its own statement (not folded into the
          // assert) so it still runs when asserts are compiled out.
00211     my_val = ex.exchange(my_val);
00212     assert( my_val == 20 );
00213     done.countDown();
00214     return 0;
00215   });
        // Worker 2 offers 20 and expects to get 10 back.
00216   Thread t2 = new Thread( delegate ThreadReturn() {
00217     int my_val = 20;
00218     my_val = ex.exchange(my_val);
00219     assert( my_val == 10 );
00220     done.countDown();
00221     return 0;
00222   });
00223   t1.start();
00224   t2.start();
        // Block until both workers have counted down.
        // NOTE(review): a failing assert in a worker skips its countDown(), so
        // this wait would presumably never return -- confirm.
00225   done.wait();
00226   version (LocksVerboseUnittest)
00227     printf("finished locks.exchanger unittest\n");
00228 }
00229 
00230 

Generated on Mon Nov 14 10:59:37 2005 for Mango by  doxygen 1.4.0