Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

ClusterServer.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterServer.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032         
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.ClusterServer;
00040 
00041 version = NO_DEBUG;
00042 
00043 private import  mango.io.Buffer,
00044                 mango.io.Exception,
00045                 mango.io.ServerSocket,
00046                 mango.io.SocketConduit,
00047                 mango.io.PickleRegistry,
00048                 mango.io.ArrayAllocator;
00049 
00050 private import  mango.io.model.IConduit;
00051 
00052 private import  mango.cache.model.ICache;
00053 
00054 private import  mango.utils.ServerThread,
00055                 mango.utils.AbstractServer;
00056 
00057 private import  mango.cluster.qos.socket.Cluster,
00058                 mango.cluster.qos.socket.RollCall,
00059                 mango.cluster.qos.socket.ClusterCache,
00060                 mango.cluster.qos.socket.ClusterThread,
00061                 mango.cluster.qos.socket.ProtocolReader,
00062                 mango.cluster.qos.socket.ProtocolWriter;
00063                 
00064 /******************************************************************************
00065         
00066         Extends the AbstractServer to glue cluster-server support together.
00067         Note that there may only be one server running for any given host
00068         name. This is to make it easier to manage the server(s) via one or
00069         more http clients. If you require more than one server per machine,
00070         virtual hosting will need to be provided.
00071 
00072 ******************************************************************************/
00073 
00074 class ClusterServer : AbstractServer, IEventListener
00075 {
00076         private ClusterCache    cache;
00077         private ClusterQueue    queue;
00078         private Cluster         cluster;
00079         private IChannel        channel;
00080         private RollCall        rollcall;
00081         private CacheServer     taskServer;
00082 
00083         /**********************************************************************
00084 
00085                 Construct this server with the requisite attributes. The 
00086                 'bind' address is the local address we'll be listening on, 
00087                 'threads' represents the number of socket-accept threads, 
00088                 and backlog is the number of "simultaneous" connection 
00089                 requests that a socket layer will buffer on our behalf.
00090 
00091                 We also set up a listener for client discovery-requests, 
00092                 and lastly, we tell active clients that we're available 
00093                 for work. Clients should be listening on the appropriate 
00094                 channel for an instance of the RollCall payload.
00095 
00096         **********************************************************************/
00097 
00098         this (InternetAddress bind, int threads, ILogger logger = null)
00099         {
00100                 super (bind, threads, 10, logger);
00101                 logger = getLogger();
00102 
00103                 // hook into the cluster
00104                 cluster = new Cluster (logger);
00105 
00106                 // create an identity for ourselves
00107                 rollcall = new RollCall (Socket.hostName(), bind.port(), 0);
00108 
00109                 // clients are listening on this channel ...
00110                 channel = cluster.createChannel ("cluster.server.advertise");
00111 
00112                 version (NO_DEBUG)
00113                 {
00114                 // ... and listen for subsequent server.advertise requests
00115                 cluster.createConsumer (channel, IEvent.Style.Bulletin, this);
00116 
00117                 // construct a server for cache tasks
00118                 InternetAddress addr = new InternetAddress (bind.toAddrString(), bind.port+1);
00119                 taskServer = new CacheServer (this, addr, 20, logger);
00120                 
00121                 // enroll cache tasks
00122                 enroll (logger);
00123                 }
00124         }
00125 
00126         /**********************************************************************
00127 
00128         **********************************************************************/
00129 
00130         void enroll (ILogger logger)
00131         {
00132         }
00133 
00134         /**********************************************************************
00135 
00136         **********************************************************************/
00137 
00138         void addCacheLoader (IPayload loader, bool enroll = false)
00139         {
00140                 char[] name = loader.getGuid;
00141 
00142                 cluster.getLogger.info ("adding cache loader '" ~ name ~ "'");
00143         
00144                 if (enroll)
00145                     PickleRegistry.enroll (loader);
00146         }
00147 
00148         /**********************************************************************
00149 
00150                 Start this server
00151 
00152         **********************************************************************/
00153 
00154         void start ()
00155         {
00156                 // cache with 101 entries
00157                 if (cache is null)
00158                     cache = new ClusterCache (cluster, 101);
00159 
00160                 // queue with 64MB ceiling & maximum of 1 second dispatch
00161                 if (queue is null)
00162                     queue = new ClusterQueue (cluster, 64 * 1024 * 1024, 1_000_000);
00163                 
00164                 super.start();
00165 
00166                 version (NO_DEBUG)
00167                 {
00168                 taskServer.start();
00169                 }
00170 
00171                 // tell everyone about ourselves ...
00172                 cluster.broadcast (channel, rollcall);
00173         }
00174 
00175         /**********************************************************************
00176 
00177                 Use this before starting the server
00178 
00179         **********************************************************************/
00180 
00181         void setCache (ClusterCache cache)
00182         {
00183                 this.cache = cache;
00184         }
00185 
00186         /**********************************************************************
00187 
00188         **********************************************************************/
00189 
00190         ClusterCache getCache ()
00191         {
00192                 return cache;
00193         }
00194 
00195         /**********************************************************************
00196 
00197                 Use this before starting the server
00198 
00199         **********************************************************************/
00200 
00201         void setQueue (ClusterQueue queue)
00202         {
00203                 this.queue = queue;
00204         }
00205 
00206         /**********************************************************************
00207 
00208                 Return the protocol in use.
00209 
00210         **********************************************************************/
00211 
00212         char[] getProtocol()
00213         {
00214                 return "cluster";
00215         }
00216 
00217         /**********************************************************************
00218 
00219                 Interface method that's invoked when a client is making
00220                 discovery requests. We just send back our identity in a
00221                 reply.
00222 
00223         **********************************************************************/
00224 
00225         void notify (IEvent event, IPayload payload)
00226         {
00227                 RollCall input = cast(RollCall) payload;
00228 
00229                 // if this is a request, reply with our identity
00230                 if (input.request)
00231                     cluster.broadcast (channel, rollcall);
00232         }
00233 
00234         /**********************************************************************
00235 
00236                 Return a text string identifying this server
00237 
00238         **********************************************************************/
00239 
00240         override char[] toString()
00241         {
00242                 return getProtocol() ~ "::Host";
00243         }
00244 
00245         /**********************************************************************
00246 
00247                 Create a ServerSocket instance. 
00248 
00249         **********************************************************************/
00250 
00251         override ServerSocket createSocket (InternetAddress bind, int backlog)
00252         {
00253                 return new ServerSocket (bind, backlog);
00254         }
00255 
00256         /**********************************************************************
00257 
00258                 Create a ServerThread instance. This can be overridden to 
00259                 create other thread-types, perhaps with additional thread-
00260                 level data attached.
00261 
00262         **********************************************************************/
00263 
00264         override ServerThread createThread (ServerSocket socket)
00265         {
00266                 return new ServerThread (this, socket);
00267         }
00268 
00269         /**********************************************************************
00270 
00271                 Factory method for servicing a request. We just create
00272                 a new ClusterThread to handle requests from the client.
00273                 The thread does not exit until the socket connection is
00274                 broken by the client, or some other exception occurs. 
00275 
00276         **********************************************************************/
00277 
00278         override void service (ServerThread st, IConduit conduit)
00279         {
00280                 ClusterThread thread = new ClusterThread (this, conduit, 
00281                                                           cluster, cache, queue);
00282                 thread.start();
00283         }
00284 }
00285 
00286 
00287 
00288 /******************************************************************************
00289         
00290 ******************************************************************************/
00291 
00292 private class CacheServer : AbstractServer
00293 {
00294         private ClusterServer cs;
00295 
00296         /**********************************************************************
00297 
00298         **********************************************************************/
00299 
00300         this (ClusterServer cs, InternetAddress bind, int threads, ILogger logger = null)
00301         {
00302                 super (bind, threads, 10, logger);
00303                 this.cs = cs;
00304         }
00305 
00306         /**********************************************************************
00307 
00308                 Return the protocol in use.
00309 
00310         **********************************************************************/
00311 
00312         char[] getProtocol()
00313         {
00314                 return "cluster";
00315         }
00316 
00317         /**********************************************************************
00318 
00319                 Return a text string identifying this server
00320 
00321         **********************************************************************/
00322 
00323         override char[] toString()
00324         {
00325                 return getProtocol() ~ "::Tasks";
00326         }
00327 
00328         /**********************************************************************
00329 
00330                 Create a ServerSocket instance. 
00331 
00332         **********************************************************************/
00333 
00334         override ServerSocket createSocket (InternetAddress bind, int backlog)
00335         {
00336                 return new ServerSocket (bind, backlog);
00337         }
00338 
00339         /**********************************************************************
00340 
00341                 Create a ServerThread instance. This can be overridden to 
00342                 create other thread-types, perhaps with additional thread-
00343                 level data attached.
00344 
00345         **********************************************************************/
00346 
00347         override ServerThread createThread (ServerSocket socket)
00348         {
00349                 return new LoaderThread (this, socket, cs);
00350         }
00351 
00352         /**********************************************************************
00353 
00354                 Factory method for servicing a request.  
00355 
00356         **********************************************************************/
00357 
00358         override void service (ServerThread st, IConduit conduit)
00359         {
00360                 // we know what this is cos' we created it (above)
00361                 LoaderThread tt = cast(LoaderThread) st;
00362         
00363                 // unpickle task and execute it
00364                 tt.load (conduit);
00365         }
00366 
00367         /**********************************************************************
00368 
00369 
00370         **********************************************************************/
00371 
00372         class LoaderThread : ServerThread
00373         {
00374                 private ClusterCache    cache;
00375                 private IBuffer         buffer;
00376                 private ProtocolReader  reader;
00377                 private ProtocolWriter  writer;
00378                 private ILogger         logger;
00379 
00380                 /**************************************************************
00381 
00382 
00383                 **************************************************************/
00384 
00385                 this (AbstractServer server, ServerSocket socket, ClusterServer cs)
00386                 {
00387                         super (server, socket);
00388 
00389                         // maximum of 8K for ITask instance 
00390                         // perhaps make this a GrowableBuffer?
00391                         buffer = new Buffer (1024 * 8);
00392 
00393                         // hook protocol IO to the buffer
00394                         writer = new ProtocolWriter (buffer);
00395 
00396                         // make the reader slice directly from the buffer content
00397                         reader = new ProtocolReader (buffer);
00398                         reader.setAllocator (new BufferAllocator);
00399                         
00400                         // extract the ILogger instance
00401                         logger = server.getLogger;
00402 
00403                         // extract the ClusterCache
00404                         cache = cs.getCache ();
00405                 }
00406 
00407                 /**************************************************************
00408 
00409 
00410                 **************************************************************/
00411 
00412                 void load (IConduit conduit)
00413                 {
00414                         ubyte           cmd;
00415                         IPayload        entry;
00416                         char[]          element,
00417                                         channel;
00418 
00419                         // start afresh
00420                         buffer.clear ();
00421 
00422                         // bind the buffer (and hence reader) to the conduit
00423                         buffer.setConduit (conduit);
00424 
00425                         //printf ("remote loader\n");
00426                         // instantiate the loader
00427                         IPayload p = reader.getPayload (channel, element, cmd);
00428                         ulong time = p.getTime ();
00429 
00430                         // check to see if it has already been updated or is
00431                         // currently locked; go home if so, otherwise lock it
00432                         if (! cache.lockWhereInvalid (channel, element, time))
00433                               writer.success().flush();
00434                         else
00435                            try {                                                
00436                                // say what's going on
00437                                if (logger.isEnabled (logger.Level.Trace))
00438                                    logger.trace ("loading cache for channel '" ~ channel ~
00439                                                  "' via key '" ~ element ~ "'");
00440 
00441                                // ensure this is the right object
00442                                IRemoteCacheLoader loader = cast(IRemoteCacheLoader) p;
00443                                if (loader)
00444                                   {
00445                                   // acknowledge the request. Do NOT wait for completion!
00446                                   writer.success ().flush();
00447 
00448                                   // get the new cache entry
00449                                   IPayload p = loader.load (element, time);
00450 
00451                                   if (p)
00452                                      {
00453                                      // serialize new entry and stuff it into cache
00454                                      writer.put (writer.Command.OK, channel, p, element);
00455                                      cache.put  (channel, element, reader.getPacket (channel, element, cmd));
00456                                      }
00457                                   }
00458                                else
00459                                   writer.exception ("invalid remote cache-loader").flush();
00460 
00461                                } catch (Object o)
00462                                         writer.exception (o.toString).flush();
00463                                  finally 
00464                                        // ensure we unlock this one!
00465                                        cache.unlock (channel, element);
00466                 }
00467         }
00468 }
00469 

Generated on Sat Dec 24 17:28:32 2005 for Mango by  doxygen 1.4.0