/*******************************************************************************

        @file NetworkCache.d

        Copyright (C) 2004 Kris Bell

        This software is provided 'as-is', without any express or implied
        warranty. In no event will the authors be held liable for damages
        of any kind arising from the use of this software.

        Permission is hereby granted to anyone to use this software for any
        purpose, including commercial applications, and to alter it and/or
        redistribute it freely, subject to the following restrictions:

        1. The origin of this software must not be misrepresented; you must
           not claim that you wrote the original software. If you use this
           software in a product, an acknowledgment within documentation of
           said product would be appreciated but is not required.

        2. Altered source versions must be plainly marked as such, and must
           not be misrepresented as being the original software.

        3. This notice may not be removed or altered from any distribution
           of the source.


                        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


        @version        Initial version, July 2004
        @author         Kris


*******************************************************************************/

module mango.cluster.NetworkCache;

private import  mango.base.System;

private import  mango.cache.model.ICache;

private import  mango.cluster.CacheInvalidator,
                mango.cluster.CacheInvalidatee;

/*******************************************************************************

        Entry point for working with the network (cluster) cache. It lets
        a client store IPayload objects in the cluster, fetch copies of
        them, and remove them again. The cluster cache is distributed
        over many servers on the network. Every entry belongs to a
        'channel', which is effectively the name of one cache instance
        within the cluster. See NetworkCombo also. Typical usage:

        @code
        import mango.cluster.NetworkCache;
        import mango.cluster.qos.socket.Cluster;

        ICluster cluster = new Cluster (...);
        NetworkCache cache = new NetworkCache (cluster, ...);

        cache.put (...);
        cache.get (...);
        cache.invalidate (...);
        @endcode

        Anything stored in the cache has to implement the IPayload
        interface and be enrolled with PickleRegistry, because it is
        frozen and thawed while travelling around the network.

*******************************************************************************/

class NetworkCache : CacheInvalidator
{
        /***********************************************************************

                Create a NetworkCache on top of the given QOS (cluster)
                and attach it to the named channel. All later operations
                on this instance use that channel.

        ***********************************************************************/

        this (ICluster cluster, char[] channel)
        {
                super (cluster, channel);
        }

        /***********************************************************************

                Look up the cluster entry stored under the given key and
                return a copy of it; the entry is left in place. The
                original documentation states that null is returned
                when no such entry exists.

                The key must not be empty.

        ***********************************************************************/

        IPayload get (char[] key)
        in {
           assert (key.length > 0);
           }
        body
        {
                // 'false': do not remove the entry from the cluster
                IPayload found = getCluster.getCache (getChannel, key, false);
                return found;
        }

        /***********************************************************************

                Take the entry stored under the given key out of the
                cluster cache and hand it back to the caller.

                The key must not be empty.

        ***********************************************************************/

        IPayload extract (char[] key)
        in {
           assert (key.length > 0);
           }
        body
        {
                // 'true': remove the entry as part of the lookup
                IPayload removed = getCluster.getCache (getChannel, key, true);
                return removed;
        }

        /***********************************************************************

                Store a payload in the cluster cache under the given
                key. Depending on the QOS in use, the entry ends up on
                one or more of the cluster servers. Whatever the cluster
                returns from putCache() is passed back to the caller.

                The key must not be empty and the payload must be set.

        ***********************************************************************/

        IPayload put (char[] key, IPayload payload)
        in {
           assert (key.length > 0);
           assert (payload);
           }
        body
        {
                IPayload stored = getCluster.putCache (getChannel, key, payload);
                return stored;
        }


        /**********************************************************************

                Attach a local loader to this network cache. The
                resulting ICache validates each entry it retrieves and
                loads a fresh instance whenever a stale one is found.
                A stale entry is invalidated across the whole network
                using its own timestamp.

        **********************************************************************/

        ICache bind (ICacheLoader loader)
        {
                ICache bound = new LocalLoader (this, loader);
                return bound;
        }

        /**********************************************************************

                Attach a remote cache loader. Remote loaders are useful
                for gating/synchronizing access to a resource across
                the entire cluster. When an entry fails its validity
                test, the loader is executed remotely by the cache
                host; the entry is 'locked' meanwhile, so requests from
                any cluster node stall until the new entry has been
                loaded. This is a convenient way to limit the load on
                an expensive resource (such as a slow or very busy
                back-end server).

                The remote loader is wrapped in an ICacheLoader adapter
                and then bound exactly like a local loader.

        **********************************************************************/

        ICache bind (IRemoteCacheLoader loader)
        {
                ICacheLoader adapter = new RemoteLoader (this, loader);
                return bind (adapter);
        }

        /**********************************************************************

                ICache facade which pairs a NetworkCache with a loader:
                lookups are validated via the loader, and stale or
                missing entries are (re)loaded through it.

        **********************************************************************/

        private class LocalLoader : ICache
        {
                NetworkCache    cache;
                ICacheLoader    loader;

                /**************************************************************

                        Remember the cache to consult and the loader
                        used to validate and load entries.

                **************************************************************/

                this (NetworkCache cache, ICacheLoader loader)
                {
                        this.cache = cache;
                        this.loader = loader;
                }

                /**************************************************************

                        Return the entry for the given key. An existing
                        entry which passes loader.test() is returned
                        as is. Otherwise the loader is asked for a
                        replacement, which is stored in the cache when
                        one is produced. Returns whatever the loader
                        produced, which may be null.

                **************************************************************/

