/*******************************************************************************

        @file NetworkCache.d

        Copyright (c) 2004 Kris Bell

        This software is provided 'as-is', without any express or implied
        warranty. In no event will the authors be held liable for damages
        of any kind arising from the use of this software.

        Permission is hereby granted to anyone to use this software for any
        purpose, including commercial applications, and to alter it and/or
        redistribute it freely, subject to the following restrictions:

        1. The origin of this software must not be misrepresented; you must
           not claim that you wrote the original software. If you use this
           software in a product, an acknowledgment within documentation of
           said product would be appreciated but is not required.

        2. Altered source versions must be plainly marked as such, and must
           not be misrepresented as being the original software.

        3. This notice may not be removed or altered from any distribution
           of the source.

        4. Derivative works are permitted, but they must carry this notice
           in full and credit the original source.


                        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


        @version        Initial version, July 2004
        @author         Kris


*******************************************************************************/

module mango.cluster.NetworkCache;

private import  mango.sys.System;

private import  mango.cache.model.ICache;

private import  mango.cluster.CacheInvalidator,
                mango.cluster.CacheInvalidatee;

/*******************************************************************************

        Entry point to the network (cluster) cache. Through this class
        IPayload objects can be placed into the cluster, copied back out
        of it, and removed from it. A cluster cache is spread across many
        servers within the network. Every entry belongs to a 'channel',
        which is effectively the name of one cache instance within the
        cluster. See also NetworkCombo. Typical usage:

        @code
        import mango.cluster.NetworkCache;
        import mango.cluster.qos.socket.Cluster;

        ICluster cluster = new Cluster (...);
        NetworkCache cache = new NetworkCache (cluster, ...);

        cache.put (...);
        cache.get (...);
        cache.invalidate (...);
        @endcode

        Anything stored in the cache must implement the IPayload
        interface and must be enrolled with PickleRegistry, because it
        is frozen and thawed as it travels around the network.

*******************************************************************************/

class NetworkCache : CacheInvalidator
{
        /***********************************************************************

                Create a NetworkCache on top of the given QOS (cluster)
                and attach it to the named channel. All subsequent
                operations on this instance use that channel.

        ***********************************************************************/

        this (ICluster cluster, char[] channel)
        {
                super (cluster, channel);
        }

        /***********************************************************************

                Fetch a copy of the cluster cache entry stored under
                the given key, or null when no such entry exists. The
                entry is left in the cluster (the 'remove' flag passed
                to getCache is false).

        ***********************************************************************/

        IPayload get (char[] key)
        in {
           assert (key.length > 0);
           }
        body
        {
                IPayload found = getCluster.getCache (getChannel(), key, false);
                return found;
        }

        /***********************************************************************

                Remove the cluster cache entry stored under the given
                key and hand it back to the caller (the 'remove' flag
                passed to getCache is true).

        ***********************************************************************/

        IPayload extract (char[] key)
        in {
           assert (key.length > 0);
           }
        body
        {
                IPayload removed = getCluster.getCache (getChannel(), key, true);
                return removed;
        }

        /***********************************************************************

                Store an entry in the cluster cache. The entry ends up
                on one or more of the cluster servers, depending upon
                the QOS in use. Returns whatever the cluster's putCache
                returns.

        ***********************************************************************/

        IPayload put (char[] key, IPayload payload)
        in {
           assert (key.length > 0);
           assert (payload);
           }
        body
        {
                IPayload stored = getCluster.putCache (getChannel(), key, payload);
                return stored;
        }


        /**********************************************************************

                Attach a local loader to this network cache. The
                returned ICache tests each entry it retrieves and loads
                a fresh instance when a stale one is found. A stale
                entry is invalidated, using its timestamp, across the
                entire network.

        **********************************************************************/

        ICache bind (ICacheLoader loader)
        {
                ICache bound = new LocalLoader (this, loader);
                return bound;
        }

        /**********************************************************************

                Attach a remote cache loader. Remote loaders are useful
                for gating/synchronizing access to a particular resource
                over the whole cluster. When an entry fails its validity
                test, the loader is executed remotely by the cache host;
                the entry is 'locked' for the duration, so requests from
                any cluster node stall until the new entry is loaded.
                This is a convenient way to limit the load placed on an
                expensive resource (such as a slow or very busy back-end
                server).

                The remote loader is wrapped in a RemoteLoader adapter
                and then bound exactly like a local loader.

        **********************************************************************/

        ICache bind (IRemoteCacheLoader loader)
        {
                ICacheLoader gate = new RemoteLoader (this, loader);
                return bind (gate);
        }

        /**********************************************************************

                ICache facade which validates entries retrieved from
                the network cache and reloads them, via an ICacheLoader,
                when they are missing or fail the loader's test.

        **********************************************************************/

        private class LocalLoader : ICache
        {
                NetworkCache    cache;
                ICacheLoader    loader;

                /**************************************************************

                        Remember the network cache to consult and the
                        loader used to validate and (re)load entries.

                **************************************************************/

                this (NetworkCache cache, ICacheLoader loader)
                {
                        this.cache = cache;
                        this.loader = loader;
                }

                /**************************************************************

                        Return the entry for the given key. An entry
                        which passes loader.test() is returned as is.
                        Otherwise a replacement is requested from the
                        loader and, when one is produced, placed into
                        the network cache. Returns null if the loader
                        yields nothing.

                **************************************************************/

