00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 module mango.cluster.qos.socket.ClusterCache;
00040
00041 private import std.thread;
00042
00043 private import mango.sys.System;
00044
00045 private import mango.utils.Random;
00046
00047 private import mango.cache.HashMap,
00048 mango.cache.Payload,
00049 mango.cache.QueuedCache;
00050
00051 private import mango.cache.model.IPayload;
00052
00053 private import mango.cluster.CacheInvalidatee;
00054
00055 private import mango.cluster.qos.socket.Cluster;
00056
00057
00058
00059
00060
00061
00062
00063
00064
/******************************************************************************

        Registry of per-channel caches, keyed by channel name.

        Every channel name maps to one ChannelCache held in `cacheSet`.
        put() and lockWhereInvalid() create a missing channel on demand,
        sized with the default passed to the constructor; the read-side
        operations (extract, get, unlock) simply do nothing for a channel
        that has never been seen.

******************************************************************************/

class ClusterCache
{
        private Cluster cluster;
        private HashMap cacheSet;
        private uint    defaultSize;

        /**********************************************************************

                Params:
                    cluster     = handed to every ChannelCache created here
                    defaultSize = size used for caches created on demand

        **********************************************************************/

        this (Cluster cluster, uint defaultSize)
        {
                this.cluster     = cluster;
                this.defaultSize = defaultSize;
                this.cacheSet    = new HashMap (256);
        }

        /**********************************************************************

                Returns the cache registered for `channel`, or null when
                there is none.

        **********************************************************************/

        private final ChannelCache lookup (char[] channel)
        {
                return cast(ChannelCache) cacheSet.get (channel);
        }

        /**********************************************************************

                Returns the cache registered for `channel`, creating one of
                the default size when it does not exist yet.

                The name is duplicated before it becomes the map key;
                presumably the caller's slice may be transient -- TODO
                confirm against the callers.

        **********************************************************************/

        private final ChannelCache obtain (char[] channel)
        {
                ChannelCache existing = lookup (channel);

                if (existing !is null)
                    return existing;

                return addCache (channel.dup, defaultSize);
        }

        /**********************************************************************

                Creates a cache of `size` for `channel` and registers it
                under that name. The `channel` slice is stored as given
                (not copied), and the new cache is returned.

        **********************************************************************/

        ChannelCache addCache (char[] channel, uint size)
        {
                ChannelCache created = new ChannelCache (cluster, channel, size);

                cacheSet.put (channel, created);
                return created;
        }

        /**********************************************************************

                Stores `content` under `element` within `channel`, creating
                the channel cache first when necessary.

        **********************************************************************/

        void put (char[] channel, char[] element, ClusterContent content)
        {
                obtain (channel).put (element, content);
        }

        /**********************************************************************

                Delegates to ChannelCache.extract for `element`.

                Returns null when the channel is unknown; otherwise
                whatever the channel cache returns.

        **********************************************************************/

        ClusterContent extract (char[] channel, char[] element)
        {
                ChannelCache target = lookup (channel);

                if (target is null)
                    return null;

                return target.extract (element);
        }

        /**********************************************************************

                Delegates to ChannelCache.get for `element`.

                Returns null when the channel is unknown; otherwise
                whatever the channel cache returns.

        **********************************************************************/

        ClusterContent get (char[] channel, char[] element)
        {
                ChannelCache target = lookup (channel);

                if (target is null)
                    return null;

                return target.get (element);
        }

        /**********************************************************************

                Delegates to ChannelCache.lockWhereInvalid for `element`
                and `time`, creating the channel cache first when it does
                not exist. Returns that call's result.

        **********************************************************************/

        bool lockWhereInvalid (char[] channel, char[] element, ulong time)
        {
                return obtain (channel).lockWhereInvalid (element, time);
        }

        /**********************************************************************

                Releases the lock held on `element` within `channel`.
                Does nothing when the channel is unknown.

        **********************************************************************/

