Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

ClusterServer.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterServer.d
00004         
00005         Copyright (C) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026 
00027                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00028 
00029         
00030         @version        Initial version, July 2004      
00031         @author         Kris
00032 
00033 
00034 *******************************************************************************/
00035 
00036 module mango.cluster.qos.socket.ClusterServer;
00037 
00038 version = NO_DEBUG;
00039 
00040 private import  mango.io.Buffer,
00041                 mango.io.Exception,
00042                 mango.io.ServerSocket,
00043                 mango.io.SocketConduit,
00044                 mango.io.PickleRegistry,
00045                 mango.io.ArrayAllocator;
00046 
00047 private import  mango.io.model.IConduit;
00048 
00049 private import  mango.cache.model.ICache;
00050 
00051 private import  mango.utils.ServerThread,
00052                 mango.utils.AbstractServer;
00053 
00054 private import  mango.cluster.qos.socket.Cluster,
00055                 mango.cluster.qos.socket.RollCall,
00056                 mango.cluster.qos.socket.ClusterCache,
00057                 mango.cluster.qos.socket.ClusterThread,
00058                 mango.cluster.qos.socket.ProtocolReader,
00059                 mango.cluster.qos.socket.ProtocolWriter;
00060                 
00061 /******************************************************************************
00062         
00063         Extends the AbstractServer to glue cluster-server support together.
00064         Note that there may only be one server running for any given host
00065         name. This is to make it easier to manage the server(s) via one or
00066         more http clients. If you require more than one server per machine,
00067         virtual hosting will need to be provided.
00068 
00069 ******************************************************************************/
00070 
00071 class ClusterServer : AbstractServer, IEventListener
00072 {
00073         private ClusterCache    cache;
00074         private ClusterQueue    queue;
00075         private Cluster         cluster;
00076         private IChannel        channel;
00077         private RollCall        rollcall;
00078         private CacheServer     taskServer;
00079 
00080         /**********************************************************************
00081 
00082                 Construct this server with the requisite attributes. The 
00083                 'bind' address is the local address we'll be listening on, 
00084                 'threads' represents the number of socket-accept threads, 
00085                 and backlog is the number of "simultaneous" connection 
00086                 requests that a socket layer will buffer on our behalf.
00087 
00088                 We also set up a listener for client discovery-requests, 
00089                 and lastly, we tell active clients that we're available 
00090                 for work. Clients should be listening on the appropriate 
00091                 channel for an instance of the RollCall payload.
00092 
00093         **********************************************************************/
00094 
00095         this (InternetAddress bind, int threads, ILogger logger = null)
00096         {
00097                 super (bind, threads, 10, logger);
00098                 logger = getLogger();
00099 
00100                 // hook into the cluster
00101                 cluster = new Cluster (logger);
00102 
00103                 // create an identity for ourselves
00104                 rollcall = new RollCall (Socket.hostName(), bind.port(), 0);
00105 
00106                 // clients are listening on this channel ...
00107                 channel = cluster.createChannel ("cluster.server.advertise");
00108 
00109                 version (NO_DEBUG)
00110                 {
00111                 // ... and listen for subsequent server.advertise requests
00112                 cluster.createConsumer (channel, IEvent.Style.Bulletin, this);
00113 
00114                 // construct a server for cache tasks
00115                 InternetAddress addr = new InternetAddress (bind.toAddrString(), bind.port+1);
00116                 taskServer = new CacheServer (this, addr, 20, logger);
00117                 
00118                 // enroll cache tasks
00119                 enroll (logger);
00120                 }
00121         }
00122 
00123         /**********************************************************************
00124 
00125         **********************************************************************/
00126 
00127         void enroll (ILogger logger)
00128         {
00129         }
00130 
00131         /**********************************************************************
00132 
00133         **********************************************************************/
00134 
00135         void addCacheLoader (IPayload loader, bool enroll = false)
00136         {
00137                 char[] name = loader.getGuid;
00138 
00139                 cluster.getLogger.info ("adding cache loader '" ~ name ~ "'");
00140         
00141                 if (enroll)
00142                     PickleRegistry.enroll (loader);
00143         }
00144 
00145         /**********************************************************************
00146 
00147                 Start this server
00148 
00149         **********************************************************************/
00150 
00151         void start ()
00152         {
00153                 // cache with 101 entries
00154                 if (cache is null)
00155                     cache = new ClusterCache (cluster, 101);
00156 
00157                 // queue with 64MB ceiling & maximum of 1 second dispatch
00158                 if (queue is null)
00159                     queue = new ClusterQueue (cluster, 64 * 1024 * 1024, 1_000_000);
00160                 
00161                 super.start();
00162 
00163                 version (NO_DEBUG)
00164                 {
00165                 taskServer.start();
00166                 }
00167 
00168                 // tell everyone about ourselves ...
00169                 cluster.broadcast (channel, rollcall);
00170         }
00171 
00172         /**********************************************************************
00173 
00174                 Use this before starting the server
00175 
00176         **********************************************************************/
00177 
00178         void setCache (ClusterCache cache)
00179         {
00180                 this.cache = cache;
00181         }
00182 
00183         /**********************************************************************
00184 
00185         **********************************************************************/
00186 
00187         ClusterCache getCache ()
00188         {
00189                 return cache;
00190         }
00191 
00192         /**********************************************************************
00193 
00194                 Use this before starting the server
00195 
00196         **********************************************************************/
00197 
00198         void setQueue (ClusterQueue queue)
00199         {
00200                 this.queue = queue;
00201         }
00202 
00203         /**********************************************************************
00204 
00205                 Return the protocol in use.
00206 
00207         **********************************************************************/
00208 
00209         char[] getProtocol()
00210         {
00211                 return "cluster";
00212         }
00213 
00214         /**********************************************************************
00215 
00216                 Interface method that's invoked when a client is making
00217                 discovery requests. We just send back our identity in a
00218                 reply.
00219 
00220         **********************************************************************/
00221 
00222         void notify (IEvent event, IPayload payload)
00223         {
00224                 RollCall input = cast(RollCall) payload;
00225 
00226                 // if this is a request, reply with our identity
00227                 if (input.request)
00228                     cluster.broadcast (channel, rollcall);
00229         }
00230 
00231         /**********************************************************************
00232 
00233                 Return a text string identifying this server
00234 
00235         **********************************************************************/
00236 
00237         override char[] toString()
00238         {
00239                 return getProtocol() ~ "::Host";
00240         }
00241 
00242         /**********************************************************************
00243 
00244                 Create a ServerSocket instance. 
00245 
00246         **********************************************************************/
00247 
00248         override ServerSocket createSocket (InternetAddress bind, int backlog)
00249         {
00250                 return new ServerSocket (bind, backlog);
00251         }
00252 
00253         /**********************************************************************
00254 
00255                 Create a ServerThread instance. This can be overridden to 
00256                 create other thread-types, perhaps with additional thread-
00257                 level data attached.
00258 
00259         **********************************************************************/
00260 
00261         override ServerThread createThread (ServerSocket socket)
00262         {
00263                 return new ServerThread (this, socket);
00264         }
00265 
00266         /**********************************************************************
00267 
00268                 Factory method for servicing a request. We just create
00269                 a new ClusterThread to handle requests from the client.
00270                 The thread does not exit until the socket connection is
00271                 broken by the client, or some other exception occurs. 
00272 
00273         **********************************************************************/
00274 
00275         override void service (ServerThread st, IConduit conduit)
00276         {
00277                 ClusterThread thread = new ClusterThread (this, conduit, 
00278                                                           cluster, cache, queue);
00279                 thread.start();
00280         }
00281 }
00282 
00283 
00284 
00285 /******************************************************************************
00286         
00287 ******************************************************************************/
00288 
00289 private class CacheServer : AbstractServer
00290 {
00291         private ClusterServer cs;
00292 
00293         /**********************************************************************
00294 
00295         **********************************************************************/
00296 
00297         this (ClusterServer cs, InternetAddress bind, int threads, ILogger logger = null)
00298         {
00299                 super (bind, threads, 10, logger);
00300                 this.cs = cs;
00301         }
00302 
00303         /**********************************************************************
00304 
00305                 Return the protocol in use.
00306 
00307         **********************************************************************/
00308 
00309         char[] getProtocol()
00310         {
00311                 return "cluster";
00312         }
00313 
00314         /**********************************************************************
00315 
00316                 Return a text string identifying this server
00317 
00318         **********************************************************************/
00319 
00320         override char[] toString()
00321         {
00322                 return getProtocol() ~ "::Tasks";
00323         }
00324 
00325         /**********************************************************************
00326 
00327                 Create a ServerSocket instance. 
00328 
00329         **********************************************************************/
00330 
00331         override ServerSocket createSocket (InternetAddress bind, int backlog)
00332         {
00333                 return new ServerSocket (bind, backlog);
00334         }
00335 
00336         /**********************************************************************
00337 
00338                 Create a ServerThread instance. This can be overridden to 
00339                 create other thread-types, perhaps with additional thread-
00340                 level data attached.
00341 
00342         **********************************************************************/
00343 
00344         override ServerThread createThread (ServerSocket socket)
00345         {
00346                 return new LoaderThread (this, socket, cs);
00347         }
00348 
00349         /**********************************************************************
00350 
00351                 Factory method for servicing a request.  
00352 
00353         **********************************************************************/
00354 
00355         override void service (ServerThread st, IConduit conduit)
00356         {
00357                 // we know what this is cos' we created it (above)
00358                 LoaderThread tt = cast(LoaderThread) st;
00359         
00360                 // unpickle task and execute it
00361                 tt.load (conduit);
00362         }
00363 
00364         /**********************************************************************
00365 
00366 
00367         **********************************************************************/
00368 
00369         class LoaderThread : ServerThread
00370         {
00371                 private ClusterCache    cache;
00372                 private IBuffer         buffer;
00373                 private ProtocolReader  reader;
00374                 private ProtocolWriter  writer;
00375                 private ILogger         logger;
00376 
00377                 /**************************************************************
00378 
00379 
00380                 **************************************************************/
00381 
00382                 this (AbstractServer server, ServerSocket socket, ClusterServer cs)
00383                 {
00384                         super (server, socket);
00385 
00386                         // maximum of 8K for ITask instance 
00387                         // perhaps make this a GrowableBuffer?
00388                         buffer = new Buffer (1024 * 8);
00389 
00390                         // hook protocol IO to the buffer
00391                         writer = new ProtocolWriter (buffer);
00392 
00393                         // make the reader slice directly from the buffer content
00394                         reader = new ProtocolReader (buffer);
00395                         reader.setAllocator (new BufferAllocator (reader));
00396                         
00397                         // extract the ILogger instance
00398                         logger = server.getLogger;
00399 
00400                         // extract the ClusterCache
00401                         cache = cs.getCache ();
00402                 }
00403 
00404                 /**************************************************************
00405 
00406 
00407                 **************************************************************/
00408 
00409                 void load (IConduit conduit)
00410                 {
00411                         ubyte           cmd;
00412                         IPayload        entry;
00413                         char[]          element,
00414                                         channel;
00415 
00416                         // start afresh
00417                         buffer.clear ();
00418 
00419                         // bind the buffer (and hence reader) to the conduit
00420                         buffer.setConduit (conduit);
00421 
00422                         printf ("remote loader\n");
00423                         // instantiate the loader
00424                         IPayload p = reader.getPayload (channel, element, cmd);
00425                         long time = p.getTime ();
00426 
00427                         // check to see if it has already been updated or is
00428                         // currently locked; go home if so, otherwise lock it
00429                         if (! cache.lockWhereInvalid (channel, element, time))
00430                               writer.success().flush();
00431                         else
00432                            try {                                                
00433                                // say what's going on
00434                                if (logger.isEnabled (logger.Level.Trace))
00435                                    logger.trace ("loading cache for channel '" ~ channel ~
00436                                                  "' via key '" ~ element ~ "'");
00437 
00438                                // ensure this is the right object
00439                                IRemoteCacheLoader loader = cast(IRemoteCacheLoader) p;
00440                                if (loader)
00441                                   {
00442                                   // acknowledge the request. Do NOT wait for completion!
00443                                   writer.success ().flush();
00444 
00445                                   // get the new cache entry
00446                                   IPayload p = loader.load (element, time);
00447 
00448                                   if (p)
00449                                      {
00450                                      // serialize new entry and stuff it into cache
00451                                      writer.put (writer.Command.OK, channel, p, element);
00452                                      cache.put  (channel, element, reader.getPacket (channel, element, cmd));
00453                                      }
00454                                   }
00455                                else
00456                                   writer.exception ("invalid remote cache-loader").flush();
00457 
00458                                } catch (Object o)
00459                                         writer.exception (o.toString).flush();
00460                                  finally 
00461                                        // ensure we unlock this one!
00462                                        cache.unlock (channel, element);
00463                 }
00464         }
00465 }
00466 

Generated on Sun Nov 7 19:06:50 2004 for Mango by doxygen 1.3.6