00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 module mango.cluster.qos.socket.ClusterCache;
00040
00041 private import std.thread;
00042
00043 private import mango.base.System;
00044
00045 private import mango.utils.Random;
00046
00047 private import mango.cache.HashMap,
00048 mango.cache.Payload,
00049 mango.cache.QueuedCache;
00050
00051 private import mango.cache.model.IPayload;
00052
00053 private import mango.cluster.CacheInvalidatee;
00054
00055 private import mango.cluster.qos.socket.Cluster;
00056
00057
00058
00059
00060
00061
00062
00063
00064
/**
 * Registry of per-channel caches, keyed by channel name.
 *
 * Each operation first resolves the ChannelCache for the named channel
 * and then delegates to it. put() and lockWhereInvalid() create the
 * channel cache on demand; the read-style operations do not.
 */
class ClusterCache
{
        private Cluster cluster;        // handed to every ChannelCache created here
        private HashMap cacheSet;       // channel name -> ChannelCache
        private uint    defaultSize;    // size used for caches created on demand

        /**
         * Params:
         *   cluster     = cluster instance passed on to each new ChannelCache
         *   defaultSize = cache size applied when a channel is created implicitly
         */
        this (Cluster cluster, uint defaultSize)
        {
                this.cluster     = cluster;
                this.defaultSize = defaultSize;
                this.cacheSet    = new HashMap (256);
        }

        /**
         * Returns the cache registered for channel, or null when there is none.
         */
        private final ChannelCache lookup (char[] channel)
        {
                return cast(ChannelCache) cacheSet.get (channel);
        }

        /**
         * Returns the cache registered for channel, creating one of
         * defaultSize when none exists. The channel name is duplicated
         * before it is registered, so the caller's buffer is not retained
         * as the map key.
         */
        private final ChannelCache obtain (char[] channel)
        {
                ChannelCache existing = lookup (channel);

                if (existing)
                    return existing;

                return addCache (channel.dup, defaultSize);
        }

        /**
         * Creates a cache of the given size for channel and registers it
         * under that name.
         *
         * The channel array is stored as the map key without being copied.
         *
         * Returns: the newly created cache
         */
        ChannelCache addCache (char[] channel, uint size)
        {
                ChannelCache created = new ChannelCache (cluster, channel, size);

                cacheSet.put (channel, created);
                return created;
        }

        /**
         * Stores content under element within the named channel, creating
         * the channel cache first if it does not exist yet.
         */
        void put (char[] channel, char[] element, ClusterContent content)
        {
                obtain (channel).put (element, content);
        }

        /**
         * Delegates to ChannelCache.extract for the named channel.
         *
         * Returns: the extracted content, or null when the channel is unknown
         *          or holds no such element
         */
        ClusterContent extract (char[] channel, char[] element)
        {
                ChannelCache cache = lookup (channel);

                if (cache is null)
                    return null;

                return cache.extract (element);
        }

        /**
         * Delegates to ChannelCache.get for the named channel.
         *
         * Returns: the cached content, or null when the channel is unknown
         *          or holds no such element
         */
        ClusterContent get (char[] channel, char[] element)
        {
                ChannelCache cache = lookup (channel);

                if (cache is null)
                    return null;

                return cache.get (element);
        }

        /**
         * Delegates to ChannelCache.lockWhereInvalid, creating the channel
         * cache first if it does not exist yet.
         *
         * Returns: whatever the channel cache reports for (element, time)
         */
        bool lockWhereInvalid (char[] channel, char[] element, long time)
        {
                return obtain (channel).lockWhereInvalid (element, time);
        }

        /**
         * Releases the lock held on element within the named channel.
         * Unknown channels are ignored.
         */
        void unlock (char[] channel, char[] element)
        {
                ChannelCache cache = lookup (channel);

                if (cache is null)
                    return;

                cache.unlock (element);
        }
}
00179
00180
00181
00182
00183
00184
/**
 * Set of per-channel FIFO queues with a shared capacity budget, plus a
 * background thread that periodically announces every non-empty queue
 * through cluster.broadcast().
 *
 * put() and get() are both synchronized on this instance: they update
 * the shared `used` counter and the underlying ChannelQueue link lists,
 * so producers and consumers must not interleave.
 */
class ClusterQueue : Thread
{
        private uint    size,           // capacity budget: total content length allowed
                        used,           // content length currently held across all queues
                        sleep;          // argument handed to Random.get() for the publish pause
        private ILogger logger;
        private Cluster cluster;
        private HashMap queueSet;       // channel name -> ChannelQueue

        /**
         * Initialises the queue set and starts the publisher thread.
         *
         * Params:
         *   cluster = supplies the logger, creates channels and broadcasts them
         *   size    = capacity budget compared against the summed content lengths
         *   sleep   = passed to Random.get() to derive each pause between
         *             publishing passes (units not visible here; TODO confirm)
         */
        this (Cluster cluster, uint size, uint sleep)
        {
                queueSet = new HashMap (256);
                logger = cluster.getLogger ();
                this.cluster = cluster;
                this.sleep = sleep;
                this.size = size;

                // every field read by run() is assigned before the thread starts
                start ();
        }

