Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

Cluster.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file Cluster.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032 
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.Cluster;
00040 
00041 private import  mango.cache.HashMap;
00042 
00043 private import  mango.utils.Text,
00044                 mango.utils.Random;
00045 
00046 private import  mango.base.System;
00047 
00048 private import  mango.io.Buffer,
00049                 mango.io.Socket,
00050                 mango.io.Exception,
00051                 mango.io.Properties,
00052                 mango.io.ArrayAllocator,
00053                 mango.io.SocketConduit,
00054                 mango.io.SocketListener,
00055                 mango.io.MulticastSocket;
00056 
00057 private import  mango.io.model.IConduit;
00058 
00059 private import  mango.log.model.ILogger;
00060 
00061 private import  mango.cluster.Client;
00062 
00063 public  import  mango.cluster.model.ICluster;
00064 
00065 private import  mango.cluster.qos.socket.RollCall,
00066                 mango.cluster.qos.socket.ClusterEvent,
00067                 mango.cluster.qos.socket.ProtocolReader,
00068                 mango.cluster.qos.socket.ProtocolWriter;
00069 
00070 
00071 /*******************************************************************************
00072         
00073         QOS implementation for sockets. All cluster-client activity is 
00074         gated through here by the higher level classes; NetworkQueue &
00075         NetworkCache for example. You gain access to the cluster by 
00076         creating an instance of the QOS (quality of service) you desire
00077         and mapping client classes onto it. For example:
00078 
00079         @code
00080         import mango.cluster.NetworkCache;
00081         import mango.cluster.qos.socket.Cluster;
00082 
00083         ICluster cluster = new Cluster (...);
00084         NetworkCache cache = new NetworkCache (cluster, ...);
00085 
00086         cache.put (...);
00087         cache.get (...);
00088         cache.invalidate (...);
00089         @endcode
00090 
00091         Please see the cluster clients for additional details. Currently
00092         these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 
00093         NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 
00094         Client base-class.
00095 
00096 *******************************************************************************/
00097 
class Cluster : ICluster, IEventListener
{  
        private HashMap                         groups;
        private ILogger                         logger;
        private NodeSet                         nodeSet;
        private Buffer                          mBuffer;
        private ProtocolWriter                  mWriter;
        private MulticastSocket                 mSocket;

        // multicast settings; the defaults may be replaced by the
        // "multicast_ttl", "multicast_port" and "multicast_prefix"
        // configuration attributes (see the conduit constructor).
        // NOTE(review): groupTTL is parsed from configuration but is not
        // applied to any socket within this file -- confirm whether the
        // MulticastSocket ought to be configured with it.
        private ushort                          groupTTL = 1;
        private ushort                          groupPort = 3333;
        private ubyte                           groupPrefix = 225;

        /***********************************************************************

                Setup a Cluster instance with an optional logger. This
                creates an empty node set, the cache of multicast group
                addresses, and the buffer, socket & writer used for
                bulletin serialization.

                Currently the buffer & writer are shared for all bulletin
                serialization; this should probably change at some point
                such that we can support multiple threads broadcasting
                concurrently to different output ports.

        ***********************************************************************/
        
        this (ILogger logger = null)
        {
                this.logger = logger;

                // group addresses depend upon this instance's prefix and
                // port, so they are cached per instance. A static map keyed
                // on channel name alone would hand one Cluster's addresses
                // to another Cluster configured with a different prefix/port
                groups = new HashMap (128, 0.75, 2);

                nodeSet = new NodeSet (logger);
                mBuffer = new Buffer (1024 * 4);
                mSocket = new MulticastSocket;
                mWriter = new ProtocolWriter (mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance from configuration read via the
                given conduit. Each attribute delivered by Properties.load
                is handled as follows:

                node             : a Node is built from the value and added
                multicast_port   : sets the multicast group port
                multicast_prefix : sets the first segment of group addresses
                multicast_ttl    : stored in groupTTL

                Any other attribute throws a ClusterException. Once loaded,
                a RollCall request is broadcast on the channel named
                "cluster.server.advertise" (which this instance also
                consumes), and the constructor then sleeps for 250ms to
                give running servers a chance to respond via notify().

        ***********************************************************************/
        
        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // callback for loading cluster configuration 
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);
                        if (name == "node")
                            nodeSet.addNode (new Node (logger, value));
                        else
                        if (name == "multicast_port")
                            groupPort = Text.atoi (value);
                        else
                        if (name == "multicast_prefix")
                            groupPrefix = Text.atoi (value);
                        else
                        if (name == "multicast_ttl")
                            groupTTL = Text.atoi (value);
                        else
                           throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }


