Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

ClusterCache.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterCache.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032         
00033         @version        Initial version, April 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.ClusterCache;
00040 
00041 private import  std.thread;
00042 
00043 private import  mango.sys.System;
00044 
00045 private import  mango.utils.Random;
00046 
00047 private import  mango.cache.HashMap,
00048                 mango.cache.Payload,
00049                 mango.cache.QueuedCache;
00050 
00051 private import  mango.cache.model.IPayload;
00052 
00053 private import  mango.cluster.CacheInvalidatee;
00054 
00055 private import  mango.cluster.qos.socket.Cluster;
00056 
00057 
00058 /******************************************************************************
00059        
00060         The socket QOS cache containers. These are created by ClusterServer
00061         and maintained via ClusterThread.
00062          
00063 ******************************************************************************/
00064 
00065 class ClusterCache
00066 {
00067         private Cluster         cluster;
00068         private HashMap         cacheSet;
00069         private uint            defaultSize;
00070         
00071         /**********************************************************************
00072                 Begin with no channel caches; defaultSize sizes those made on demand
00073         **********************************************************************/
00074 
00075         this (Cluster cluster, uint defaultSize)
00076         {
00077                 this.cluster = cluster;
00078                 this.defaultSize = defaultSize;
00079                 this.cacheSet = new HashMap (256);
00080         }
00081 
00082         /**********************************************************************
00083                 Find the cache stored under a channel name (callers test for null)
00084         **********************************************************************/
00085 
00086         private final ChannelCache lookup (char[] channel)
00087         {       
00088                 return cast(ChannelCache) cacheSet.get (channel);
00089         }       
00090 
00091         /**********************************************************************
00092                 Create a cache of the given size for a channel, and register it
00093         **********************************************************************/
00094 
00095         ChannelCache addCache (char[] channel, uint size)
00096         {       
00097                 // build the container for this channel
00098                 ChannelCache created = new ChannelCache (cluster, channel, size);
00099 
00100                 // register it; the channel array itself is used as the map key
00101                 cacheSet.put (channel, created); 
00102 
00103                 return created;
00104         }       
00105 
00106         /**********************************************************************
00107                 Cache content under channel + element, adding a channel cache if needed
00108         **********************************************************************/
00109 
00110         void put (char[] channel, char[] element, ClusterContent content)
00111         {       
00112                 ChannelCache target = lookup (channel);
00113 
00114                 // first use of this channel: add a cache keyed by a copy of the name
00115                 if (! target)
00116                     target = addCache (channel.dup, defaultSize);
00117                 target.put (element, content);
00118         }       
00119 
00120         /**********************************************************************
00121                 Forward to ChannelCache.extract; null when the channel has no cache
00122         **********************************************************************/
00123 
00124         ClusterContent extract (char[] channel, char[] element)
00125         {
00126                 ChannelCache holder = lookup (channel);
00127 
00128                 // no cache has been created for this channel
00129                 if (holder is null)
00130                     return null;
00131                 return holder.extract (element);
00132         }
00133 
00134         /**********************************************************************
00135                 Forward to ChannelCache.get; null when the channel has no cache
00136         **********************************************************************/
00137 
00138         ClusterContent get (char[] channel, char[] element)
00139         {
00140                 ChannelCache holder = lookup (channel);
00141 
00142                 // no cache has been created for this channel
00143                 if (holder is null)
00144                     return null;
00145                 return holder.get (element);
00146         }
00147 
00148         /**********************************************************************
00149 
00150                 Lock an element for loading, creating the channel cache when
00151                 needed. Returns true if locked by this call; false when a
00152                 cached entry is newer than 'time', or is already locked.
00153 
00154         **********************************************************************/
00155 
00156         bool lockWhereInvalid (char[] channel, char[] element, long time)
00157         {
00158                 ChannelCache target = lookup (channel);
00159 
00160                 // an entry is about to be loaded, so ensure a cache exists for it
00161                 if (! target)
00162                     target = addCache (channel.dup, defaultSize);
00163 
00164                 return target.lockWhereInvalid (element, time);
00165         }
00166 
00167         /**********************************************************************
00168                 Release the lock held on an element; ignored for unknown channels
00169         **********************************************************************/
00170 
00171         void unlock (char[] channel, char[] element)
00172         {
00173                 ChannelCache holder = lookup (channel);
00174                 // without a channel cache there is no lock to release
00175                 if (holder)
00176                     holder.unlock (element);
00177         }
00178 }
00179 
00180 
00181 /******************************************************************************
00182         Per-channel content queues, plus a thread that broadcasts channels holding content
00183 ******************************************************************************/
00184 
00185 class ClusterQueue : Thread
00186 {
00187         private uint            size,
00188                                 used,
00189                                 sleep;
00190         private ILogger         logger;
00191         private Cluster         cluster;
00192         private HashMap         queueSet;
00193 
00194         /**********************************************************************
00195                 Create an empty queue set and start the publishing thread right away
00196         **********************************************************************/
00197 
00198         this (Cluster cluster, uint size, uint sleep)
00199         {
00200                 queueSet = new HashMap (256);
00201                 logger = cluster.getLogger ();
00202                 this.cluster = cluster;
00203                 this.sleep = sleep;
00204                 this.size = size;
00205 
00206                 start ();
00207         }
00208 
00209         /**********************************************************************
00210                 Find the queue stored under a channel name (callers test for null)
00211         **********************************************************************/
00212 
00213         private final ChannelQueue lookup (char[] channel)
00214         {
00215                 return cast(ChannelQueue) queueSet.get (channel);
00216         }
00217 
00218         /**********************************************************************
00219                 Queue content on a channel. Synchronized like get(): both adjust 'used'
00220         **********************************************************************/
00221 
00222         synchronized bool put (char[] name, ClusterContent content)
00223         {       
00224                 if ((used + content.length) < size)
00225                    {
00226                    // select the appropriate queue
00227                    ChannelQueue queue = lookup (name);
00228                    if (queue is null)
00229                       {
00230                       // name is currently a reference only; copy it
00231                       name = name.dup;
00232 
00233                       logger.trace ("creating new queue for channel '" ~ name ~ "'");
00234 
00235                       // place new ChannelQueue into the list
00236                       queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
00237                       }
00238 
00239                    queue.put (content);
00240                    used += content.length;
00241                    return true;
00242                    }
00243                 return false;   // accepting the content would reach the size limit
00244         }       
00245 
00246         /**********************************************************************
00247                 Take the oldest content off a channel queue; null when there is none
00248         **********************************************************************/
00249 
00250         synchronized ClusterContent get (char[] name)
00251         {
00252                 ClusterContent ret = null;
00253                 ChannelQueue   queue = lookup (name);
00254 
00255                 if (queue)
00256                    {
00257                    ret = queue.get ();
00258 
00259                    // an empty channel queue yields null: nothing to account for
00260                    if (ret)
00261                        used -= ret.length;
00262                    }
00263                 return ret;
00264         }   
00265         
00266         /**********************************************************************
00267                 Thread body: repeatedly sleep, then broadcast each non-empty channel
00268         **********************************************************************/
00269 
00270         version (Ares) 
00271                  alias void ThreadReturn;
00272               else
00273                  alias int ThreadReturn;
00274 
00275         override ThreadReturn run ()
00276         {       
00277                 while (true)
00278                       {
00279                       System.sleep (Random.get (sleep));
00280 
00281                       try {
00282                           foreach (char[] k, Object o; queueSet)
00283                                   {
00284                                   ChannelQueue q = cast(ChannelQueue) o;
00285                                   if (q.count)
00286                                      {
00287                                      IChannel c = q.channel;
00288 
00289                                      if (logger.isEnabled (logger.Level.Trace))
00290                                          logger.trace ("publishing queue channel '" ~ 
00291                                                         c.getName ~ "'");
00292                                      cluster.broadcast (c);
00293                                      }
00294                                   }
00295                           } catch (Object x)
00296                                    logger.error ("queue-publisher: "~x.toString);
00297                       }
00298                 return 0;
00299         }           
00300 }
00300 
00301 
00302 /******************************************************************************
00303         The cache for a single channel, paired with a map of per-key load locks
00304 ******************************************************************************/
00305 
00306 private class ChannelCache 
00307 {
00308         private HashMap         locks;
00309         private QueuedCache     cache;
00310 
00311         /**********************************************************************
00312                 Create the cache and lock map, and attach an invalidation listener
00313         **********************************************************************/
00314 
00315         this (Cluster cluster, char[] channel, int cacheSize)
00316         {
00317                 cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");
00318 
00319                 // disallow concurrent updates to the lock map, since the
00320                 // whole point of these locks is to allow just one thread 
00321                 // to update the cache
00322                 locks = new HashMap (256, 0.75, 1);
00323 
00324                 cache = new QueuedCache (cacheSize);
00325 
00326                 // construct an invalidation listener for this cache
00327                 new CacheInvalidatee (cluster, channel, cache);
00328         }
00329 
00330         /**********************************************************************
00331                 Wrap content in a (possibly recycled) ClusterEntry and cache it by key
00332         **********************************************************************/
00333 
00334         void put (char[] key, ClusterContent content)
00335         in {
00336            assert (content);
00337            }
00338         body
00339         {
00340                 ClusterEntry entry = ClusterEntry.create ();
00341 
00342                 entry.setData (content);
00343                 cache.put (key, entry);
00344         }
00345 
00346         /**********************************************************************
00347                 Return the content from cache.extract (key), or null if that is null
00348         **********************************************************************/
00349 
00350         ClusterContent extract (char[] key)
00351         {
00352                 IPayload p = cache.extract (key);
00353 
00354                 if (p)
00355                    {
00356                    ClusterEntry entry = cast(ClusterEntry) cast(void*) p;
00357                    return entry.getData;
00358                    }
00359                 return null;
00360         }
00361 
00362         /**********************************************************************
00363                 Return the content from cache.get (key), or null if that is null
00364         **********************************************************************/
00365 
00366         ClusterContent get (char[] key)
00367         {
00368                 IPayload p = cache.get (key);
00369 
00370                 if (p)
00371                    {
00372                    ClusterEntry entry = cast(ClusterEntry) cast(void*) p;
00373                    return entry.getData;
00374                    }
00375                 return null;
00376         }
00377 
00378         /**********************************************************************
00379 
00380                 Add a cache lock where the entry is invalid or unlocked.
00381                 Returns true if locked by this call, false otherwise. Note
00382                 that this will return false if the entry is already locked.
00383                 Synchronized, so the lock test and lock insert are atomic.
00384         **********************************************************************/
00385 
00386         synchronized bool lockWhereInvalid (char[] key, long time)
00387         {
00388                 IPayload p = cache.get (key);
00389                 if (p)
00390                    {
00391                    ClusterEntry entry = cast(ClusterEntry) cast(void*) p;
00392                    if (entry.getTime () > time)
00393                        return false;     // cached entry is newer than 'time'
00394                    }
00395 
00396                 if (locks.get (key))
00397                     return false;
00398                 // NOTE(review): key is stored uncopied - confirm callers own it
00399                 // place any old object in the lock list
00400                 locks.put (key, this);
00401                 return true;
00402         }
00403 
00404         /**********************************************************************
00405                 Remove the lock entry for a key, allowing it to be locked again
00406         **********************************************************************/
00407 
00408         void unlock (char[] key)
00409         {
00410                 locks.remove (key);
00411         }
00412 }
00413 
00414 
00415 /*******************************************************************************
00416         Cache payload carrying a ClusterContent; instances are recycled via a free-list
00417 *******************************************************************************/
00418 
00419 private class ClusterEntry : Payload
00420 {
00421         private ClusterContent          data;
00422         private ClusterEntry            next;   
00423         private static ClusterEntry     freelist;
00424 
00425         /***********************************************************************
00426                 Attach the content carried by this entry
00427         ***********************************************************************/
00428 
00429         void setData (ClusterContent data)
00430         {
00431                 this.data = data;
00432         }
00433 
00434         /***********************************************************************
00435                 Return the content carried by this entry
00436         ***********************************************************************/
00437 
00438         ClusterContent getData ()
00439         {
00440                 return this.data;
00441         }
00442 
00443         /***********************************************************************
00444 
00445                 Hand out an entry, reusing one from the free-list if possible
00446 
00447         ***********************************************************************/
00448 
00449         static ClusterEntry create ()
00450         {  
00451                 // nothing to recycle: allocate a fresh entry
00452                 if (freelist is null)
00453                     return new ClusterEntry;
00454 
00455                 // otherwise pop the head of the free-list
00456                 ClusterEntry recycled = freelist;
00457                 freelist = recycled.next;
00458 
00459                 // the popped entry's own 'next' link is left untouched
00460                 return recycled;
00461         }
00462 
00463         /***********************************************************************
00464 
00465                 Push this entry onto the free-list, dropping its content
00466 
00467         ***********************************************************************/
00468 
00469         void destroy ()
00470         {
00471                 next = freelist;
00472                 freelist = this;
00473                 data = null;
00474         }
00475 }
00476 
00477 
00478 /******************************************************************************
00479         A first-in, first-out queue of ClusterContent for one channel, kept as a linked list
00480 ******************************************************************************/
00481 
00482 private class ChannelQueue
00483 {
00484         private Link            head,
00485                                 tail;
00486         private int             count;
00487         IChannel                channel;
00488 
00489         /**********************************************************************
00490                 Doubly-linked list node; discarded nodes go onto a static free-list
00491         **********************************************************************/
00492 
00493         private class Link
00494         {
00495                 Link            prev,
00496                                 next;
00497                 ClusterContent  data;
00498 
00499                 static Link     freeList;
00500 
00501                 /**************************************************************
00502                         Insert this link right after 'after' (if any); returns this
00503                 **************************************************************/
00504 
00505                 Link append (Link after)
00506                 {
00507                         if (after)
00508                            {
00509                            next = after.next;
00510 
00511                            // patch 'next' to point at me
00512                            if (next)
00513                                next.prev = this;
00514 
00515                            //patch 'after' to point at me
00516                            prev = after;
00517                            after.next = this;
00518                            }
00519                         return this;
00520                 }
00521 
00522                 /**************************************************************
00523                         Detach this link from both neighbours; returns this
00524                 **************************************************************/
00525 
00526                 Link unlink ()
00527                 {
00528                         // make 'prev' and 'next' entries see each other
00529                         if (prev)
00530                             prev.next = next;
00531 
00532                         if (next)
00533                             next.prev = prev;
00534 
00535                         // Murphy's law 
00536                         next = prev = null;
00537                         return this;
00538                 }
00539 
00540                 /**************************************************************
00541                         Obtain a link, taken from the free-list when one is there
00542                 **************************************************************/
00543 
00544                 Link create ()
00545                 {
00546                         Link l;
00547 
00548                         if (freeList)
00549                            {
00550                            l = freeList;
00551                            freeList = l.next;
00552                            }
00553                         else
00554                            l = new Link;
00555                         return l;                       
00556                 }
00557 
00558                 /**************************************************************
00559                         Push this link onto the free-list and drop its content
00560                 **************************************************************/
00561 
00562                 void destroy ()
00563                 {
00564                         next = freeList;
00565                         freeList = this;
00566                         this.data = null;
00567                 }
00568         }
00569 
00570 
00571         /**********************************************************************
00572                 Start with one empty link serving as both head and tail
00573         **********************************************************************/
00574 
00575         this (IChannel channel)
00576         {
00577                 head = tail = new Link;
00578                 this.channel = channel;
00579         }
00580 
00581         /**********************************************************************
00582 
00583                 Add the specified content to the queue at the current
00584                 tail position, and bump tail to the next Link
00585 
00586         **********************************************************************/
00587 
00588         void put (ClusterContent content)
00589         {
00590                 tail.data = content;
00591                 tail = tail.create.append (tail);
00592                 ++count;
00593         }       
00594 
00595         /**********************************************************************
00596 
00597                 Extract from the head, which is the oldest item in the 
00598                 queue. The removed Link is unlinked and returned to the
00599                 free-list for reuse, and head is adjusted to point at the
00600                 next queue entry. Returns null when the queue is empty.
00601 
00602         **********************************************************************/
00603 
00604         ClusterContent get ()
00605         {
00606                 // head and tail coincide only when nothing is queued
00607                 if (head is tail)
00608                     return null;
00609 
00610                 Link oldest = head;
00611                 ClusterContent ret = oldest.data;
00612 
00613                 head = oldest.next;
00614                 oldest.unlink ();
00615                 oldest.destroy ();
00616                 --count;
00617                 return ret;
00618         }       
00619 }
00620 
00621 

Generated on Fri May 27 18:11:55 2005 for Mango by  doxygen 1.4.0