                IPayload get (char[] key)
                {
                        // "newer than" threshold handed to the loader;
                        // stays at long.min when there is no old entry
                        long newerThan = long.min;

                        IPayload current = cache.get (key);
                        if (current)
                           {
                           // still valid? then we're done
                           if (loader.test (current))
                               return current;

                           // the replacement must be newer than the
                           // entry we are about to discard
                           newerThan = current.getTime ();

                           // invalidate the entire network, so that
                           // subsequent lookups on a local cache check
                           // the cluster first, rather than loading
                           // yet another new instance
                           cache.invalidate (key, newerThan);
                           }

                        IPayload fresh = loader.load (key, newerThan);
                        if (fresh)
                            cache.put (key, fresh);

                        return fresh;
                }
        }

        /**********************************************************************

                Adapter presenting an IRemoteCacheLoader as a plain
                ICacheLoader: loading is delegated to the cluster, and
                the caller polls the cache until a valid entry shows up
                or the loader stops asking for further pauses.

        **********************************************************************/

        private class RemoteLoader : ICacheLoader
        {
                NetworkCache            cache;
                IRemoteCacheLoader      loader;

                /**************************************************************

                        Remember the network cache to poll and the
                        remote loader to dispatch.

                **************************************************************/

                this (NetworkCache cache, IRemoteCacheLoader loader)
                {
                        this.cache = cache;
                        this.loader = loader;
                }

                /**************************************************************

                        Validity test; forwarded to the remote loader.

                **************************************************************/

                bool test (IPayload p)
                {
                        return loader.test (p);
                }

                /**************************************************************

                        Ask the cluster to load the entry for the given
                        key, then poll the cache until an entry passing
                        loader.test() appears. Between polls the thread
                        sleeps for the period returned by
                        loader.pause(), which receives the total time
                        waited so far; a pause of zero ends the wait
                        and null is returned.

                **************************************************************/

                IPayload load (char[] key, long newerThan)
                {
                        // accumulated sleep time, passed to loader.pause()
                        uint waited = 0;

                        // set the loader time to that of the old cache
                        // entry (LocalLoader passes long.min when it
                        // found no previous entry)
                        loader.setTime (newerThan);

                        // tell the cache to load this entry remotely &
                        // atomically. This may be ignored if someone
                        // else is loading the entry already, or if the
                        // current entry is new enough
                        cache.getCluster.loadCache (cache.getChannel(), key, loader);

                        // wait for it to appear ...
                        while (true)
                              {
                              // get current entry
                              IPayload candidate = cache.get (key);

                              // ready to go home? We might pick up an
                              // older version, hence the validity test
                              if (candidate)
                                  if (loader.test (candidate))
                                      return candidate;

                              uint delay = loader.pause (waited);
                              System.sleep (delay);
                              waited += delay;

                              // loader declined to wait any longer
                              if (delay == 0)
                                  break;
                              }

                        return null;
                }
        }
}


/*******************************************************************************

        A combination of a local cache, a cluster cache, and a
        CacheInvalidatee. The two cache instances together behave as a
        classic level1/level2 cache, while the CacheInvalidatee keeps
        the level1 (local) cache coherent with the cluster.

*******************************************************************************/

class NetworkCombo : NetworkCache
{
        private IMutableCache           cache;
        private CacheInvalidatee        invalidatee;

        /***********************************************************************

                Construct a NetworkCombo around the given local cache,
                on the given cluster channel. A CacheInvalidatee is
                created for the same cluster, channel and local cache.

        ***********************************************************************/

        this (ICluster cluster, char[] channel, IMutableCache cache)
        in {
           assert (cache);
           }
        body
        {
                super (cluster, channel);

                this.cache = cache;
                this.invalidatee = new CacheInvalidatee (cluster, channel, cache);
        }

        /***********************************************************************

                Return the IMutableCache instance supplied at
                construction time.

        ***********************************************************************/

        IMutableCache getCache ()
        {
                return cache;
        }

        /***********************************************************************

                Look up an IPayload in the local cache first, falling
                back to the cluster cache when it is not found there.
                Content retrieved from the cluster is placed into the
                local cache. Returns null when neither cache holds the
                key.

        ***********************************************************************/

        IPayload get (char[] key)
        {
                // level1: the local cache
                IPayload local = cache.get (key);
                if (local)
                    return local;

                // level2: the cluster; remember any hit locally
                IPayload remote = super.get (key);
                if (remote)
                    cache.put (key, remote);

                return remote;
        }

        /***********************************************************************

                Place a new entry into the cache. The entry is placed
                into the cluster as well as the local cache and, when
                'coherent' is set, all other local cache instances
                across the network are invalidated. An existing entry
                with the same key is replaced. When using the coherency
                option, make sure the IPayload carries a valid time
                stamp, since payload.getTime() sets the cluster-wide
                "invalidation level". A getTime() method is noted as
                the way to obtain the current millisecond count.

        ***********************************************************************/

        void put (char[] key, IPayload payload, bool coherent = false)
        {
                // cluster first; this is expected to throw an exception
                // if there's a problem
                super.put (key, payload);

                // then the local cache
                cache.put (key, payload);

                if (! coherent)
                      return;

                // invalidate all other cache instances except this new
                // one, such that no other listening cache has the same
                // key
                long stamp = payload.getTime ();
                invalidate (key, stamp);
        }

        /***********************************************************************

                Remove the entry stored under the given key from the
                local cache and from the cluster. The value returned is
                the one extracted from the cluster; the result of the
                local extract is discarded.

        ***********************************************************************/

        IPayload extract (char[] key)
        {
                cache.extract (key);

                IPayload removed = super.extract (key);
                return removed;
        }
}