                // load up the cluster configuration
                Properties.load (conduit, &loader);

                // finalize nodeSet 
                nodeSet.optimize ();

                // listen for cluster servers
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ask who's currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // wait for enabled servers to respond ...
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance bound to a single server running
                on localhost. The cache is addressed at serverPort and
                tasks at serverPort+1; the node is enabled immediately,
                so no discovery broadcast is performed.

        ***********************************************************************/
        
        this (ILogger logger, uint serverPort)
        in {
           assert (logger);
           assert (serverPort > 1024);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress ("localhost", serverPort));
                node.setTasks (new InternetAddress ("localhost", serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when 
                they get a RollCall request, and when they begin execution.
                Payloads that are not a RollCall are ignored.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                // a class downcast yields null when the payload is not a
                // RollCall; ignore those instead of dereferencing null
                RollCall rollcall = cast(RollCall) payload;
                if (rollcall is null)
                    return;

                // ignore requests from clients (we're on a common channel)
                if (! rollcall.request)
                      nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation 
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplifies and speeds up execution.

        ***********************************************************************/
        
        IChannel createChannel (char[] channel)
        {
                return new Channel (channel);
        }

        /***********************************************************************

                Return the logger instance provided during construction.
                This may be null when the default constructor was used.
                
        ***********************************************************************/
        
        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Broadcast a payload on the specified channel. This uses
                IP/Multicast to scatter the payload to all registered
                listeners (on the same multicast group). Note that the
                maximum payload size is limited to that of an Ethernet 
                data frame, minus the IP/UDP header size (1472 bytes);
                larger content throws a ClusterException.

        ***********************************************************************/
        
        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // serialize content
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                // Ethernet data-frame size minus the 28 byte UDP/IP header:
                if (mBuffer.getPosition > 1472)
                    throw new ClusterException ("payload is too large to broadcast");

                // send it to the appropriate multicast group
                mSocket.write (mBuffer, getGroup (channel.getName));
        }

        /***********************************************************************

                Create a listener of the specified type. Listeners are 
                run within their own thread, since they spend the vast 
                majority of their time blocked on a Socket read. Would
                be good to support multiplexed reading instead, such 
                that a thread pool could be applied instead.

                Throws a ClusterException for any style other than
                Message or Bulletin.
                 
        ***********************************************************************/
        
        IConsumer createConsumer (IChannel channel, IEvent.Style style, 
                                  IEventListener notify)
        {
                IEvent event = new ClusterEvent (this, channel, style, notify);
                
                if (logger)
                    logger.info ("creating " ~ event.getStyleName ~ 
                                 " consumer for channel '" ~ channel.getName ~ "'");

                switch (style)
                       {
                       case IEvent.Style.Message:
                            return new MessageConsumer (this, event);

                       case IEvent.Style.Bulletin:
                            return new BulletinConsumer (this, event);

                       default:
                            // explicit failure, rather than a SwitchError and
                            // falling off the end of a non-void function
                            throw new ClusterException ("unsupported consumer style '" ~ event.getStyleName ~ "'");
                       }     
        }

        /***********************************************************************

                Convert an IChannel back into the Channel implementation
                returned by createChannel(). Throws a ClusterException for
                a null channel or any other IChannel implementation, rather
                than failing later on a null reference.

        ***********************************************************************/

        private static Channel toChannel (IChannel channel)
        {
                Channel c = cast(Channel) channel;
                if (c is null)
                    throw new ClusterException ("channel is not a socket.Cluster channel");
                return c;
        }

        /***********************************************************************

                Return an entry from the network cache, and optionally
                remove it. This is a synchronous operation as opposed
                to the asynchronous nature of an invalidate broadcast.

        ***********************************************************************/
        
        IPayload getCache (IChannel channel, char[] key, bool remove)
        {
                Channel c = toChannel (channel);
                
                c.writer.put (remove ? ProtocolWriter.Command.Remove : 
                              ProtocolWriter.Command.Copy, c.getName, null, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Place an entry into the network cache, replacing the
                entry with the identical key. Note that this may cause
                the oldest entry in the cache to be displaced if the
                cache is already full.

        ***********************************************************************/
        
        IPayload putCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = toChannel (channel);

                c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Add an entry to the specified network queue. May throw a
                QueueFullException if there's no room available. Returns
                the payload that was supplied.

        ***********************************************************************/
        
        IPayload putQueue (IChannel channel, IPayload payload)
        {
                Channel c = toChannel (channel);
                
                c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
                nodeSet.request (c.writer, c.reader);
                return payload;
        }


        /***********************************************************************
                
                Query the cluster for queued entries on the corresponding 
                channel. Returns, and removes, the first matching entry 
                from the cluster. Note that this sweeps the cluster for
                matching entries, and is synchronous in nature. The more
                common approach is to setup a queue listener, which will
                grab and dispatch queue entries asynchronously. Returns
                null when no node yields an entry.

        ***********************************************************************/
        
        IPayload getQueue (IChannel channel)
        {
                IPayload payload;

                Channel c = toChannel (channel);

                // callback for NodeSet.scan(); returning true ends the scan
                bool scan (Node node)
                {
                        // ask this node ...
                        c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
                        node.request (node.cache, c.writer, c.reader, payload);
                        return payload !== null;
                }

                // make a pass over each Node, looking for channel entries
                nodeSet.scan (&scan);
                return payload;
        }

        /***********************************************************************
                
                Load a network cache entry remotely. This sends the given
                Payload over the network to the cache host, where it will
                be executed locally. The benefit of doing so is that the
                host may deny access to the cache entry for the duration
                of the load operation. This, in turn, provides an elegant 
                mechanism for gating/synchronizing multiple network clients  
                over a given cache entry; handy for those entries that are 
                relatively expensive to construct or access. 

        ***********************************************************************/
        
        void loadCache (IChannel channel, char[] key, IPayload payload)
        {               
                Channel c = toChannel (channel);

                c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
                Node node = nodeSet.selectNode (key);
                node.request (node.tasks, c.writer, c.reader, payload);
        }

        /***********************************************************************

                Return an internet address representing the multicast
                group for the specified channel. We use three of the
                four address segments to represent the channel itself
                (via a hash on the channel name), and set the primary
                segment to be that of the broadcast prefix (above).
                Addresses are cached in this instance's group map.

        ***********************************************************************/
        
        synchronized InternetAddress getGroup (char[] channel)
        {
                InternetAddress group = cast(InternetAddress) groups.get (channel);

                if (group is null)
                   {
                   // construct a group address from the prefix & channel-hash,
                   // where the hash is folded down to 24 bits
                   uint hash = groups.jhash (channel);
                   hash = (hash >> 24) ^ (hash & 0x00ffffff);

                   char[] address = Text.toString (groupPrefix) ~ "." ~
                                    Text.toString ((hash >> 16) & 0xff) ~ "." ~
                                    Text.toString ((hash >> 8) & 0xff) ~ "." ~
                                    Text.toString (hash & 0xff);

                   // insert InternetAddress into hashmap
                   group = new InternetAddress (address, groupPort);
                   groups.put (channel, group);
                   }
                return group;              
        }
}
00460 
00461 
00462 /*******************************************************************************         
00463 
00464         A listener for multicast channel traffic. These are currently used 
00465         for cache coherency, queue publishing, and node discovery activity; 
00466         though could be used for direct messaging also.
00467 
00468 *******************************************************************************/
00469 
private class BulletinConsumer : SocketListener, IConsumer
{
        private IEvent                  event;
        private Buffer                  buffer;
        private ProtocolReader          reader;
        private Cluster                 cluster;
        private MulticastSocket         consumer;

        /***********************************************************************

                Construct a multicast consumer for the specified event. The
                consumer joins the multicast group that the cluster assigns
                to the event's channel, and the listener is started before
                the constructor returns. The event handler will be invoked
                whenever a message arrives for the associated channel.

        ***********************************************************************/
        
        this (Cluster cluster, IEvent event)
        {
                this.event = event;
                this.cluster = cluster;

                // buffer doesn't need to be larger than Ethernet data-frame
                buffer = new Buffer (1500);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                // configure a listener socket
                consumer = new MulticastSocket;
                consumer.join (cluster.getGroup (event.getChannel.getName));
                
                super (consumer, buffer);

                // fire up this listener
                start ();
        }

        /***********************************************************************

                Notification callback invoked when we receive a multicast
                packet. Note that we check the packet channel-name against
                the one we're consuming, to check for cases where the group
                address had a hash collision.

        ***********************************************************************/
        
        override void notify (IBuffer buffer)
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;
                
                IPayload payload = reader.getPayload (channel, element, cmd);

                // check it's really for us first (might be a hash collision)
                if (channel == event.getChannel.getName)
                    invoke (event, payload);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                message is logged when the cluster has a logger; a Cluster
                may be constructed without one, in which case the message
                is dropped instead of dereferencing a null logger.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;
                if (logger)
                    logger.error ("BulletinConsumer: "~msg);
        }

        /***********************************************************************

                Overridable means of notifying the client code.
                 
        ***********************************************************************/
        
        protected void invoke (IEvent event, IPayload payload)
        {
                event.invoke (payload);
        }

        /***********************************************************************

                Return the cluster instance we're associated with.

        ***********************************************************************/
        
        Cluster getCluster ()
        {
                return cluster;
        }

        /***********************************************************************

                Temporarily halt listening. This can be used to ignore
                multicast messages while, for example, the consumer is
                busy doing other things.

        ***********************************************************************/

        void pauseGroup ()
        {
                consumer.pauseGroup ();
        }

        /***********************************************************************

                Resume listening, post-pause.

        ***********************************************************************/

        void resumeGroup ()
        {
                consumer.resumeGroup ();
        }

        /***********************************************************************

                Cancel this consumer. The listener is effectively disabled
                from this point forward. The listener thread does not halt
                at this point, but waits until the socket-read returns. 
                Note that the D Interface implementation requires us to 
                "reimplement and dispatch" trivial things like this ~ it's
                a pain in the neck to maintain.

        ***********************************************************************/
        
        void cancel ()
        {
                super.cancel ();
        }
}
00604 
00605 
00606 /*******************************************************************************
00607         
00608         A listener for queue events. These events are produced by the 
00609         queue host on a periodic bases when it has available entries.
00610         We listen for them (rather than constantly scanning) and then
00611         begin a sweep to process as many as we can. Note that we will
00612         be in competition with other nodes to process these entries.
00613 
00614 *******************************************************************************/
00615 
private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a multicast consumer for the specified event.

        ***********************************************************************/
        
        this (Cluster cluster, IEvent event)
        {
                super (cluster, event);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                message is logged when the cluster has a logger; a Cluster
                may be constructed without one, in which case the message
                is dropped instead of dereferencing a null logger.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;
                if (logger)
                    logger.error ("MessageConsumer: "~msg);
        }

        /***********************************************************************

                Override the default processing to sweep the cluster for 
                queued entries. Each server node is queried until one is
                found that contains a payload. Note that it is possible 
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we 
                should make the strategy pluggable instead.

                The payload carried by the notification itself is not
                dispatched; only entries returned by getQueue() are.

        ***********************************************************************/
        
        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    while ((payload = getCluster.getQueue (event.getChannel)) !== null)
                            event.invoke (payload);
                    } 
                finally 
                   {
                   // resume listening, even if a handler threw
                   resumeGroup ();
                   }
        }
}
00667 
00668 
00669 /*******************************************************************************
00670         
00671         A channel represents something akin to a publish/subscribe topic, 
00672         or a radio station. These are used to segregate cluster operations
00673         into a set of groups, where each group is represented by a channel.
00674         Channel names are whatever you want then to be: use of dot notation 
00675         has proved useful in the past. See Client.createChannel
00676 
00677 *******************************************************************************/
00678 
private class Channel : IChannel
{
        char[]                  name;
        Buffer                  buffer;
        ProtocolReader          reader;
        ProtocolWriter          writer;