        /**
         * Returns the queue registered for channel, or null when there is none.
         */
        private final ChannelQueue lookup (char[] channel)
        {
                return cast(ChannelQueue) queueSet.get (channel);
        }

        /**
         * Appends content to the queue of the named channel, creating that
         * queue on first use.
         *
         * Synchronized to pair with get(): both methods modify `used` and
         * the queue's link list (including the static Link free list).
         *
         * Returns: true when the content was queued; false when accepting
         *          it would reach or exceed the capacity budget
         */
        synchronized bool put (char[] name, ClusterContent content)
        {
                if ((used + content.length) < size)
                   {
                   ChannelQueue queue = lookup (name);
                   if (queue is null)
                      {
                      // the map keeps the key, so take a private copy of the name
                      name = name.dup;

                      logger.trace ("creating new queue for channel '" ~ name ~ "'");

                      queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
                      }

                   queue.put (content);
                   used += content.length;
                   return true;
                   }
                return false;
        }

        /**
         * Removes and returns the oldest content queued for the named channel.
         *
         * Returns: the content, or null when the channel is unknown or its
         *          queue is empty
         */
        synchronized ClusterContent get (char[] name)
        {
                ClusterContent ret = null;
                ChannelQueue queue = lookup (name);

                if (queue)
                   {
                   ret = queue.get ();

                   // ChannelQueue.get() yields null for an empty queue; only
                   // release budget for content that was actually dequeued
                   if (ret !== null)
                       used -= ret.length;
                   }
                return ret;
        }

        /**
         * Thread body: loops forever, pausing for Random.get(sleep) and then
         * broadcasting the channel of every queue that currently holds
         * content. Anything thrown during a pass is logged and the loop
         * continues.
         */
        override int run ()
        {
                while (true)
                      {
                      System.sleep (Random.get (sleep));

                      try {
                          foreach (char[] k, Object o; queueSet)
                                  {
                                  ChannelQueue q = cast(ChannelQueue) o;
                                  if (q.count)
                                     {
                                     IChannel c = q.channel;

                                     // avoid building the message unless tracing is on
                                     if (logger.isEnabled (logger.Level.Trace))
                                         logger.trace ("publishing queue channel '" ~
                                                        c.getName ~ "'");
                                     cluster.broadcast (c);
                                     }
                                  }
                          } catch (Object x)
                                  {
                                  logger.error ("queue-publisher: "~x.toString);
                                  }
                      }
                return 0;
        }
}
00295
00296
00297
00298
00299
00300
/**
 * Cache for a single channel: a QueuedCache of ClusterEntry payloads
 * plus a map of keys that are currently locked.
 */
private class ChannelCache
{
        private HashMap     locks;      // key -> placeholder object while the key is locked
        private QueuedCache cache;      // key -> ClusterEntry

        /**
         * Logs the creation, builds the entry cache and the lock map, and
         * constructs a CacheInvalidatee over (cluster, channel, cache).
         *
         * The CacheInvalidatee reference is not kept here.
         * NOTE(review): presumably it registers itself with the cluster on
         * construction; confirm against CacheInvalidatee.
         */
        this (Cluster cluster, char[] channel, int cacheSize)
        {
                cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");

                cache = new QueuedCache (cacheSize);
                locks = new HashMap (256, 0.75, 1);

                new CacheInvalidatee (cluster, channel, cache);
        }

        /**
         * Converts a payload obtained from the cache into its content.
         *
         * Returns: the wrapped content, or null when payload is null
         */
        private static ClusterContent unwrap (IPayload payload)
        {
                if (payload is null)
                    return null;

                ClusterEntry entry = cast(ClusterEntry) payload;
                return entry.getData;
        }

        /**
         * Wraps content in a ClusterEntry obtained from ClusterEntry.create()
         * and stores it under key.
         *
         * The in-contract asserts that content is non-null.
         */
        void put (char[] key, ClusterContent content)
        in {
           assert (content);
           }
        body
        {
                ClusterEntry wrapper = ClusterEntry.create ();

                wrapper.setData (content);
                cache.put (key, wrapper);
        }

        /**
         * Returns the content produced by cache.extract (key), or null when
         * that call yields no payload.
         */
        ClusterContent extract (char[] key)
        {
                return unwrap (cache.extract (key));
        }

        /**
         * Returns the content produced by cache.get (key), or null when
         * that call yields no payload.
         */
        ClusterContent get (char[] key)
        {
                return unwrap (cache.get (key));
        }

