Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

ClusterCache.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterCache.d
00004         
00005         Copyright (C) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026 
00027                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00028 
00029         
00030         @version        Initial version, April 2004      
00031         @author         Kris
00032 
00033 
00034 *******************************************************************************/
00035 
00036 module mango.cluster.qos.socket.ClusterCache;
00037 
00038 private import  std.thread;
00039 
00040 private import  mango.base.System;
00041 
00042 private import  mango.utils.Random;
00043 
00044 private import  mango.cache.HashMap,
00045                 mango.cache.Payload,
00046                 mango.cache.QueuedCache;
00047 
00048 private import  mango.cache.model.IPayload;
00049 
00050 private import  mango.cluster.CacheInvalidatee;
00051 
00052 private import  mango.cluster.qos.socket.Cluster;
00053 
00054 
00055 /******************************************************************************
00056        
00057         The socket QOS cache containers. These are created by ClusterServer
00058         and maintained via ClusterThread.
00059          
00060 ******************************************************************************/
00061 
00062 class ClusterCache
00063 {
00064         private Cluster         cluster;        // handed to every ChannelCache created here
00065         private HashMap         cacheSet;       // channel name -> ChannelCache
00066         private uint            defaultSize;    // size used for caches created on demand
00067         
00068         /**********************************************************************
00069                 Create an empty cache set; defaultSize is used by put() and lockWhereInvalid() when they create a channel cache.
00070         **********************************************************************/
00071 
00072         this (Cluster cluster, uint defaultSize)
00073         {
00074                 this.cacheSet = new HashMap (256);
00075                 this.defaultSize = defaultSize;
00076                 this.cluster = cluster;
00077         }
00078 
00079         /**********************************************************************
00080                 Return the ChannelCache registered for 'channel'; callers treat a null result as "no cache yet".
00081         **********************************************************************/
00082 
00083         private final ChannelCache lookup (char[] channel)
00084         {       
00085                 return cast(ChannelCache) cacheSet.get (channel);
00086         }       
00087 
00088         /**********************************************************************
00089                 Create a ChannelCache of 'size' for 'channel', register it in the set and return it. The name is stored as given (callers here pass a .dup).
00090         **********************************************************************/
00091 
00092         ChannelCache addCache (char[] channel, uint size)
00093         {       
00094                 // create new cache instance
00095                 ChannelCache cache = new ChannelCache (cluster, channel, size);
00096 
00097                 // add to current list
00098                 cacheSet.put (channel, cache); 
00099 
00100                 return cache;
00101         }       
00102 
00103         /**********************************************************************
00104                 Store 'content' under (channel, element), creating the channel cache with defaultSize on first use.
00105         **********************************************************************/
00106 
00107         void put (char[] channel, char[] element, ClusterContent content)
00108         {       
00109                 ChannelCache cache = lookup (channel);
00110 
00111                 if (cache is null)
00112                     cache = addCache (channel.dup, defaultSize);    // copy the name: it becomes the map key
00113 
00114                 cache.put (element, content);
00115         }       
00116 
00117         /**********************************************************************
00118                 Return whatever ChannelCache.extract yields for 'element'; null when the channel has no cache.
00119         **********************************************************************/
00120 
00121         ClusterContent extract (char[] channel, char[] element)
00122         {
00123                 ChannelCache cache = lookup (channel);
00124 
00125                 if (cache)
00126                     return cache.extract (element);
00127 
00128                 return null;
00129         }
00130 
00131         /**********************************************************************
00132                 Return whatever ChannelCache.get yields for 'element'; null when the channel has no cache.
00133         **********************************************************************/
00134 
00135         ClusterContent get (char[] channel, char[] element)
00136         {
00137                 ChannelCache cache = lookup (channel);
00138 
00139                 if (cache)
00140                     return cache.get (element);
00141 
00142                 return null;
00143         }
00144 
00145         /**********************************************************************
00146 
00147                 Add a cache lock where the entry is invalid or unlocked.
00148                 Returns true if locked by this call, false otherwise. Note
00149                 that this will return false if the entry is already locked.
00150 
00151         **********************************************************************/
00152 
00153         bool lockWhereInvalid (char[] channel, char[] element, long time)
00154         {
00155                 ChannelCache cache = lookup (channel);
00156 
00157                 // create new cache if necessary (we're about to load an entry)
00158                 if (cache is null)
00159                     cache = addCache (channel.dup, defaultSize);
00160 
00161                 return cache.lockWhereInvalid (element, time);
00162         }
00163 
00164         /**********************************************************************
00165                 Release the lock on (channel, element); does nothing when the channel has no cache.
00166         **********************************************************************/
00167 
00168         void unlock (char[] channel, char[] element)
00169         {
00170                 ChannelCache cache = lookup (channel);
00171 
00172                 if (cache)
00173                     cache.unlock (element);
00174         }
00175 }
00176 
00177 
00178 /******************************************************************************
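00179         Thread that queues ClusterContent per channel and, after each random pause, broadcasts every channel whose queue holds content.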
00180 ******************************************************************************/
00181 
00182 class ClusterQueue : Thread
00183 {
00184         private uint            size,           // upper bound checked against 'used' in put()
00185                                 used,           // sum of content.length currently queued
00186                                 sleep;          // argument given to Random.get() for each pause in run()
00187         private ILogger         logger;
00188         private Cluster         cluster;
00189         private HashMap         queueSet;       // channel name -> ChannelQueue
00190 
00191         /**********************************************************************
00192                 Set up the queue map and immediately start this thread. 'size' limits the total queued content length; 'sleep' feeds the random pause in run().
00193         **********************************************************************/
00194 
00195         this (Cluster cluster, uint size, uint sleep)
00196         {
00197                 queueSet = new HashMap (256);
00198                 logger = cluster.getLogger ();
00199                 this.cluster = cluster;
00200                 this.sleep = sleep;
00201                 this.size = size;
00202 
00203                 start ();
00204         }
00205 
00206         /**********************************************************************
00207                 Return the ChannelQueue registered for 'channel'; callers treat a null result as "no queue yet".
00208         **********************************************************************/
00209 
00210         private final ChannelQueue lookup (char[] channel)
00211         {
00212                 return cast(ChannelQueue) queueSet.get (channel);
00213         }
00214 
00215         /**********************************************************************
00216                 Queue 'content' on channel 'name' (queue created on first use). Returns false, queuing nothing, unless used + content.length < size. NOTE(review): not synchronized, unlike get() - confirm callers serialize.
00217         **********************************************************************/
00218 
00219         bool put (char[] name, ClusterContent content)
00220         {       
00221                 if ((used + content.length) < size)
00222                    {
00223                    // select the appropriate queue
00224                    ChannelQueue queue = lookup (name);
00225                    if (queue is null)
00226                       {
00227                       // name is currently a reference only; copy it
00228                       name = name.dup;
00229 
00230                       logger.trace ("creating new queue for channel '" ~ name ~ "'");
00231 
00232                       // place new ChannelQueue into the list
00233                       queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
00234                       }
00235 
00236                    queue.put (content);
00237                    used += content.length;
00238                    return true;
00239                    }
00240                 return false;
00241         }       
00242 
00243         /**********************************************************************
00244                 Remove and return the oldest content queued on 'name'; null when the channel has no queue or its queue is empty.
00245         **********************************************************************/
00246 
00247         synchronized ClusterContent get (char[] name)
00248         {
00249                 ClusterContent ret = null;
00250                 ChannelQueue   queue = lookup (name);
00251 
00252                 if (queue)
00253                    {
00254                    // printf ("get from queue '%.*s' (%d)\n", name, queue.count);
00255                    // an empty queue yields null: leave 'used' untouched then
00256                    ret = queue.get ();
00257                    if (ret) used -= ret.length;
00258                    }
00259                 return ret;
00260         }   
00261         
00262         /**********************************************************************
00263                 Thread body: loop forever, pausing via System.sleep, then broadcast each channel whose queue count is non-zero. Anything thrown is logged and the loop continues.
00264         **********************************************************************/
00265 
00266         override int run ()
00267         {       
00268                 while (true)
00269                       {
00270                       System.sleep (Random.get (sleep));
00271 
00272                       try {
00273                           foreach (char[] k, Object o; queueSet)
00274                                   {
00275                                   ChannelQueue q = cast(ChannelQueue) o;
00276                                   if (q.count)
00277                                      {
00278                                      IChannel c = q.channel;
00279 
00280                                      if (logger.isEnabled (logger.Level.Trace))
00281                                          logger.trace ("publishing queue channel '" ~ 
00282                                                         c.getName ~ "'");
00283                                      cluster.broadcast (c);
00284                                      }
00285                                   }
00286                           } catch (Object x)
00287                                    logger.error ("queue-publisher: "~x.toString);
00288                       }
00289                 return 0;       // never reached: the loop above has no exit
00290         }           
00291 }
00292 
00293 
00294 /******************************************************************************
00295         Cache for a single channel: a QueuedCache of ClusterEntry payloads plus a map of per-key locks.
00296 ******************************************************************************/
00297 
00298 private class ChannelCache 
00299 {
00300         private HashMap         locks;          // key -> marker object while the key is locked
00301         private QueuedCache     cache;          // key -> ClusterEntry
00302 
00303         /**********************************************************************
00304                 Log the creation, then build the lock map, the QueuedCache (constructed with cacheSize) and a CacheInvalidatee for this channel and cache.
00305         **********************************************************************/
00306 
00307         this (Cluster cluster, char[] channel, int cacheSize)
00308         {
00309                 cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");
00310 
00311                 // disallow concurrent updates to the lock map, since the
00312                 // whole point of these locks is to allow just one thread 
00313                 // to update the cache
00314                 locks = new HashMap (256, 0.75, 1);
00315 
00316                 cache = new QueuedCache (cacheSize);
00317 
00318                 // construct an invalidation listener for this cache (not retained here; presumably it registers itself - TODO confirm)
00319                 new CacheInvalidatee (cluster, channel, cache);
00320         }
00321 
00322         /**********************************************************************
00323                 Wrap 'content' in a ClusterEntry obtained from ClusterEntry.create() and store it under 'key'. The in-contract asserts that content is set.
00324         **********************************************************************/
00325 
00326         void put (char[] key, ClusterContent content)
00327         in {
00328            assert (content);
00329            }
00330         body
00331         {
00332                 ClusterEntry entry = ClusterEntry.create ();
00333 
00334                 entry.setData (content);
00335                 cache.put (key, entry);
00336         }
00337 
00338         /**********************************************************************
00339                 Return the content of the payload that cache.extract(key) yields, or null when it yields none (presumably the entry is also removed - confirm against QueuedCache).
00340         **********************************************************************/
00341 
00342         ClusterContent extract (char[] key)
00343         {
00344                 IPayload p = cache.extract (key);
00345 
00346                 if (p)
00347                    {
00348                    ClusterEntry entry = cast(ClusterEntry) p;
00349                    return entry.getData;
00350                    }
00351                 return null;
00352         }
00353 
00354         /**********************************************************************
00355                 Return the content of the payload that cache.get(key) yields, or null when it yields none.
00356         **********************************************************************/
00357 
00358         ClusterContent get (char[] key)
00359         {
00360                 IPayload p = cache.get (key);
00361 
00362                 if (p)
00363                    {
00364                    ClusterEntry entry = cast(ClusterEntry) p;
00365                    return entry.getData;
00366                    }
00367                 return null;
00368         }
00369 
00370         /**********************************************************************
00371 
00372                 Add a cache lock where the entry is invalid or unlocked.
00373                 Returns true if locked by this call, false otherwise. Note
00374                 that this will return false if the entry is already locked.
00375 
00376         **********************************************************************/
00377 
00378         bool lockWhereInvalid (char[] key, long time)
00379         {
00380                 IPayload p = cache.get (key);
00381                 if (p)
00382                    {
00383                    ClusterEntry entry = cast(ClusterEntry) p;
00384                    if (entry.getTime () > time)         // cached entry is newer than 'time': no lock needed
00385                        return false;
00386                    }
00387                 // entry is absent or not newer than 'time': refuse if the key is already locked
00388                 if (locks.get (key))
00389                     return false;
00390 
00391                 // place any old object in the lock list
00392                 locks.put (key, this);
00393                 return true;
00394         }
00395 
00396         /**********************************************************************
00397                 Remove 'key' from the lock map, undoing a successful lockWhereInvalid().
00398         **********************************************************************/
00399 
00400         void unlock (char[] key)
00401         {
00402                 locks.remove (key);
00403         }
00404 }
00405 
00406 
00407 /*******************************************************************************
00408         Payload that carries one ClusterContent; instances are recycled through a static free-list.
00409 *******************************************************************************/
00410 
00411 private class ClusterEntry : Payload
00412 {
00413         private ClusterContent          data;           // content carried by this entry
00414         private ClusterEntry            next;           // link used only while on the free-list
00415         private static ClusterEntry     freelist;       // head of recycled entries; NOTE(review): no locking visible here
00416 
00417         /***********************************************************************
00418                 Attach the content this entry carries.
00419         ***********************************************************************/
00420 
00421         void setData (ClusterContent data)
00422         {
00423                 this.data = data;
00424         }
00425 
00426         /***********************************************************************
00427                 Return the attached content (null once destroy() has run).
00428         ***********************************************************************/
00429 
00430         ClusterContent getData ()
00431         {
00432                 return data;
00433         }
00434 
00435         /***********************************************************************
00436 
00437                 Allocate an entry from a list rather than creating a new one
00438 
00439         ***********************************************************************/
00440 
00441         static ClusterEntry create ()
00442         {  
00443                 ClusterEntry s;
00444         
00445                 if (freelist)
00446                    {
00447                    s = freelist;                // pop the head of the free-list
00448                    freelist = s.next;
00449                    }
00450                 else
00451                    s = new ClusterEntry;        // list is empty: allocate
00452                 return s;
00453         }
00454 
00455         /***********************************************************************
00456 
00457                 Return this Payload to the free-list
00458 
00459         ***********************************************************************/
00460 
00461         void destroy ()
00462         {
00463                 data = null;                    // drop the content reference before pooling
00464                 next = freelist;
00465                 freelist = this;
00466         }
00467 }
00468 
00469 
00470 /******************************************************************************
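00471         FIFO of ClusterContent for one channel, kept as a doubly-linked list of Links with a spare Link at the tail.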
00472 ******************************************************************************/
00473 
00474 private class ChannelQueue
00475 {
00476         private Link            head,           // oldest queued Link
00477                                 tail;           // empty Link that the next put() fills
00478         private int             count;          // number of queued items
00479         IChannel                channel;        // channel this queue belongs to
00480 
00481         /**********************************************************************
00482                 List node holding one ClusterContent; unused nodes are chained on the static freeList.
00483         **********************************************************************/
00484 
00485         private class Link
00486         {
00487                 Link            prev,
00488                                 next;
00489                 ClusterContent  data;
00490 
00491                 static Link     freeList;
00492 
00493                 /**************************************************************
00494                         Insert this Link directly after 'after' (nothing is linked when 'after' is null) and return this.
00495                 **************************************************************/
00496 
00497                 Link append (Link after)
00498                 {
00499                         if (after)
00500                            {
00501                            next = after.next;
00502 
00503                            // patch 'next' to point at me
00504                            if (next)
00505                                next.prev = this;
00506 
00507                            //patch 'after' to point at me
00508                            prev = after;
00509                            after.next = this;
00510                            }
00511                         return this;
00512                 }
00513 
00514                 /**************************************************************
00515                         Detach this Link from its neighbours, clear its own prev/next and return this.
00516                 **************************************************************/
00517 
00518                 Link unlink ()
00519                 {
00520                         // make 'prev' and 'next' entries see each other
00521                         if (prev)
00522                             prev.next = next;
00523 
00524                         if (next)
00525                             next.prev = prev;
00526 
00527                         // Murphy's law 
00528                         next = prev = null;
00529                         return this;
00530                 }
00531 
00532                 /**************************************************************
00533                         Return a Link popped from freeList, or a newly allocated one when freeList is empty; the receiver is not modified.
00534                 **************************************************************/
00535 
00536                 Link create ()
00537                 {
00538                         Link l;
00539 
00540                         if (freeList)
00541                            {
00542                            l = freeList;
00543                            freeList = l.next;   // l.next still refers to the free-list until append() resets it
00544                            }
00545                         else
00546                            l = new Link;
00547                         return l;                       
00548                 }
00549 
00550                 /**************************************************************
00551                         Push this Link onto freeList and drop its content reference.
00552                 **************************************************************/
00553 
00554                 void destroy ()
00555                 {
00556                         next = freeList;
00557                         freeList = this;
00558                         this.data = null;
00559                 }
00560         }
00561 
00562 
00563         /**********************************************************************
00564                 Start with one empty Link serving as both head and tail; get() treats head === tail as "queue empty".
00565         **********************************************************************/
00566 
00567         this (IChannel channel)
00568         {
00569                 head = tail = new Link;
00570                 this.channel = channel;
00571         }
00572 
00573         /**********************************************************************
00574 
00575                 Add the specified content to the queue at the current
00576                 tail position, and bump tail to the next Link
00577 
00578         **********************************************************************/
00579 
00580         void put (ClusterContent content)
00581         {
00582                 tail.data = content;
00583                 tail = tail.create.append (tail);       // obtain a spare Link and hook it in after the old tail
00584                 ++count;
00585         }       
00586 
00587         /**********************************************************************
00588 
00589                 Extract from the head, which is the oldest item in the 
00590                 queue. The removed Link is returned to the free-list, 
00591                 ready for reuse by a later put. Head is adjusted to
00592                 point at the next valid queue entry. Null when empty.
00593 
00594         **********************************************************************/
00595 
00596         ClusterContent get ()
00597         {
00598                 if (head !== tail)
00599                    {
00600                    // printf ("removing link\n");
00601                    Link l = head;
00602                    head = head.next;
00603                    ClusterContent ret = l.data;         // read before destroy() clears it
00604                    l.unlink ();
00605                    l.destroy ();
00606                    --count;
00607                    return ret;
00608                    }
00609                 return null;                   
00610         }       
00611 }
00612 
00613 

Generated on Sun Nov 7 19:06:50 2004 for Mango by doxygen 1.3.6