00001 /******************************************************************************* 00002 00003 @file ClusterServer.d 00004 00005 Copyright (C) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 00027 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00028 00029 00030 @version Initial version, July 2004 00031 @author Kris 00032 00033 00034 *******************************************************************************/ 00035 00036 module mango.cluster.qos.socket.ClusterServer; 00037 00038 version = NO_DEBUG; 00039 00040 private import mango.io.Buffer, 00041 mango.io.Exception, 00042 mango.io.ServerSocket, 00043 mango.io.SocketConduit, 00044 mango.io.PickleRegistry, 00045 mango.io.ArrayAllocator; 00046 00047 private import mango.io.model.IConduit; 00048 00049 private import mango.cache.model.ICache; 00050 00051 private import mango.utils.ServerThread, 00052 mango.utils.AbstractServer; 00053 00054 private import mango.cluster.qos.socket.Cluster, 00055 mango.cluster.qos.socket.RollCall, 00056 mango.cluster.qos.socket.ClusterCache, 00057 mango.cluster.qos.socket.ClusterThread, 00058 mango.cluster.qos.socket.ProtocolReader, 00059 mango.cluster.qos.socket.ProtocolWriter; 00060 00061 /****************************************************************************** 00062 00063 Extends the AbstractServer to glue cluster-server support together. 00064 Note that there may only be one server running for any given host 00065 name. This is to make it easier to manage the server(s) via one or 00066 more http clients. If you require more than one server per machine, 00067 virtual hosting will need to be provided. 00068 00069 ******************************************************************************/ 00070 00071 class ClusterServer : AbstractServer, IEventListener 00072 { 00073 private ClusterCache cache; 00074 private ClusterQueue queue; 00075 private Cluster cluster; 00076 private IChannel channel; 00077 private RollCall rollcall; 00078 private CacheServer taskServer; 00079 00080 /********************************************************************** 00081 00082 Construct this server with the requisite attributes. The 00083 'bind' address is the local address we'll be listening on, 00084 'threads' represents the number of socket-accept threads, 00085 and backlog is the number of "simultaneous" connection 00086 requests that a socket layer will buffer on our behalf. 00087 00088 We also set up a listener for client discovery-requests, 00089 and lastly, we tell active clients that we're available 00090 for work. Clients should be listening on the appropriate 00091 channel for an instance of the RollCall payload. 00092 00093 **********************************************************************/ 00094 00095 this (InternetAddress bind, int threads, ILogger logger = null) 00096 { 00097 super (bind, threads, 10, logger); 00098 logger = getLogger(); 00099 00100 // hook into the cluster 00101 cluster = new Cluster (logger); 00102 00103 // create an identity for ourselves 00104 rollcall = new RollCall (Socket.hostName(), bind.port(), 0); 00105 00106 // clients are listening on this channel ... 00107 channel = cluster.createChannel ("cluster.server.advertise"); 00108 00109 version (NO_DEBUG) 00110 { 00111 // ... and listen for subsequent server.advertise requests 00112 cluster.createConsumer (channel, IEvent.Style.Bulletin, this); 00113 00114 // construct a server for cache tasks 00115 InternetAddress addr = new InternetAddress (bind.toAddrString(), bind.port+1); 00116 taskServer = new CacheServer (this, addr, 20, logger); 00117 00118 // enroll cache tasks 00119 enroll (logger); 00120 } 00121 } 00122 00123 /********************************************************************** 00124 00125 **********************************************************************/ 00126 00127 void enroll (ILogger logger) 00128 { 00129 } 00130 00131 /********************************************************************** 00132 00133 **********************************************************************/ 00134 00135 void addCacheLoader (IPayload loader, bool enroll = false) 00136 { 00137 char[] name = loader.getGuid; 00138 00139 cluster.getLogger.info ("adding cache loader '" ~ name ~ "'"); 00140 00141 if (enroll) 00142 PickleRegistry.enroll (loader); 00143 } 00144 00145 /********************************************************************** 00146 00147 Start this server 00148 00149 **********************************************************************/ 00150 00151 void start () 00152 { 00153 // cache with 101 entries 00154 if (cache is null) 00155 cache = new ClusterCache (cluster, 101); 00156 00157 // queue with 64MB ceiling & maximum of 1 second dispatch 00158 if (queue is null) 00159 queue = new ClusterQueue (cluster, 64 * 1024 * 1024, 1_000_000); 00160 00161 super.start(); 00162 00163 version (NO_DEBUG) 00164 { 00165 taskServer.start(); 00166 } 00167 00168 // tell everyone about ourselves ... 00169 cluster.broadcast (channel, rollcall); 00170 } 00171 00172 /********************************************************************** 00173 00174 Use this before starting the server 00175 00176 **********************************************************************/ 00177 00178 void setCache (ClusterCache cache) 00179 { 00180 this.cache = cache; 00181 } 00182 00183 /********************************************************************** 00184 00185 **********************************************************************/ 00186 00187 ClusterCache getCache () 00188 { 00189 return cache; 00190 } 00191 00192 /********************************************************************** 00193 00194 Use this before starting the server 00195 00196 **********************************************************************/ 00197 00198 void setQueue (ClusterQueue queue) 00199 { 00200 this.queue = queue; 00201 } 00202 00203 /********************************************************************** 00204 00205 Return the protocol in use. 00206 00207 **********************************************************************/ 00208 00209 char[] getProtocol() 00210 { 00211 return "cluster"; 00212 } 00213 00214 /********************************************************************** 00215 00216 Interface method that's invoked when a client is making 00217 discovery requests. We just send back our identity in a 00218 reply. 00219 00220 **********************************************************************/ 00221 00222 void notify (IEvent event, IPayload payload) 00223 { 00224 RollCall input = cast(RollCall) payload; 00225 00226 // if this is a request, reply with our identity 00227 if (input.request) 00228 cluster.broadcast (channel, rollcall); 00229 } 00230 00231 /********************************************************************** 00232 00233 Return a text string identifying this server 00234 00235 **********************************************************************/ 00236 00237 override char[] toString() 00238 { 00239 return getProtocol() ~ "::Host"; 00240 } 00241 00242 /********************************************************************** 00243 00244 Create a ServerSocket instance. 00245 00246 **********************************************************************/ 00247 00248 override ServerSocket createSocket (InternetAddress bind, int backlog) 00249 { 00250 return new ServerSocket (bind, backlog); 00251 } 00252 00253 /********************************************************************** 00254 00255 Create a ServerThread instance. This can be overridden to 00256 create other thread-types, perhaps with additional thread- 00257 level data attached. 00258 00259 **********************************************************************/ 00260 00261 override ServerThread createThread (ServerSocket socket) 00262 { 00263 return new ServerThread (this, socket); 00264 } 00265 00266 /********************************************************************** 00267 00268 Factory method for servicing a request. We just create 00269 a new ClusterThread to handle requests from the client. 00270 The thread does not exit until the socket connection is 00271 broken by the client, or some other exception occurs. 00272 00273 **********************************************************************/ 00274 00275 override void service (ServerThread st, IConduit conduit) 00276 { 00277 ClusterThread thread = new ClusterThread (this, conduit, 00278 cluster, cache, queue); 00279 thread.start(); 00280 } 00281 } 00282 00283 00284 00285 /****************************************************************************** 00286 00287 ******************************************************************************/ 00288 00289 private class CacheServer : AbstractServer 00290 { 00291 private ClusterServer cs; 00292 00293 /********************************************************************** 00294 00295 **********************************************************************/ 00296 00297 this (ClusterServer cs, InternetAddress bind, int threads, ILogger logger = null) 00298 { 00299 super (bind, threads, 10, logger); 00300 this.cs = cs; 00301 } 00302 00303 /********************************************************************** 00304 00305 Return the protocol in use. 00306 00307 **********************************************************************/ 00308 00309 char[] getProtocol() 00310 { 00311 return "cluster"; 00312 } 00313 00314 /********************************************************************** 00315 00316 Return a text string identifying this server 00317 00318 **********************************************************************/ 00319 00320 override char[] toString() 00321 { 00322 return getProtocol() ~ "::Tasks"; 00323 } 00324 00325 /********************************************************************** 00326 00327 Create a ServerSocket instance. 00328 00329 **********************************************************************/ 00330 00331 override ServerSocket createSocket (InternetAddress bind, int backlog) 00332 { 00333 return new ServerSocket (bind, backlog); 00334 } 00335 00336 /********************************************************************** 00337 00338 Create a ServerThread instance. This can be overridden to 00339 create other thread-types, perhaps with additional thread- 00340 level data attached. 00341 00342 **********************************************************************/ 00343 00344 override ServerThread createThread (ServerSocket socket) 00345 { 00346 return new LoaderThread (this, socket, cs); 00347 } 00348 00349 /********************************************************************** 00350 00351 Factory method for servicing a request. 00352 00353 **********************************************************************/ 00354 00355 override void service (ServerThread st, IConduit conduit) 00356 { 00357 // we know what this is cos' we created it (above) 00358 LoaderThread tt = cast(LoaderThread) st; 00359 00360 // unpickle task and execute it 00361 tt.load (conduit); 00362 } 00363 00364 /********************************************************************** 00365 00366 00367 **********************************************************************/ 00368 00369 class LoaderThread : ServerThread 00370 { 00371 private ClusterCache cache; 00372 private IBuffer buffer; 00373 private ProtocolReader reader; 00374 private ProtocolWriter writer; 00375 private ILogger logger; 00376 00377 /************************************************************** 00378 00379 00380 **************************************************************/ 00381 00382 this (AbstractServer server, ServerSocket socket, ClusterServer cs) 00383 { 00384 super (server, socket); 00385 00386 // maximum of 8K for ITask instance 00387 // perhaps make this a GrowableBuffer? 00388 buffer = new Buffer (1024 * 8); 00389 00390 // hook protocol IO to the buffer 00391 writer = new ProtocolWriter (buffer); 00392 00393 // make the reader slice directly from the buffer content 00394 reader = new ProtocolReader (buffer); 00395 reader.setAllocator (new BufferAllocator (reader)); 00396 00397 // extract the ILogger instance 00398 logger = server.getLogger; 00399 00400 // extract the ClusterCache 00401 cache = cs.getCache (); 00402 } 00403 00404 /************************************************************** 00405 00406 00407 **************************************************************/ 00408 00409 void load (IConduit conduit) 00410 { 00411 ubyte cmd; 00412 IPayload entry; 00413 char[] element, 00414 channel; 00415 00416 // start afresh 00417 buffer.clear (); 00418 00419 // bind the buffer (and hence reader) to the conduit 00420 buffer.setConduit (conduit); 00421 00422 printf ("remote loader\n"); 00423 // instantiate the loader 00424 IPayload p = reader.getPayload (channel, element, cmd); 00425 long time = p.getTime (); 00426 00427 // check to see if it has already been updated or is 00428 // currently locked; go home if so, otherwise lock it 00429 if (! cache.lockWhereInvalid (channel, element, time)) 00430 writer.success().flush(); 00431 else 00432 try { 00433 // say what's going on 00434 if (logger.isEnabled (logger.Level.Trace)) 00435 logger.trace ("loading cache for channel '" ~ channel ~ 00436 "' via key '" ~ element ~ "'"); 00437 00438 // ensure this is the right object 00439 IRemoteCacheLoader loader = cast(IRemoteCacheLoader) p; 00440 if (loader) 00441 { 00442 // acknowledge the request. Do NOT wait for completion! 00443 writer.success ().flush(); 00444 00445 // get the new cache entry 00446 IPayload p = loader.load (element, time); 00447 00448 if (p) 00449 { 00450 // serialize new entry and stuff it into cache 00451 writer.put (writer.Command.OK, channel, p, element); 00452 cache.put (channel, element, reader.getPacket (channel, element, cmd)); 00453 } 00454 } 00455 else 00456 writer.exception ("invalid remote cache-loader").flush(); 00457 00458 } catch (Object o) 00459 writer.exception (o.toString).flush(); 00460 finally 00461 // ensure we unlock this one! 00462 cache.unlock (channel, element); 00463 } 00464 } 00465 } 00466