Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

ClusterCache.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterCache.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032         
00033         @version        Initial version, April 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.ClusterCache;
00040 
00041 private import  std.thread;
00042 
00043 private import  mango.base.System;
00044 
00045 private import  mango.utils.Random;
00046 
00047 private import  mango.cache.HashMap,
00048                 mango.cache.Payload,
00049                 mango.cache.QueuedCache;
00050 
00051 private import  mango.cache.model.IPayload;
00052 
00053 private import  mango.cluster.CacheInvalidatee;
00054 
00055 private import  mango.cluster.qos.socket.Cluster;
00056 
00057 
00058 /******************************************************************************
00059        
00060         The socket QOS cache containers. These are created by ClusterServer
00061         and maintained via ClusterThread.
00062          
00063 ******************************************************************************/
00064 
00065 class ClusterCache
00066 {
00067         private Cluster         cluster;
00068         private HashMap         cacheSet;       // channel name -> ChannelCache
00069         private uint            defaultSize;
00070         
00071         /**********************************************************************
00072                 Create an empty cache set; caches made on demand are constructed with defaultSize
00073         **********************************************************************/
00074 
00075         this (Cluster cluster, uint defaultSize)
00076         {
00077                 this.cluster = cluster;
00078                 this.defaultSize = defaultSize;
00079                 this.cacheSet = new HashMap (256);
00080         }
00081 
00082         /**********************************************************************
00083                 Return the cache registered for a channel, or null when there is none
00084         **********************************************************************/
00085 
00086         private final ChannelCache lookup (char[] channel)
00087         {
00088                 return cast(ChannelCache) cacheSet.get (channel);
00089         }
00090 
00091         /**********************************************************************
00092                 Create a cache of the given size and register it under the channel name
00093         **********************************************************************/
00094 
00095         ChannelCache addCache (char[] channel, uint size)
00096         {
00097                 // build the container for this channel ...
00098                 ChannelCache created = new ChannelCache (cluster, channel, size);
00099 
00100                 // ... and make it reachable via the channel name
00101                 cacheSet.put (channel, created);
00102 
00103                 return created;
00104         }
00105 
00106         /**********************************************************************
00107                 Store content under element, creating the channel cache on first use
00108         **********************************************************************/
00109 
00110         void put (char[] channel, char[] element, ClusterContent content)
00111         {
00112                 ChannelCache target = lookup (channel);
00113 
00114                 // first use of this channel: register a cache under a copy of the name
00115                 if (! target)
00116                       target = addCache (channel.dup, defaultSize);
00117                 target.put (element, content);
00118         }
00119 
00120         /**********************************************************************
00121                 Extract element from the channel cache; null when the channel has no cache
00122         **********************************************************************/
00123 
00124         ClusterContent extract (char[] channel, char[] element)
00125         {
00126                 ChannelCache source = lookup (channel);
00127 
00128                 // unknown channel: there is nothing to extract
00129                 if (source is null)
00130                     return null;
00131                 return source.extract (element);
00132         }
00133 
00134         /**********************************************************************
00135                 Fetch element from the channel cache; null when the channel has no cache
00136         **********************************************************************/
00137 
00138         ClusterContent get (char[] channel, char[] element)
00139         {
00140                 ChannelCache source = lookup (channel);
00141 
00142                 // unknown channel: there is nothing to fetch
00143                 if (source is null)
00144                     return null;
00145                 return source.get (element);
00146         }
00147 
00148         /**********************************************************************
00149                 Try to lock an element of a channel. The channel cache is
00150                 created when missing, and the request is then handed to that
00151                 cache. Returns true only when this call placed the lock; the
00152                 result is false when the entry is valid or is already locked.
00153 
00154         **********************************************************************/
00155 
00156         bool lockWhereInvalid (char[] channel, char[] element, long time)
00157         {
00158                 ChannelCache target = lookup (channel);
00159 
00160                 // an entry is about to be loaded, so the channel needs a cache
00161                 if (! target)
00162                       target = addCache (channel.dup, defaultSize);
00163 
00164                 return target.lockWhereInvalid (element, time);
00165         }
00166 
00167         /**********************************************************************
00168                 Release the lock on element; does nothing when the channel has no cache
00169         **********************************************************************/
00170 
00171         void unlock (char[] channel, char[] element)
00172         {
00173                 ChannelCache holder = lookup (channel);
00174                 if (holder is null)
00175                     return;
00176                 holder.unlock (element);
00177         }
00178 }
00179 
00180 
00181 /******************************************************************************
00182         Per-channel content queues sharing one size budget, plus the thread that publishes them
00183 ******************************************************************************/
00184 
00185 class ClusterQueue : Thread
00186 {
00187         private uint            size,           // limit that 'used' must stay below (see put)
00188                                 used,           // sum of content.length currently queued
00189                                 sleep;          // argument to Random.get for the pause in run()
00190         private ILogger         logger;
00191         private Cluster         cluster;
00192         private HashMap         queueSet;       // channel name -> ChannelQueue
00193 
00194         /**********************************************************************
00195                 Record the configuration, then start the publishing thread immediately
00196         **********************************************************************/
00197 
00198         this (Cluster cluster, uint size, uint sleep)
00199         {
00200                 queueSet = new HashMap (256);
00201                 logger = cluster.getLogger ();
00202                 this.cluster = cluster;
00203                 this.sleep = sleep;
00204                 this.size = size;
00205 
00206                 start ();
00207         }
00208 
00209         /**********************************************************************
00210                 Return the queue registered for a channel, or null when there is none
00211         **********************************************************************/
00212 
00213         private final ChannelQueue lookup (char[] channel)
00214         {
00215                 return cast(ChannelQueue) queueSet.get (channel);
00216         }
00217 
00218         /**********************************************************************
00219                 Append content to the named channel queue, creating the queue on
00220                 first use. Returns false, queueing nothing, unless used plus the
00221                 content length stays below size. Synchronized to pair with get().
00222         **********************************************************************/
00223 
00224         synchronized bool put (char[] name, ClusterContent content)
00225         {
00226                 if ((used + content.length) < size)
00227                    {
00228                    // select the appropriate queue
00229                    ChannelQueue queue = lookup (name);
00230                    if (queue is null)
00231                       {
00232                       // name is currently a reference only; copy it
00233                       name = name.dup;
00234                       logger.trace ("creating new queue for channel '" ~ name ~ "'");
00235                       // place new ChannelQueue into the list
00236                       queueSet.put (name, queue = new ChannelQueue (cluster.createChannel (name)));
00237                       }
00238 
00239                    queue.put (content);
00240                    used += content.length;
00241                    return true;
00242                    }
00243                 return false;
00244         }
00245 
00246         /**********************************************************************
00247                 Remove and return the oldest content of the named channel; null when it has no queue
00248         **********************************************************************/
00249 
00250         synchronized ClusterContent get (char[] name)
00251         {
00252                 ClusterContent ret = null;
00253                 ChannelQueue   queue = lookup (name);
00254 
00255                 if (queue)
00256                    {
00257                    // printf ("get from queue '%.*s' (%d)\n", name, queue.count);
00258 
00259                    ret = queue.get ();
00260                    used -= ret.length;  // NOTE(review): ret is null for an empty queue; assumes .length is then valid - confirm
00261                    }
00262                 return ret;
00263         }
00264 
00265         /**********************************************************************
00266                 Thread body: forever pause, then broadcast each channel with a non-zero count
00267         **********************************************************************/
00268 
00269         override int run ()
00270         {
00271                 while (true)
00272                       {
00273                       System.sleep (Random.get (sleep));
00274 
00275                       try {
00276                           foreach (char[] k, Object o; queueSet)
00277                                   {
00278                                   ChannelQueue q = cast(ChannelQueue) o;
00279                                   if (q.count)
00280                                      {
00281                                      IChannel c = q.channel;
00282 
00283                                      if (logger.isEnabled (logger.Level.Trace))
00284                                          logger.trace ("publishing queue channel '" ~ 
00285                                                         c.getName ~ "'");
00286                                      cluster.broadcast (c);
00287                                      }
00288                                   }
00289                           } catch (Object x)    // log the failure and carry on with the next pass
00290                                    logger.error ("queue-publisher: "~x.toString);
00291                       }
00292                 return 0;
00293         }
00294 }
00295 
00296 
00297 /******************************************************************************
00298         Cache for one channel: a QueuedCache of ClusterEntry payloads plus a map of per-key locks
00299 ******************************************************************************/
00300 
00301 private class ChannelCache 
00302 {
00303         private HashMap         locks;          // key -> marker object while the key is locked
00304         private QueuedCache     cache;
00305 
00306         /**********************************************************************
00307                 Create the cache and lock map, and attach an invalidation listener for the channel
00308         **********************************************************************/
00309 
00310         this (Cluster cluster, char[] channel, int cacheSize)
00311         {
00312                 cluster.getLogger.trace ("creating new cache for channel '" ~ channel ~ "'");
00313 
00314                 // disallow concurrent updates to the lock map, since the
00315                 // whole point of these locks is to allow just one thread 
00316                 // to update the cache
00317                 locks = new HashMap (256, 0.75, 1);
00318 
00319                 cache = new QueuedCache (cacheSize);
00320 
00321                 // construct an invalidation listener for this cache
00322                 new CacheInvalidatee (cluster, channel, cache);
00323         }
00324 
00325         /**********************************************************************
00326                 Wrap content in a ClusterEntry (recycled where possible) and cache it under key
00327         **********************************************************************/
00328 
00329         void put (char[] key, ClusterContent content)
00330         in {
00331            assert (content);
00332            }
00333         body
00334         {
00335                 ClusterEntry entry = ClusterEntry.create ();
00336 
00337                 entry.setData (content);
00338                 cache.put (key, entry);
00339         }
00340 
00341         /**********************************************************************
00342                 Return the content that cache.extract yields for key, or null when it yields nothing
00343         **********************************************************************/
00344 
00345         ClusterContent extract (char[] key)
00346         {
00347                 IPayload p = cache.extract (key);
00348 
00349                 if (p)
00350                    {
00351                    ClusterEntry entry = cast(ClusterEntry) p;
00352                    return entry.getData;
00353                    }
00354                 return null;
00355         }
00356 
00357         /**********************************************************************
00358                 Return the content that cache.get yields for key, or null when it yields nothing
00359         **********************************************************************/
00360 
00361         ClusterContent get (char[] key)
00362         {
00363                 IPayload p = cache.get (key);
00364 
00365                 if (p)
00366                    {
00367                    ClusterEntry entry = cast(ClusterEntry) p;
00368                    return entry.getData;
00369                    }
00370                 return null;
00371         }
00372 
00373         /**********************************************************************
00374                 Add a cache lock where the entry is invalid or unlocked.
00375                 Returns true if locked by this call, false otherwise. Note
00376                 that this will return false if the entry is already locked.
00377                 Synchronized so that testing for an existing lock and placing
00378                 a new one form a single step; only one caller can win a key.
00379         **********************************************************************/
00380 
00381         synchronized bool lockWhereInvalid (char[] key, long time)
00382         {
00383                 IPayload p = cache.get (key);
00384                 if (p)
00385                    {
00386                    ClusterEntry entry = cast(ClusterEntry) p;
00387                    if (entry.getTime () > time)         // cached entry is newer than 'time'
00388                        return false;
00389                    }
00390 
00391                 if (locks.get (key))
00392                     return false;
00393 
00394                 // place any old object in the lock list. NOTE(review): key is not copied here - confirm callers keep it stable
00395                 locks.put (key, this);
00396                 return true;
00397         }
00398 
00399         /**********************************************************************
00400                 Remove the lock for key, if any
00401         **********************************************************************/
00402 
00403         void unlock (char[] key)
00404         {
00405                 locks.remove (key);
00406         }
00407 }
00408 
00409 
00410 /*******************************************************************************
00411         Payload carrying one ClusterContent; instances are recycled through a static free-list
00412 *******************************************************************************/
00413 
00414 private class ClusterEntry : Payload
00415 {
00416         private ClusterContent          data;
00417         private ClusterEntry            next;           // free-list link
00418         private static ClusterEntry     freelist;       // head of the recycled entries
00419 
00420         /***********************************************************************
00421                 Attach content to this entry
00422         ***********************************************************************/
00423 
00424         void setData (ClusterContent data)
00425         {
00426                 this.data = data;
00427         }
00428 
00429         /***********************************************************************
00430                 Return the content attached to this entry
00431         ***********************************************************************/
00432 
00433         ClusterContent getData ()
00434         {
00435                 return this.data;
00436         }
00437 
00438         /***********************************************************************
00439 
00440                 Hand out a recycled entry when one is available, else a new one
00441 
00442         ***********************************************************************/
00443 
00444         static ClusterEntry create ()
00445         {
00446                 ClusterEntry entry = freelist;
00447 
00448                 // nothing to recycle: allocate a brand new entry
00449                 if (entry is null)
00450                     return new ClusterEntry;
00451 
00452                 // pop the recycled entry off the free-list
00453                 freelist = entry.next;
00454 
00455                 return entry;
00456         }
00457 
00458         /***********************************************************************
00459 
00460                 Push this Payload onto the free-list and drop its content
00461 
00462         ***********************************************************************/
00463 
00464         void destroy ()
00465         {
00466                 next = freelist;
00467                 freelist = this;
00468                 data = null;
00469         }
00470 }
00471 
00472 
00473 /******************************************************************************
00474         FIFO of ClusterContent for one channel, kept as a doubly-linked list of recyclable Links
00475 ******************************************************************************/
00476 
00477 private class ChannelQueue
00478 {
00479         private Link            head,           // oldest queued Link; equals tail when empty
00480                                 tail;           // spare Link that receives the next put
00481         private int             count;          // number of queued items
00482         IChannel                channel;
00483 
00484         /**********************************************************************
00485                 One list node; unused nodes are chained through 'next' on the static freeList
00486         **********************************************************************/
00487 
00488         private class Link
00489         {
00490                 Link            prev,
00491                                 next;
00492                 ClusterContent  data;
00493 
00494                 static Link     freeList;
00495 
00496                 /**************************************************************
00497                         Insert this Link directly behind 'after' (nothing is patched when after is null); returns this
00498                 **************************************************************/
00499 
00500                 Link append (Link after)
00501                 {
00502                         if (after)
00503                            {
00504                            next = after.next;
00505 
00506                            // patch 'next' to point at me
00507                            if (next)
00508                                next.prev = this;
00509 
00510                            //patch 'after' to point at me
00511                            prev = after;
00512                            after.next = this;
00513                            }
00514                         return this;
00515                 }
00516 
00517                 /**************************************************************
00518                         Detach this Link from its neighbours and clear its own prev/next; returns this
00519                 **************************************************************/
00520 
00521                 Link unlink ()
00522                 {
00523                         // make 'prev' and 'next' entries see each other
00524                         if (prev)
00525                             prev.next = next;
00526 
00527                         if (next)
00528                             next.prev = prev;
00529 
00530                         // Murphy's law: defensively clear our own links as well
00531                         next = prev = null;
00532                         return this;
00533                 }
00534 
00535                 /**************************************************************
00536                         Return a Link popped from freeList when one is available, else a new Link
00537                 **************************************************************/
00538 
00539                 Link create ()
00540                 {
00541                         Link l;
00542 
00543                         if (freeList)
00544                            {
00545                            l = freeList;
00546                            freeList = l.next;
00547                            }
00548                         else
00549                            l = new Link;
00550                         return l;                       
00551                 }
00552 
00553                 /**************************************************************
00554                         Push this Link onto freeList and drop its data reference
00555                 **************************************************************/
00556 
00557                 void destroy ()
00558                 {
00559                         next = freeList;
00560                         freeList = this;
00561                         this.data = null;
00562                 }
00563         }
00564 
00565 
00566         /**********************************************************************
00567                 Start empty: a single spare Link serves as both head and tail
00568         **********************************************************************/
00569 
00570         this (IChannel channel)
00571         {
00572                 head = tail = new Link;
00573                 this.channel = channel;
00574         }
00575 
00576         /**********************************************************************
00577 
00578                 Add the specified content to the queue at the current
00579                 tail position, and bump tail to the next Link
00580 
00581         **********************************************************************/
00582 
00583         void put (ClusterContent content)
00584         {
00585                 tail.data = content;
00586                 tail = tail.create.append (tail);       // fresh or recycled spare Link becomes the new tail
00587                 ++count;
00588         }       
00589 
00590         /**********************************************************************
00591 
00592                 Extract from the head, which is the oldest item in the 
00593                 queue. The removed Link is unlinked and pushed onto the
00594                 free-list for reuse by a later put. Head is adjusted to point
00595                 at the next Link. Returns null when the queue is empty.
00596 
00597         **********************************************************************/
00598 
00599         ClusterContent get ()
00600         {
00601                 if (head !== tail)
00602                    {
00603                    // printf ("removing link\n");
00604                    Link l = head;
00605                    head = head.next;
00606                    ClusterContent ret = l.data;         // save the content before destroy() clears it
00607                    l.unlink ();
00608                    l.destroy ();
00609                    --count;
00610                    return ret;
00611                    }
00612                 return null;                   
00613         }       
00614 }
00615 
00616 

Generated on Sat Apr 9 20:11:25 2005 for Mango by doxygen 1.3.6