        /**
         * Tries to lock key, unless the cached entry is newer than time.
         *
         * Returns: false when an entry exists whose getTime() is greater
         *          than time, or when key is already present in the lock
         *          map; otherwise records the lock and returns true
         */
        bool lockWhereInvalid (char[] key, long time)
        {
                IPayload current = cache.get (key);

                // an entry stamped later than the supplied time blocks the lock
                if (current && (cast(ClusterEntry) current).getTime () > time)
                    return false;

                if (locks.get (key) is null)
                   {
                   // the stored value is only a non-null marker
                   locks.put (key, this);
                   return true;
                   }

                return false;
        }

        /**
         * Removes key from the lock map.
         */
        void unlock (char[] key)
        {
                locks.remove (key);
        }
}
00408
00409
00410
00411
00412
00413
/**
 * Payload that carries a ClusterContent, recycled through a static
 * free list shared by all instances.
 *
 * NOTE(review): create() and destroy() touch the static free list
 * without any synchronization in this class; confirm callers serialise
 * access.
 */
private class ClusterEntry : Payload
{
        private ClusterContent      data;       // wrapped content
        private ClusterEntry        next;       // link used only while on the free list
        private static ClusterEntry freelist;   // head of the recycled-entry list

        /**
         * Sets the wrapped content.
         */
        void setData (ClusterContent data)
        {
                this.data = data;
        }

        /**
         * Returns the wrapped content.
         */
        ClusterContent getData ()
        {
                return data;
        }

        /**
         * Returns an entry for reuse: the head of the free list when one is
         * available, otherwise a newly allocated instance.
         */
        static ClusterEntry create ()
        {
                if (freelist is null)
                    return new ClusterEntry;

                ClusterEntry recycled = freelist;
                freelist = recycled.next;
                return recycled;
        }

        /**
         * Drops the content reference and pushes this entry onto the free
         * list so a later create() can hand it out again.
         */
        void destroy ()
        {
                next = freelist;
                freelist = this;
                data = null;
        }
}
00471
00472
00473
00474
00475
00476
/**
 * FIFO queue of ClusterContent for one channel, built as a doubly
 * linked list of Link nodes.
 *
 * The tail is always an empty sentinel: put() fills the current tail
 * and appends a fresh empty link behind it, so the queue is empty
 * exactly when head and tail are the same link.
 */
private class ChannelQueue
{
        private Link    head,           // oldest queued link (or the sentinel when empty)
                        tail;           // empty sentinel awaiting the next put()
        private int     count;          // number of queued items
        IChannel        channel;        // channel this queue belongs to

        /**
         * List node. Nodes are recycled through a static free list.
         */
        private class Link
        {
                Link            prev,
                                next;
                ClusterContent  data;

                static Link     freeList;       // head of the recycled-link list

                /**
                 * Inserts this link directly behind `after`, fixing up the
                 * neighbour that previously followed it. A null `after`
                 * leaves this link untouched.
                 *
                 * Returns: this link
                 */
                Link append (Link after)
                {
                        if (after is null)
                            return this;

                        Link following = after.next;

                        next = following;
                        if (following)
                            following.prev = this;

                        prev = after;
                        after.next = this;
                        return this;
                }

                /**
                 * Detaches this link from its neighbours, joining them to
                 * each other, and clears its own prev/next.
                 *
                 * Returns: this link
                 */
                Link unlink ()
                {
                        Link before = prev;
                        Link behind = next;

                        if (before)
                            before.next = behind;

                        if (behind)
                            behind.prev = before;

                        prev = null;
                        next = null;
                        return this;
                }

                /**
                 * Returns a link for reuse: the head of the free list when
                 * one is available, otherwise a newly allocated link.
                 */
                Link create ()
                {
                        if (freeList is null)
                            return new Link;

                        Link recycled = freeList;
                        freeList = recycled.next;
                        return recycled;
                }

                /**
                 * Clears the data reference and pushes this link onto the
                 * free list.
                 */
                void destroy ()
                {
                        this.data = null;
                        next = freeList;
                        freeList = this;
                }
        }

        /**
         * Creates an empty queue for channel; head and tail both start as
         * the same empty sentinel link.
         */
        this (IChannel channel)
        {
                Link sentinel = new Link;

                head = sentinel;
                tail = sentinel;
                this.channel = channel;
        }

        /**
         * Appends content to the end of the queue: the current sentinel
         * takes the content and a new empty sentinel is linked behind it.
         */
        void put (ClusterContent content)
        {
                tail.data = content;

                Link fresh = tail.create ();
                tail = fresh.append (tail);
                ++count;
        }

        /**
         * Removes and returns the oldest queued content; the vacated link
         * is recycled.
         *
         * Returns: the content, or null when the queue is empty
         */
        ClusterContent get ()
        {
                if (head is tail)
                    return null;

                Link first = head;
                ClusterContent content = first.data;

                head = first.next;
                first.unlink ();
                first.destroy ();
                --count;
                return content;
        }
}
00615
00616