00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 module mango.cluster.qos.socket.ClusterCache;
00037
00038 private import std.thread;
00039
00040 private import mango.base.System;
00041
00042 private import mango.utils.Random;
00043
00044 private import mango.cache.HashMap,
00045 mango.cache.Payload,
00046 mango.cache.QueuedCache;
00047
00048 private import mango.cache.model.IPayload;
00049
00050 private import mango.cluster.CacheInvalidatee;
00051
00052 private import mango.cluster.qos.socket.Cluster;
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062 class ClusterCache
00063 {
00064 private Cluster cluster;
00065 private HashMap cacheSet;
00066 private uint defaultSize;
00067
00068
00069
00070
00071
00072 this (Cluster cluster, uint defaultSize)
00073 {
00074 this.cacheSet = new HashMap (256);
00075 this.defaultSize = defaultSize;
00076 this.cluster = cluster;
00077 }
00078
00079
00080
00081
00082
00083 private final ChannelCache lookup (char[] channel)
00084 {
00085 return cast(ChannelCache) cacheSet.get (channel);
00086 }
00087
00088
00089
00090
00091
00092 ChannelCache addCache (char[] channel, uint size)
00093 {
00094
00095 ChannelCache cache = new ChannelCache (cluster, channel, size);
00096
00097
00098 cacheSet.put (channel, cache);
00099
00100 return cache;
00101 }
00102
00103
00104
00105
00106
00107 void put (char[] channel, char[] element, ClusterContent content)
00108 {
00109 ChannelCache cache = lookup (channel);
00110
00111 if (cache is null)
00112 cache = addCache (channel.dup, defaultSize);
00113
00114 cache.put (element, content);
00115 }
00116
00117
00118
00119
00120
00121 ClusterContent extract (char[] channel, char[] element)
00122 {
00123 ChannelCache cache = lookup (channel);
00124
00125 if (cache)
00126 return cache.extract (element);
00127
00128 return null;
00129 }
00130
00131
00132
00133
00134
00135 ClusterContent get (char[] channel, char[] element)
00136 {
00137 ChannelCache cache = lookup (channel);
00138
00139 if (cache)
00140 return cache.get (element);
00141
00142 return null;
00143 }
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153 bool lockWhereInvalid (char[] channel, char[] element, long time)
00154 {
00155 ChannelCache cache = lookup (channel);
00156
00157
00158 if (cache is null)
00159 cache = addCache (channel.dup, defaultSize);
00160
00161 return cache.lockWhereInvalid (element, time);
00162 }
00163
00164
00165
00166
00167
00168 void unlock (char[] channel, char[] element)
00169 {
00170 ChannelCache cache = lookup (channel);
00171
00172 if (cache)
00173 cache.unlock (element);
00174 }
00175 }
00176
00177
00178
00179
00180
00181
00182 class ClusterQueue : Thread
00183 {
00184 private uint size,
00185 used,
00186 sleep;
00187 private ILogger logger;
00188 private Cluster cluster;
00189 private HashMap queueSet;
00190
00191
00192
00193
00194
00195 this (Cluster cluster, uint size, uint sleep)
00196 {
00197 queueSet = new HashMap (256);
00198 logger = cluster.getLogger ();
00199 this.cluster = cluster;
00200 this.sleep = sleep;
00201 this.size = size;
00202
00203 start ();
00204 }
00205
00206
00207
00208
00209
00210 private final ChannelQueue lookup (char[] channel)
00211 {
00212 return cast(ChannelQueue) queueSet.get (channel);
00213 }
00214
00215
00216
00217
00218
00219 bool put (char[] name, ClusterContent content)
00220 {
00221 if ((used + content.length) < size)
00222 {
00223
00224 ChannelQueue queue = lookup (name);
00225 if (queue is null)
00226 {
00227
00228 name = name.dup;
00229
00230 logger.trace ("creating new queue for channel '" ~ name ~ "'");
00231
00232
00233 queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
00234 }
00235
00236 queue.put (content);
00237 used += content.length;
00238 return true;
00239 }
00240 return false;
00241 }
00242
00243
00244
00245
00246
00247 synchronized ClusterContent get (char[] name)
00248 {
00249 ClusterContent ret = null;
00250 ChannelQueue queue = lookup (name);
00251
00252 if (queue)
00253 {
00254
00255
00256 ret = queue.get ();
00257 used -= ret.length;
00258 }
00259 return ret;
00260 }
00261
00262
00263
00264
00265
00266 override int run ()
00267 {
00268 while (true)
00269 {
00270 System.sleep (Random.get (sleep));
00271
00272 try {
00273 foreach (char[] k, Object o; queueSet)
00274 {
00275 ChannelQueue q = cast(ChannelQueue) o;
00276 if (q.count)
00277 {
00278 IChannel c = q.channel;
00279
00280 if (logger.isEnabled (logger.Level.Trace))
00281 logger.trace ("publishing queue channel '" ~
00282 c.getName ~ "'");
00283 cluster.broadcast (c);
00284 }
00285 }
00286 } catch (Object x)
00287 logger.error ("queue-publisher: "~x.toString);
00288 }
00289 return 0;
00290 }
00291 }
00292
00293
00294
00295
00296
00297
00298 private class ChannelCache
00299 {
00300 private HashMap locks;
00301 private QueuedCache cache;
00302
00303
00304
00305
00306
00307 this (Cluster cluster, char[] channel, int cacheSize)
00308 {
00309 cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");
00310
00311
00312
00313
00314 locks = new HashMap (256, 0.75, 1);
00315
00316 cache = new QueuedCache (cacheSize);
00317
00318
00319 new CacheInvalidatee (cluster, channel, cache);
00320 }
00321
00322
00323
00324
00325
00326 void put (char[] key, ClusterContent content)
00327 in {
00328 assert (content);
00329 }
00330 body
00331 {
00332 ClusterEntry entry = ClusterEntry.create ();
00333
00334 entry.setData (content);
00335 cache.put (key, entry);
00336 }
00337
00338
00339
00340
00341
00342 ClusterContent extract (char[] key)
00343 {
00344 IPayload p = cache.extract (key);
00345
00346 if (p)
00347 {
00348 ClusterEntry entry = cast(ClusterEntry) p;
00349 return entry.getData;
00350 }
00351 return null;
00352 }
00353
00354
00355
00356
00357
00358 ClusterContent get (char[] key)
00359 {
00360 IPayload p = cache.get (key);
00361
00362 if (p)
00363 {
00364 ClusterEntry entry = cast(ClusterEntry) p;
00365 return entry.getData;
00366 }
00367 return null;
00368 }
00369
00370
00371
00372
00373
00374
00375
00376
00377
00378 bool lockWhereInvalid (char[] key, long time)
00379 {
00380 IPayload p = cache.get (key);
00381 if (p)
00382 {
00383 ClusterEntry entry = cast(ClusterEntry) p;
00384 if (entry.getTime () > time)
00385 return false;
00386 }
00387
00388 if (locks.get (key))
00389 return false;
00390
00391
00392 locks.put (key, this);
00393 return true;
00394 }
00395
00396
00397
00398
00399
00400 void unlock (char[] key)
00401 {
00402 locks.remove (key);
00403 }
00404 }
00405
00406
00407
00408
00409
00410
00411 private class ClusterEntry : Payload
00412 {
00413 private ClusterContent data;
00414 private ClusterEntry next;
00415 private static ClusterEntry freelist;
00416
00417
00418
00419
00420
00421 void setData (ClusterContent data)
00422 {
00423 this.data = data;
00424 }
00425
00426
00427
00428
00429
00430 ClusterContent getData ()
00431 {
00432 return data;
00433 }
00434
00435
00436
00437
00438
00439
00440
00441 static ClusterEntry create ()
00442 {
00443 ClusterEntry s;
00444
00445 if (freelist)
00446 {
00447 s = freelist;
00448 freelist = s.next;
00449 }
00450 else
00451 s = new ClusterEntry;
00452 return s;
00453 }
00454
00455
00456
00457
00458
00459
00460
00461 void destroy ()
00462 {
00463 data = null;
00464 next = freelist;
00465 freelist = this;
00466 }
00467 }
00468
00469
00470
00471
00472
00473
00474 private class ChannelQueue
00475 {
00476 private Link head,
00477 tail;
00478 private int count;
00479 IChannel channel;
00480
00481
00482
00483
00484
00485 private class Link
00486 {
00487 Link prev,
00488 next;
00489 ClusterContent data;
00490
00491 static Link freeList;
00492
00493
00494
00495
00496
00497 Link append (Link after)
00498 {
00499 if (after)
00500 {
00501 next = after.next;
00502
00503
00504 if (next)
00505 next.prev = this;
00506
00507
00508 prev = after;
00509 after.next = this;
00510 }
00511 return this;
00512 }
00513
00514
00515
00516
00517
00518 Link unlink ()
00519 {
00520
00521 if (prev)
00522 prev.next = next;
00523
00524 if (next)
00525 next.prev = prev;
00526
00527
00528 next = prev = null;
00529 return this;
00530 }
00531
00532
00533
00534
00535
00536 Link create ()
00537 {
00538 Link l;
00539
00540 if (freeList)
00541 {
00542 l = freeList;
00543 freeList = l.next;
00544 }
00545 else
00546 l = new Link;
00547 return l;
00548 }
00549
00550
00551
00552
00553
00554 void destroy ()
00555 {
00556 next = freeList;
00557 freeList = this;
00558 this.data = null;
00559 }
00560 }
00561
00562
00563
00564
00565
00566
00567 this (IChannel channel)
00568 {
00569 head = tail = new Link;
00570 this.channel = channel;
00571 }
00572
00573
00574
00575
00576
00577
00578
00579
00580 void put (ClusterContent content)
00581 {
00582 tail.data = content;
00583 tail = tail.create.append (tail);
00584 ++count;
00585 }
00586
00587
00588
00589
00590
00591
00592
00593
00594
00595
00596 ClusterContent get ()
00597 {
00598 if (head !== tail)
00599 {
00600
00601 Link l = head;
00602 head = head.next;
00603 ClusterContent ret = l.data;
00604 l.unlink ();
00605 l.destroy ();
00606 --count;
00607 return ret;
00608 }
00609 return null;
00610 }
00611 }
00612
00613