        /***********************************************************************
        
                Create a channel called 'name', which must be non-empty.
                A growable buffer, plus a writer and a reader bound to it,
                are built once here and kept with the channel, so cluster
                requests on this channel reuse them rather than paying for
                fresh ones each time.

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length > 0);
           }
        body
        {       
                this.name = name;

                // starts at 2KB and grows as needed to hold larger payloads
                this.buffer = new GrowableBuffer (2048);
                this.writer = new ProtocolWriter (this.buffer);

                // the reader slices its results straight out of the buffer
                this.reader = new ProtocolReader (this.buffer);
                this.reader.setAllocator (new BufferAllocator);
        }

        /***********************************************************************
        
                Answer the name given to this channel at construction time
                (or the most recent name read via read()).

        ***********************************************************************/

        char[] getName ()
        {
                return this.name;
        }

        /***********************************************************************
        
                Serialize this channel, which amounts to its name, onto
                the supplied IWriter.

        ***********************************************************************/

        void write (IWriter writer)
        {
                // 'writer' here is the argument, not the cached field
                writer.put (this.name);
        }

        /***********************************************************************
        
                Deserialize this channel, which amounts to its name, from
                the supplied IReader.

        ***********************************************************************/

        void read (IReader reader)
        {
                // 'reader' here is the argument, not the cached field
                reader.get (this.name);
        }
}
00745 
00746 
00747 /*******************************************************************************
00748         
00749         An abstraction of a socket connection. Used internally by the 
00750         socket-based Cluster. 
00751 
00752 *******************************************************************************/
00753 
private class Connection
{
        // Return the socket conduit carried by this connection
        abstract SocketConduit getConduit ();