        void unlock (char[] channel, char[] element)
        {
                ChannelCache target = lookup (channel);

                if (target !is null)
                    target.unlock (element);
        }
}
00179
00180
00181
00182
00183
00184
/******************************************************************************

        Set of per-channel content queues plus a background publisher.

        The instance is itself a Thread, started from the constructor.
        Its run() loop periodically walks every known queue and asks the
        cluster to broadcast the channel of each queue that holds content.

        `used` tracks the summed length of queued content and is bounded
        by `size`. Both put() and get() adjust `used`, so both hold the
        object monitor.

******************************************************************************/

class ClusterQueue : Thread
{
        private uint    size,           // upper bound for `used`
                        used,           // summed length of queued content
                        sleep;          // argument for the randomized pause
        private ILogger logger;
        private Cluster cluster;
        private HashMap queueSet;       // channel name -> ChannelQueue

        /**********************************************************************

                Params:
                    cluster = supplies the logger, creates channels and
                              performs the broadcasts
                    size    = bound on the summed length of queued content
                    sleep   = passed to Random.get() to choose each pause

                The publisher thread is started as the last step, after
                all fields have been assigned.

        **********************************************************************/

        this (Cluster cluster, uint size, uint sleep)
        {
                queueSet = new HashMap (256);
                logger = cluster.getLogger ();
                this.cluster = cluster;
                this.sleep = sleep;
                this.size = size;

                start ();
        }

        /**********************************************************************

                Returns the queue registered for `channel`, or null.

        **********************************************************************/

        private final ChannelQueue lookup (char[] channel)
        {
                return cast(ChannelQueue) queueSet.get (channel);
        }

        /**********************************************************************

                Appends `content` to the queue of channel `name`, creating
                that queue (and its channel) on first use.

                Returns false, leaving everything untouched, unless
                used + content.length stays strictly below `size`.

                Synchronized because it updates `used` and the per-channel
                queue, which the synchronized get() also mutates.

        **********************************************************************/

        synchronized bool put (char[] name, ClusterContent content)
        {
                if ((used + content.length) < size)
                   {
                   ChannelQueue queue = lookup (name);

                   if (queue is null)
                      {
                      // the key is retained by the map, so store a private
                      // copy rather than the caller's slice
                      name = name.dup;

                      logger.trace ("creating new queue for channel '" ~ name ~ "'");

                      queue = new ChannelQueue (cluster.createChannel (name));
                      queueSet.put (name, queue);
                      }

                   queue.put (content);
                   used += content.length;
                   return true;
                   }
                return false;
        }

        /**********************************************************************

                Removes and returns the oldest content queued for channel
                `name`. Returns null when the channel is unknown or its
                queue is empty.

        **********************************************************************/

        synchronized ClusterContent get (char[] name)
        {
                ChannelQueue queue = lookup (name);

                if (queue is null)
                    return null;

                ClusterContent ret = queue.get ();

                // ChannelQueue.get() yields null for an empty queue; only
                // adjust the accounting when something was really removed
                if (ret !is null)
                    used -= ret.length;

                return ret;
        }

        /**********************************************************************

                Return type of run() differs between thread libraries.

        **********************************************************************/

        version (Ares)
                 alias void ThreadReturn;
              else
                 alias int ThreadReturn;

        /**********************************************************************

                Publisher loop; never terminates on its own.

                Each pass pauses for Random.get(sleep), then broadcasts the
                channel of every queue whose count is non-zero. Anything
                thrown during a pass is logged and the loop carries on.

        **********************************************************************/

