00001
00014 module mango.locks.Exchanger;
00015 private {
00016 import mango.locks.Utils;
00017 import mango.locks.TimeUnit;
00018 import mango.locks.Exceptions;
00019 import mango.locks.ReentrantLock;
00020
00021
00022 import std.thread;
00023 import mango.locks.Countdown;
00024 }
00025
/**
 * A rendezvous point at which two threads hand each other a value.
 *
 * Each caller of exchange() deposits its own value and receives the value
 * deposited by the thread it was paired with. Callers are paired two at a
 * time; a caller that arrives while a previous pair is still finishing
 * waits until that pair has been cleared.
 *
 * Params:
 *   Value = type of the values being exchanged
 */
class Exchanger(Value) {
    /// Held for the whole of doExchange (released while waiting on taken).
    private ReentrantLock lock;

    /// Condition created from lock; every wait in doExchange blocks on it.
    private Condition taken;

    /// Holds the value deposited by the most recent arriver.
    private Value item;

    /**
     * Number of threads that have joined the exchange in progress:
     * 0 = idle, 1 = first thread waiting for a partner, 2 = both have
     * arrived and the first one has not yet collected its value.
     */
    private int arrivalCount;

    /**
     * Shared implementation behind both exchange() overloads.
     *
     * Params:
     *   x     = value offered by the calling thread
     *   timed = when false the call waits indefinitely and nanos is ignored
     *   nanos = time budget; it is replaced by the result of
     *           taken.waitNanos() after every timed wait (presumably the
     *           time remaining - confirm against Condition.waitNanos)
     *
     * Returns: the value deposited by the partner thread.
     *
     * Throws: TimeoutException when timed is true and the budget runs out
     *         before a partner arrives, or before a previous pair has
     *         been cleared.
     */
    private Value doExchange(Value x, bool timed, long nanos) {
        lock.lock();
        try {
            Value other;

            // A previous pair has met but its first arriver has not reset
            // the count yet: wait for that to happen before joining.
            while (arrivalCount == 2) {
                if (!timed)
                    taken.wait();
                else if (nanos > 0)
                    nanos = taken.waitNanos(nanos);
                else
                    throw new TimeoutException();
            }

            int count = ++arrivalCount;

            // Second arriver: take the waiting value, leave ours in its
            // place and wake the first arriver.
            if (count == 2) {
                other = item;
                item = x;
                // notifyAll rather than notify: threads blocked in the loop
                // above wait on this same condition, so a single wakeup
                // could be delivered to one of them instead of to the first
                // arriver, which would then never wake up. Every waiter
                // re-checks its own predicate, so waking all is safe.
                taken.notifyAll();
                return other;
            }

            // First arriver: deposit our value and wait until a partner
            // replaces it (arrivalCount becomes 2) or the time runs out.
            item = x;
            while (arrivalCount != 2) {
                if (!timed)
                    taken.wait();
                else if (nanos > 0)
                    nanos = taken.waitNanos(nanos);
                else
                    break;      // timed out
            }

            // Collect the slot and reset the state. This is done on the
            // timeout path as well so the exchanger is left idle.
            other = item;
            item = Value.init;
            count = arrivalCount;
            arrivalCount = 0;
            // Wake every thread queued behind this exchange: up to two of
            // them can now proceed (one as first arriver, one as second),
            // and any others simply go back to waiting.
            taken.notifyAll();

            // A partner did arrive: hand back its value even if the time
            // budget ran out in the meantime.
            if (count == 2) {
                return other;
            }

            // Nobody came: the loop above can only have ended by timeout.
            throw new TimeoutException();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Creates an exchanger with its own lock and a condition bound to it.
     */
    this() {
        lock = new ReentrantLock();
        taken = lock.newCondition();
    }

    /**
     * Waits, without a time limit, for another thread to arrive at this
     * exchanger and swaps values with it.
     *
     * Params:
     *   x = value to hand to the other thread
     *
     * Returns: the value handed over by the other thread.
     */
    Value exchange(Value x) {
        return doExchange(x, false, 0);
    }

    /**
     * Waits for another thread to arrive at this exchanger and swaps
     * values with it, giving up once the timeout has elapsed.
     *
     * Params:
     *   x       = value to hand to the other thread
     *   timeout = maximum time to wait, expressed in unit
     *   unit    = unit of timeout; converted with toNanos()
     *
     * Returns: the value handed over by the other thread.
     *
     * Throws: TimeoutException if no exchange completed in time.
     */
    Value exchange(Value x, long timeout, TimeUnit unit) {
        return doExchange(x, true, toNanos(timeout,unit));
    }

}
00202
// Two threads exchange 10 and 20 through one Exchanger; each must end up
// with the other's value.
unittest {
    version (LocksVerboseUnittest)
        printf("starting locks.exchanger unittest\n");
    Exchanger!(int) ex = new Exchanger!(int);
    CountDownLatch done = new CountDownLatch(2);

    // Values received by the workers. They are checked on the main thread
    // after the latch opens: an assert failing inside a worker would skip
    // countDown() and leave done.wait() blocked forever.
    // NOTE(review): relies on the latch making the workers' writes visible
    // to the waiting thread - confirm against CountDownLatch.
    int got1 = -1;
    int got2 = -1;

    Thread t1 = new Thread( delegate ThreadReturn() {
        try {
            got1 = ex.exchange(10);
        } finally {
            // Always release the latch, even if exchange() throws.
            done.countDown();
        }
        return 0;
    });
    Thread t2 = new Thread( delegate ThreadReturn() {
        try {
            got2 = ex.exchange(20);
        } finally {
            // Always release the latch, even if exchange() throws.
            done.countDown();
        }
        return 0;
    });
    t1.start();
    t2.start();
    done.wait();

    // A worker that failed leaves its slot at -1 and trips these asserts.
    assert( got1 == 20 );
    assert( got2 == 10 );
    version (LocksVerboseUnittest)
        printf("finished locks.exchanger unittest\n");
}
00229
00230