        // Re-establish the underlying socket; false when that fails
        abstract bool reset();

        // Release this connection, stamped with the given time (the pool
        // implementation in this file puts it back on its free-list)
        abstract void done (ulong time);
}
00762 
00763 
00764 /*******************************************************************************
00765         
00766         A pool of socket connections for accessing cluster nodes. Note 
00767         that the entries will timeout after a period of inactivity, and
00768         will subsequently cause a connected host to drop the supporting
00769         session.
00770 
00771 *******************************************************************************/
00772 
private class ConnectionPool
{ 
        private int             count;
        private InternetAddress address;
        private PoolConnection  freelist;
        private const ulong     timeout = 60_000;

        /***********************************************************************
        
                A single pooled socket connection. Each one remembers its
                owning pool, the time it was last handed back, and the
                next entry on the pool's free-list.

        ***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;
                PoolConnection  next;   
                ConnectionPool  parent;   
                SocketConduit   conduit;

                /***************************************************************
                
                        Construct a new connection for 'pool' and attempt
                        to connect it. A failed connect is not reported
                        here; the next I/O on the conduit will surface it.

                ***************************************************************/
        
                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }
                  
                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/
        
                SocketConduit getConduit ()
                {
                        return conduit;
                }
                  
                /***************************************************************

                        Create a new socket and connect it to the parent
                        pool's address. This will cause a dedicated thread
                        to start on the server. Said thread will quit when
                        an error occurs.

                        Any previous socket is closed first: reset() is
                        invoked repeatedly when reconnecting, and simply
                        overwriting the conduit leaked one socket per call.

                        Returns false if the new socket could not connect.

                ***************************************************************/
        
                bool reset ()
                {
                        // release the socket we are about to replace
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object)
                                       {
                                       // best effort: it is likely broken already
                                       }
                           }

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                                    {
                                    return false;
                                    }
                }
                  
                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated.

                ***************************************************************/
        
                void close ()
                {
                        if (conduit)
                            conduit.close ();
                }

                /***************************************************************

                        Return this connection to the free-list, stamped
                        with 'time'. Note that we have to synchronize on
                        the parent-pool itself, since that is the lock
                        borrow() and close() hold.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address)
        {      
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from the free-list rather than
                creating a new one. Entries idle for longer than 'timeout'
                milliseconds are closed and discarded as we go. That
                includes the last entry on the list: the previous version
                handed that one out even when it was stale. If no fresh
                entry remains, a new connection is created.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {  
                while (freelist)
                      {
                      PoolConnection c = freelist;
                      freelist = c.next;

                      // hand out the first connection that is still fresh
                      if (time - c.time <= timeout)
                          return c;

                      // stale: the remote host has likely dropped the session
                      c.close ();
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all idle connections. Connections
                currently borrowed are not affected.

        ***********************************************************************/

        synchronized void close ()
        {       
                PoolConnection c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close ();
                      c = c.next;
                      }
        }
}
00924 
00925 
00926 /*******************************************************************************
00927         
00928         Class to represent a cluster node. Each node supports both cache
00929         and queue functionality. Note that the set of available nodes is
00930         configured at startup, simplifying the discovery process in some
00931         significant ways, and causing less thrashing of cache-keys.
00932 
00933 *******************************************************************************/
00934 
private class Node
{ 
        private char[]                  name,
                                        port;
        private ILogger                 logger;
        private bool                    enabled;