        override ThreadReturn run ()
        {
                while (true)
                      {
                      System.sleep (Random.get (sleep));

                      try {
                          foreach (char[] k, Object o; queueSet)
                                  {
                                  ChannelQueue q = cast(ChannelQueue) o;
                                  if (q.count)
                                     {
                                     IChannel c = q.channel;

                                     // avoid building the message unless
                                     // trace output is actually enabled
                                     if (logger.isEnabled (logger.Level.Trace))
                                         logger.trace ("publishing queue channel '" ~
                                                        c.getName ~ "'");
                                     cluster.broadcast (c);
                                     }
                                  }
                          } catch (Object x)
                                  {
                                  logger.error ("queue-publisher: "~x.toString);
                                  }
                      }
                return 0;
        }
}
00300
00301
00302
00303
00304
00305
/******************************************************************************

        Cache for a single channel, plus a table of per-key locks.

        Content is wrapped in ClusterEntry instances and held in a
        QueuedCache. A CacheInvalidatee is constructed over that cache at
        creation time.

        NOTE(review): lockWhereInvalid() tests and then updates `locks` in
        two separate steps, and keys are stored as passed (not copied).
        Confirm that callers serialize access and pass stable keys.

******************************************************************************/

private class ChannelCache
{
        private HashMap         locks;
        private QueuedCache     cache;

        /**********************************************************************

                Params:
                    cluster   = supplies the logger; also handed to the
                                CacheInvalidatee
                    channel   = channel name, used in the trace message
                                and handed to the CacheInvalidatee
                    cacheSize = capacity argument for the QueuedCache

        **********************************************************************/

        this (Cluster cluster, char[] channel, int cacheSize)
        {
                cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");

                locks = new HashMap (256, 0.75, 1);
                cache = new QueuedCache (cacheSize);

                // the instance is not retained here; presumably it
                // registers itself during construction -- TODO confirm
                new CacheInvalidatee (cluster, channel, cache);
        }

        /**********************************************************************

                Reinterprets a payload taken from `cache` as the
                ClusterEntry that put() stored there. The cast goes via
                void*, i.e. it is unchecked.

        **********************************************************************/

        private static ClusterEntry entryOf (IPayload payload)
        {
                return cast(ClusterEntry) cast(void*) payload;
        }

        /**********************************************************************

                Wraps `content` in a ClusterEntry obtained from
                ClusterEntry.create() and stores it under `key`.
                `content` must be non-null (checked by the in-contract).

        **********************************************************************/

        void put (char[] key, ClusterContent content)
        in {
           assert (content);
           }
        body
        {
                ClusterEntry wrapper = ClusterEntry.create ();

                wrapper.setData (content);
                cache.put (key, wrapper);
        }

        /**********************************************************************

                Calls cache.extract() for `key` and returns the content of
                the resulting entry, or null when nothing was found.

        **********************************************************************/

        ClusterContent extract (char[] key)
        {
                IPayload found = cache.extract (key);

                if (found is null)
                    return null;

                return entryOf (found).getData;
        }

        /**********************************************************************

                Calls cache.get() for `key` and returns the content of the
                resulting entry, or null when nothing was found.

        **********************************************************************/

        ClusterContent get (char[] key)
        {
                IPayload found = cache.get (key);

                if (found is null)
                    return null;

                return entryOf (found).getData;
        }

        /**********************************************************************

                Attempts to take the lock for `key`.

                Returns false when a cached entry exists whose time is
                greater than `time`, or when `key` is already present in
                the lock table. Otherwise records the lock and returns
                true.

        **********************************************************************/

        bool lockWhereInvalid (char[] key, ulong time)
        {
                IPayload found = cache.get (key);

                if (found !is null && entryOf (found).getTime () > time)
                    return false;

                if (locks.get (key))
                    return false;

                // the stored value is irrelevant; only presence of the
                // key is ever tested
                locks.put (key, this);
                return true;
        }

        /**********************************************************************

                Removes `key` from the lock table.

        **********************************************************************/

        void unlock (char[] key)
        {
                locks.remove (key);
        }
}
00413
00414
00415
00416
00417
00418
/******************************************************************************

        Payload carrying one ClusterContent, recycled through a static
        free list: destroy() pushes an instance onto the list and create()
        pops one before falling back to allocation.

        The free list is a plain static field with no locking of its own.

******************************************************************************/

private class ClusterEntry : Payload
{
        private ClusterContent          data;
        private ClusterEntry            next;           // free-list link
        private static ClusterEntry     freelist;       // head of recycled entries

