00001 /******************************************************************************* 00002 00003 @file ClusterServer.d 00004 00005 Copyright (c) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 4. Derivative works are permitted, but they must carry this notice 00027 in full and credit the original source. 00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.qos.socket.ClusterServer; 00040 00041 version = NO_DEBUG; 00042 00043 private import mango.io.Buffer, 00044 mango.io.Exception, 00045 mango.io.ServerSocket, 00046 mango.io.SocketConduit, 00047 mango.io.PickleRegistry, 00048 mango.io.ArrayAllocator; 00049 00050 private import mango.io.model.IConduit; 00051 00052 private import mango.cache.model.ICache; 00053 00054 private import mango.utils.ServerThread, 00055 mango.utils.AbstractServer; 00056 00057 private import mango.cluster.qos.socket.Cluster, 00058 mango.cluster.qos.socket.RollCall, 00059 mango.cluster.qos.socket.ClusterCache, 00060 mango.cluster.qos.socket.ClusterThread, 00061 mango.cluster.qos.socket.ProtocolReader, 00062 mango.cluster.qos.socket.ProtocolWriter; 00063 00064 /****************************************************************************** 00065 00066 Extends the AbstractServer to glue cluster-server support together. 00067 Note that there may only be one server running for any given host 00068 name. This is to make it easier to manage the server(s) via one or 00069 more http clients. If you require more than one server per machine, 00070 virtual hosting will need to be provided. 00071 00072 ******************************************************************************/ 00073 00074 class ClusterServer : AbstractServer, IEventListener 00075 { 00076 private ClusterCache cache; 00077 private ClusterQueue queue; 00078 private Cluster cluster; 00079 private IChannel channel; 00080 private RollCall rollcall; 00081 private CacheServer taskServer; 00082 00083 /********************************************************************** 00084 00085 Construct this server with the requisite attributes. The 00086 'bind' address is the local address we'll be listening on, 00087 'threads' represents the number of socket-accept threads, 00088 and backlog is the number of "simultaneous" connection 00089 requests that a socket layer will buffer on our behalf. 00090 00091 We also set up a listener for client discovery-requests, 00092 and lastly, we tell active clients that we're available 00093 for work. Clients should be listening on the appropriate 00094 channel for an instance of the RollCall payload. 00095 00096 **********************************************************************/ 00097 00098 this (InternetAddress bind, int threads, ILogger logger = null) 00099 { 00100 super (bind, threads, 10, logger); 00101 logger = getLogger(); 00102 00103 // hook into the cluster 00104 cluster = new Cluster (logger); 00105 00106 // create an identity for ourselves 00107 rollcall = new RollCall (Socket.hostName(), bind.port(), 0); 00108 00109 // clients are listening on this channel ... 00110 channel = cluster.createChannel ("cluster.server.advertise"); 00111 00112 version (NO_DEBUG) 00113 { 00114 // ... and listen for subsequent server.advertise requests 00115 cluster.createConsumer (channel, IEvent.Style.Bulletin, this); 00116 00117 // construct a server for cache tasks 00118 InternetAddress addr = new InternetAddress (bind.toAddrString(), bind.port+1); 00119 taskServer = new CacheServer (this, addr, 20, logger); 00120 00121 // enroll cache tasks 00122 enroll (logger); 00123 } 00124 } 00125 00126 /********************************************************************** 00127 00128 **********************************************************************/ 00129 00130 void enroll (ILogger logger) 00131 { 00132 } 00133 00134 /********************************************************************** 00135 00136 **********************************************************************/ 00137 00138 void addCacheLoader (IPayload loader, bool enroll = false) 00139 { 00140 char[] name = loader.getGuid; 00141 00142 cluster.getLogger.info ("adding cache loader '" ~ name ~ "'"); 00143 00144 if (enroll) 00145 PickleRegistry.enroll (loader); 00146 } 00147 00148 /********************************************************************** 00149 00150 Start this server 00151 00152 **********************************************************************/ 00153 00154 void start () 00155 { 00156 // cache with 101 entries 00157 if (cache is null) 00158 cache = new ClusterCache (cluster, 101); 00159 00160 // queue with 64MB ceiling & maximum of 1 second dispatch 00161 if (queue is null) 00162 queue = new ClusterQueue (cluster, 64 * 1024 * 1024, 1_000_000); 00163 00164 super.start(); 00165 00166 version (NO_DEBUG) 00167 { 00168 taskServer.start(); 00169 } 00170 00171 // tell everyone about ourselves ... 00172 cluster.broadcast (channel, rollcall); 00173 } 00174 00175 /********************************************************************** 00176 00177 Use this before starting the server 00178 00179 **********************************************************************/ 00180 00181 void setCache (ClusterCache cache) 00182 { 00183 this.cache = cache; 00184 } 00185 00186 /********************************************************************** 00187 00188 **********************************************************************/ 00189 00190 ClusterCache getCache () 00191 { 00192 return cache; 00193 } 00194 00195 /********************************************************************** 00196 00197 Use this before starting the server 00198 00199 **********************************************************************/ 00200 00201 void setQueue (ClusterQueue queue) 00202 { 00203 this.queue = queue; 00204 } 00205 00206 /********************************************************************** 00207 00208 Return the protocol in use. 00209 00210 **********************************************************************/ 00211 00212 char[] getProtocol() 00213 { 00214 return "cluster"; 00215 } 00216 00217 /********************************************************************** 00218 00219 Interface method that's invoked when a client is making 00220 discovery requests. We just send back our identity in a 00221 reply. 00222 00223 **********************************************************************/ 00224 00225 void notify (IEvent event, IPayload payload) 00226 { 00227 RollCall input = cast(RollCall) payload; 00228 00229 // if this is a request, reply with our identity 00230 if (input.request) 00231 cluster.broadcast (channel, rollcall); 00232 } 00233 00234 /********************************************************************** 00235 00236 Return a text string identifying this server 00237 00238 **********************************************************************/ 00239 00240 override char[] toString() 00241 { 00242 return getProtocol() ~ "::Host"; 00243 } 00244 00245 /********************************************************************** 00246 00247 Create a ServerSocket instance. 00248 00249 **********************************************************************/ 00250 00251 override ServerSocket createSocket (InternetAddress bind, int backlog) 00252 { 00253 return new ServerSocket (bind, backlog); 00254 } 00255 00256 /********************************************************************** 00257 00258 Create a ServerThread instance. This can be overridden to 00259 create other thread-types, perhaps with additional thread- 00260 level data attached. 00261 00262 **********************************************************************/ 00263 00264 override ServerThread createThread (ServerSocket socket) 00265 { 00266 return new ServerThread (this, socket); 00267 } 00268 00269 /********************************************************************** 00270 00271 Factory method for servicing a request. We just create 00272 a new ClusterThread to handle requests from the client. 00273 The thread does not exit until the socket connection is 00274 broken by the client, or some other exception occurs. 00275 00276 **********************************************************************/ 00277 00278 override void service (ServerThread st, IConduit conduit) 00279 { 00280 ClusterThread thread = new ClusterThread (this, conduit, 00281 cluster, cache, queue); 00282 thread.start(); 00283 } 00284 } 00285 00286 00287 00288 /****************************************************************************** 00289 00290 ******************************************************************************/ 00291 00292 private class CacheServer : AbstractServer 00293 { 00294 private ClusterServer cs; 00295 00296 /********************************************************************** 00297 00298 **********************************************************************/ 00299 00300 this (ClusterServer cs, InternetAddress bind, int threads, ILogger logger = null) 00301 { 00302 super (bind, threads, 10, logger); 00303 this.cs = cs; 00304 } 00305 00306 /********************************************************************** 00307 00308 Return the protocol in use. 00309 00310 **********************************************************************/ 00311 00312 char[] getProtocol() 00313 { 00314 return "cluster"; 00315 } 00316 00317 /********************************************************************** 00318 00319 Return a text string identifying this server 00320 00321 **********************************************************************/ 00322 00323 override char[] toString() 00324 { 00325 return getProtocol() ~ "::Tasks"; 00326 } 00327 00328 /********************************************************************** 00329 00330 Create a ServerSocket instance. 00331 00332 **********************************************************************/ 00333 00334 override ServerSocket createSocket (InternetAddress bind, int backlog) 00335 { 00336 return new ServerSocket (bind, backlog); 00337 } 00338 00339 /********************************************************************** 00340 00341 Create a ServerThread instance. This can be overridden to 00342 create other thread-types, perhaps with additional thread- 00343 level data attached. 00344 00345 **********************************************************************/ 00346 00347 override ServerThread createThread (ServerSocket socket) 00348 { 00349 return new LoaderThread (this, socket, cs); 00350 } 00351 00352 /********************************************************************** 00353 00354 Factory method for servicing a request. 00355 00356 **********************************************************************/ 00357 00358 override void service (ServerThread st, IConduit conduit) 00359 { 00360 // we know what this is cos' we created it (above) 00361 LoaderThread tt = cast(LoaderThread) st; 00362 00363 // unpickle task and execute it 00364 tt.load (conduit); 00365 } 00366 00367 /********************************************************************** 00368 00369 00370 **********************************************************************/ 00371 00372 class LoaderThread : ServerThread 00373 { 00374 private ClusterCache cache; 00375 private IBuffer buffer; 00376 private ProtocolReader reader; 00377 private ProtocolWriter writer; 00378 private ILogger logger; 00379 00380 /************************************************************** 00381 00382 00383 **************************************************************/ 00384 00385 this (AbstractServer server, ServerSocket socket, ClusterServer cs) 00386 { 00387 super (server, socket); 00388 00389 // maximum of 8K for ITask instance 00390 // perhaps make this a GrowableBuffer? 00391 buffer = new Buffer (1024 * 8); 00392 00393 // hook protocol IO to the buffer 00394 writer = new ProtocolWriter (buffer); 00395 00396 // make the reader slice directly from the buffer content 00397 reader = new ProtocolReader (buffer); 00398 reader.setAllocator (new BufferAllocator); 00399 00400 // extract the ILogger instance 00401 logger = server.getLogger; 00402 00403 // extract the ClusterCache 00404 cache = cs.getCache (); 00405 } 00406 00407 /************************************************************** 00408 00409 00410 **************************************************************/ 00411 00412 void load (IConduit conduit) 00413 { 00414 ubyte cmd; 00415 IPayload entry; 00416 char[] element, 00417 channel; 00418 00419 // start afresh 00420 buffer.clear (); 00421 00422 // bind the buffer (and hence reader) to the conduit 00423 buffer.setConduit (conduit); 00424 00425 printf ("remote loader\n"); 00426 // instantiate the loader 00427 IPayload p = reader.getPayload (channel, element, cmd); 00428 long time = p.getTime (); 00429 00430 // check to see if it has already been updated or is 00431 // currently locked; go home if so, otherwise lock it 00432 if (! cache.lockWhereInvalid (channel, element, time)) 00433 writer.success().flush(); 00434 else 00435 try { 00436 // say what's going on 00437 if (logger.isEnabled (logger.Level.Trace)) 00438 logger.trace ("loading cache for channel '" ~ channel ~ 00439 "' via key '" ~ element ~ "'"); 00440 00441 // ensure this is the right object 00442 IRemoteCacheLoader loader = cast(IRemoteCacheLoader) p; 00443 if (loader) 00444 { 00445 // acknowledge the request. Do NOT wait for completion! 00446 writer.success ().flush(); 00447 00448 // get the new cache entry 00449 IPayload p = loader.load (element, time); 00450 00451 if (p) 00452 { 00453 // serialize new entry and stuff it into cache 00454 writer.put (writer.Command.OK, channel, p, element); 00455 cache.put (channel, element, reader.getPacket (channel, element, cmd)); 00456 } 00457 } 00458 else 00459 writer.exception ("invalid remote cache-loader").flush(); 00460 00461 } catch (Object o) 00462 writer.exception (o.toString).flush(); 00463 finally 00464 // ensure we unlock this one! 00465 cache.unlock (channel, element); 00466 } 00467 } 00468 } 00469