        private ConnectionPool          tasks,
                                        cache;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device. The node starts
                out disabled, and has no connection pools until setCache()
                and setTasks() are called.

        ***********************************************************************/
        
        this (ILogger logger, char[] name)
        {
                this.logger = logger;
                this.name = name;
        }

        /***********************************************************************

                Add a cache/queue reference for the remote node, and
                remember its port for logging purposes
                 
        ***********************************************************************/

        void setCache (InternetAddress address)
        {      
                this.cache = new ConnectionPool (address);
                port = Text.toString (address.port);
        }

        /***********************************************************************

                Add a cache-loader reference for the remote node

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {      
                this.tasks = new ConnectionPool (address);
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/
        
        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover, and the idle connections of
                both pools are closed. The pools are only assigned once the
                node has been enabled, so either may still be null here.

        ***********************************************************************/

        void fail ()
        {       
                setEnabled (false);

                if (cache)
                    cache.close ();

                if (tasks)
                    tasks.close ();
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {      
                volatile  
                       return enabled;    
        }

        /***********************************************************************

                Set the enabled state of this node, tracing the change
                when trace logging is active

        ***********************************************************************/

        void setEnabled (bool enabled)
        {      
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                volatile  
                       this.enabled = enabled;    
        }

        /***********************************************************************

                Send the request already encoded in 'writer' using a
                connection borrowed from 'pool', and decode the reply
                from 'reader' into 'payload'.

                A failed write is retried (reconnecting in between), up to
                three attempts in all. A failure while reading the reply is
                not retried. When the server cannot be reached, this node
                is failed and false is returned. False is also returned
                when 'pool' is null.

                Throws ClusterFullException or ClusterException when the
                server reports a problem or sends an unexpected reply, and
                rethrows any PickleException raised while decoding.
                
        ***********************************************************************/
        
        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {       
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between 
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                    {
                    try {
                        // attach connection to reader and writer 
                        writer.getBuffer.setConduit (connect.getConduit);
                        reader.getBuffer.setConduit (connect.getConduit);
        
                        // send request
                        writer.flush ();

                        // load the returned object (don't retry!)
                        attempts = 0;
                        payload = reader.getPayload (channel, element, cmd);

                        // return borrowed connection
                        connect.done (time);

                        } catch (PickleException x)
                                {
                                connect.done (time);
                                throw x;
                                }
                          catch (IOException x)
                                {
                                // attempt to reconnect?
                                if (attempts == 0 || !connect.reset)
                                   {
                                   // that server is offline. The broken
                                   // connection would otherwise be dropped
                                   // with its socket still open, so hand it
                                   // back to its pool and close that pool.
                                   // Both happen under the pool's lock, so no
                                   // other thread can borrow it in between.
                                   // Only this one lock is held, which keeps
                                   // us clear of lock-ordering problems.
                                   synchronized (pool)
                                                {
                                                connect.done (time);
                                                pool.close ();
                                                }

                                   // disable the node (closes both pools)
                                   fail ();
  
                                   // state that we failed
                                   return false;
                                   }
                                }
                    }
                    
                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)                       
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)                       
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)                       
                       throw new ClusterException (channel);
                        
                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}