                IPayload get (char[] key)
                {
                        // "newer than" threshold handed to the loader;
                        // stays at long.min when there is no old entry
                        long     newerThan = long.min;
                        IPayload existing  = cache.get (key);

                        if (existing)
                           {
                           // a valid entry needs no further work
                           if (loader.test (existing))
                               return existing;

                           // stale: use the old entry's time as threshold
                           newerThan = existing.getTime ();

                           // invalidate the entire network, so that
                           // subsequent lookups on a local cache check
                           // the cluster first before trying to load
                           // yet another new instance
                           cache.invalidate (key, newerThan);
                           }

                        IPayload fresh = loader.load (key, newerThan);
                        if (fresh)
                            cache.put (key, fresh);

                        return fresh;
                }
        }

        /**********************************************************************

                Adapter which presents an IRemoteCacheLoader as a plain
                ICacheLoader. Loading is delegated to the cluster, and
                the adapter then polls the cache for the result.

        **********************************************************************/

        private class RemoteLoader : ICacheLoader
        {
                NetworkCache            cache;
                IRemoteCacheLoader      loader;

                /**************************************************************

                        Remember the cache to poll and the remote
                        loader to dispatch.

                **************************************************************/

                this (NetworkCache cache, IRemoteCacheLoader loader)
                {
                        this.cache = cache;
                        this.loader = loader;
                }

                /**************************************************************

                        Validity check; forwarded to the remote loader.

                **************************************************************/

                bool test (IPayload p)
                {
                        bool valid = loader.test (p);
                        return valid;
                }

                /**************************************************************

                        Ask the cluster to load the entry for the given
                        key, then poll the cache until a valid entry
                        shows up. Between polls the thread sleeps for
                        the interval returned by loader.pause(), which
                        is given the total time waited so far. A zero
                        interval ends the wait and null is returned.

                **************************************************************/

                IPayload load (char[] key, long newerThan)
                {
                        uint waited   = 0,
                             interval = 0;

                        // set the loader time to that of the old cache
                        // entry, as supplied by the caller (LocalLoader
                        // passes long.min when there was none)
                        loader.setTime (newerThan);

                        // tell the cache to load this entry remotely and
                        // atomically. This may be ignored if someone
                        // else is loading the entry already, or the
                        // current entry is new enough
                        cache.getCluster.loadCache (cache.getChannel(), key, loader);

                        // wait for it to appear ...
                        for (;;)
                            {
                            IPayload candidate = cache.get (key);

                            // an older version may be picked up here,
                            // hence the explicit validity test
                            if (candidate)
                                if (loader.test (candidate))
                                    return candidate;

                            interval = loader.pause (waited);
                            System.sleep (interval);
                            waited += interval;

                            // a zero pause means: stop waiting
                            if (interval == 0)
                                break;
                            }

                        return null;
                }
        }
}


/*******************************************************************************

        Combines a local cache, the cluster cache, and a CacheInvalidatee.
        The two caches together behave like a classic level1/level2
        cache, and the CacheInvalidatee keeps the level1 (local) cache
        coherent with the cluster.

*******************************************************************************/

class NetworkCombo : NetworkCache
{
        private IMutableCache           cache;
        private CacheInvalidatee        invalidatee;

        /***********************************************************************

                Create a NetworkCombo around the given local cache, on
                the given cluster channel. The local cache must be set.

        ***********************************************************************/

        this (ICluster cluster, char[] channel, IMutableCache cache)
        in {
           assert (cache);
           }
        body
        {
                super (cluster, channel);

                this.cache = cache;
                this.invalidatee = new CacheInvalidatee (cluster, channel, cache);
        }

        /***********************************************************************

                Expose the IMutableCache supplied at construction time.

        ***********************************************************************/

        IMutableCache getCache ()
        {
                return this.cache;
        }

        /***********************************************************************

                Fetch an IPayload, trying the local cache first and
                falling back to the cluster cache on a miss. Content
                found in the cluster is copied into the local cache.

        ***********************************************************************/

        IPayload get (char[] key)
        {
                // level1: local cache
                IPayload hit = cache.get (key);
                if (hit)
                    return hit;

                // level2: cluster cache; remember a hit locally
                hit = super.get (key);
                if (hit)
                    cache.put (key, hit);

                return hit;
        }

        /***********************************************************************

                Store a new entry. The entry is placed in the cluster
                and in the local cache and, when 'coherent' is set, all
                other local cache instances across the network are
                invalidated for this key. An existing entry with the
                same key is replaced. When using the coherency option,
                make sure the IPayload carries a valid time stamp, as
                that is used to set the cluster-wide "invalidation
                level". You can use the getTime() method to retrieve
                the current millisecond count.

        ***********************************************************************/

        void put (char[] key, IPayload payload, bool coherent = false)
        {
                // cluster first: per the original note, this throws an
                // exception if there's a problem, in which case the
                // local cache is left untouched
                super.put (key, payload);

                // then mirror the entry locally
                cache.put (key, payload);

                if (! coherent)
                      return;

                // invalidate all other cache instances except this new
                // one, such that no other listening cache has the same key
                invalidate (key, payload.getTime);
        }

        /***********************************************************************

                Remove the entry for the given key from the local cache
                and from the cluster cache. The value returned is the
                one handed back by the cluster; the result of the local
                removal is discarded.

        ***********************************************************************/

        IPayload extract (char[] key)
        {
                cache.extract (key);

                IPayload clustered = super.extract (key);
                return clustered;
        }
}