        /**********************************************************************

                Attaches `data` to this entry.

        **********************************************************************/

        void setData (ClusterContent data)
        {
                this.data = data;
        }

        /**********************************************************************

                Returns the content attached via setData().

        **********************************************************************/

        ClusterContent getData ()
        {
                return data;
        }

        /**********************************************************************

                Returns an entry for reuse: the head of the free list when
                one is available, otherwise a newly allocated instance.

        **********************************************************************/

        static ClusterEntry create ()
        {
                if (freelist is null)
                    return new ClusterEntry;

                ClusterEntry recycled = freelist;

                freelist = recycled.next;
                return recycled;
        }

        /**********************************************************************

                Drops the content reference and pushes this entry onto the
                free list for a later create().

        **********************************************************************/

        void destroy ()
        {
                next = freelist;
                freelist = this;
                data = null;
        }
}
00476
00477
00478
00479
00480
00481
/******************************************************************************

        FIFO of ClusterContent for one channel, built as a doubly linked
        list of Link nodes.

        `tail` always refers to an empty placeholder node: put() fills the
        placeholder and appends a new one, while get() consumes from
        `head`. The queue is empty exactly when head and tail are the
        same node.

******************************************************************************/

private class ChannelQueue
{
        private Link            head,
                                tail;
        private int             count;          // number of queued items
        IChannel                channel;

        /**********************************************************************

                List node, recycled through a static free list.

        **********************************************************************/

        private static class Link
        {
                Link            prev,
                                next;
                ClusterContent  data;

                static Link     freeList;

                /**************************************************************

                        Inserts this node directly behind `after` and
                        returns this node. With a null `after` nothing is
                        linked.

                **************************************************************/

                Link append (Link after)
                {
                        if (after is null)
                            return this;

                        Link follower = after.next;

                        next = follower;
                        if (follower)
                            follower.prev = this;

                        prev = after;
                        after.next = this;
                        return this;
                }

                /**************************************************************

                        Detaches this node from its neighbours, joining
                        them to each other, and returns this node.

                **************************************************************/

                Link unlink ()
                {
                        Link before = prev;
                        Link behind = next;

                        if (before)
                            before.next = behind;

                        if (behind)
                            behind.prev = before;

                        prev = null;
                        next = null;
                        return this;
                }

                /**************************************************************

                        Returns a node for reuse: the head of the free
                        list when available, else a new allocation. This
                        is an instance method although it touches only
                        the static free list.

                **************************************************************/

                Link create ()
                {
                        if (freeList is null)
                            return new Link;

                        Link recycled = freeList;

                        freeList = recycled.next;
                        return recycled;
                }

                /**************************************************************

                        Clears the content reference and pushes this node
                        onto the free list.

                **************************************************************/

                void destroy ()
                {
                        this.data = null;
                        next = freeList;
                        freeList = this;
                }
        }

        /**********************************************************************

                Creates an empty queue bound to `channel`; head and tail
                start out as the same placeholder node.

        **********************************************************************/

        this (IChannel channel)
        {
                this.channel = channel;

                Link placeholder = new Link;
                head = placeholder;
                tail = placeholder;
        }

        /**********************************************************************

                Enqueues `content`: stores it in the current placeholder,
                then appends a fresh placeholder behind it.

        **********************************************************************/

        void put (ClusterContent content)
        {
                Link filled = tail;

                filled.data = content;
                tail = filled.create.append (filled);
                ++count;
        }

        /**********************************************************************

                Dequeues and returns the oldest content, recycling its
                node. Returns null when the queue is empty.

        **********************************************************************/

        ClusterContent get ()
        {
                if (head is tail)
                    return null;

                Link oldest = head;

                head = oldest.next;

                // read the content before destroy() clears it
                ClusterContent ret = oldest.data;

                oldest.unlink ();
                oldest.destroy ();
                --count;
                return ret;
        }
}
00620
00621