01112 
01113 
01114 /*******************************************************************************
01115         
01116         Models a set of remote cluster nodes.
01117 
01118 *******************************************************************************/
01119 
private class NodeSet
{ 
        private HashMap         map;
        private Node[]          nodes;
        private Node[]          random;
        private ILogger         logger;

        /***********************************************************************

                Create an empty node set, using 'logger' for diagnostics

        ***********************************************************************/
        
        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger supplied at construction

        ***********************************************************************/
        
        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers. Nodes are kept in the
                order they are added; all clients are expected to add them
                in the same order (from an identical configuration), so
                they share the same ordered set. A node name may only be
                added once.

        ***********************************************************************/
        
        synchronized void addNode (Node node)
        {
                char[]  name = node.toString;

                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);
                nodes ~= node;
        }

        /***********************************************************************

                Build the randomized copy of the node list used by scan().
                The list is shuffled uniformly via Fisher-Yates. The
                previous "swap every slot with any slot" loop produced a
                biased permutation.

                Nodes are already in the same order across the cluster,
                since the configuration file should be identical, so the
                primary list is left untouched.

        ***********************************************************************/
        
        void optimize ()
        {
                // copy the node list
                random = nodes.dup;

                // swap each slot, from the top down, with a uniformly
                // chosen slot at or below it
                for (uint i = random.length; i > 1; --i)
                    {
                    uint j = Random.get (i);
                    Node tmp = random[i-1];
                    random[i-1] = random[j];
                    random[j] = tmp;
                    }
        }

        /***********************************************************************

                Enable the named node, attaching a cache pool on 'port1'
                and a task pool on 'port2'. A node that is already enabled
                is left untouched. Throws ClusterException if the name is
                unknown.

        ***********************************************************************/
        
        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
                else
                   if (! node.isEnabled)
                      {
                      node.setCache (new InternetAddress (name, port1));
                      node.setTasks (new InternetAddress (name, port2));
                      node.setEnabled (true);
                      }
        }

        /***********************************************************************

                Issue the request in 'writer' to the cache pool of the node
                selected by 'key' (round-robin when no key is given). If
                that node fails, keep reselecting until one succeeds. The
                ClusterEmptyException from selectNode() propagates once no
                node remains enabled.

        ***********************************************************************/
        
        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node     node;
                IPayload payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client. Throws ClusterEmptyException when no node
                is enabled.

        ***********************************************************************/
        
        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   index %= i;

                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;
        
                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available"); 
        }

        /***********************************************************************

                Select a cluster server based on the specified key. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client. Without a key, a counter shared by all
                NodeSet instances is advanced to pick the start point.

        ***********************************************************************/
        
        final Node selectNode (char[] key = null)
        {
                static uint index;
              
                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.

                The loop is bounded by the randomized list itself, which is
                built by optimize(). Previously it used nodes.length, and so
                indexed past the end when optimize() had not been called or
                nodes were added afterwards.

        ***********************************************************************/
        
        bool scan (bool delegate (Node) dg)
        {
                Node    node;
                int     index = random.length;

                while (index--)
                      {
                      // lookup the randomized set of server nodes
                      node = random [index];

                      // callback on each enabled node
                      if (node.isEnabled)
                          if (dg (node))
                              return true;
                      }
                return false;
        }
}
01301 
01302  
01303 

Generated on Tue Jan 25 21:18:20 2005 for Mango by doxygen 1.3.6