Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

Cluster.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file Cluster.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032 
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.Cluster;
00040 
00041 private import  mango.cache.HashMap;
00042 
00043 private import  mango.utils.Random;
00044 
00045 private import  mango.convert.Integer;
00046 
00047 private import  mango.sys.System;
00048 
00049 private import  mango.io.Buffer,
00050                 mango.io.Socket,
00051                 mango.io.Exception,
00052                 mango.io.Properties,
00053                 mango.io.GrowBuffer,
00054                 mango.io.ArrayAllocator,
00055                 mango.io.SocketConduit,
00056                 mango.io.SocketListener,
00057                 mango.io.MulticastSocket;
00058 
00059 private import  mango.io.model.IConduit;
00060 
00061 private import  mango.log.model.ILogger;
00062 
00063 private import  mango.cluster.Client;
00064 
00065 public  import  mango.cluster.model.ICluster;
00066 
00067 private import  mango.cluster.qos.socket.RollCall,
00068                 mango.cluster.qos.socket.ClusterEvent,
00069                 mango.cluster.qos.socket.ProtocolReader,
00070                 mango.cluster.qos.socket.ProtocolWriter;
00071 
00072 
00073 /*******************************************************************************
00074         
00075         QOS implementation for sockets. All cluster-client activity is 
00076         gated through here by the higher level classes; NetworkQueue &
00077         NetworkCache for example. You gain access to the cluster by 
00078         creating an instance of the QOS (quality of service) you desire
00079         and mapping client classes onto it. For example:
00080 
00081         @code
00082         import mango.cluster.NetworkCache;
00083         import mango.cluster.qos.socket.Cluster;
00084 
00085         ICluster cluster = new Cluster (...);
00086         NetworkCache cache = new NetworkCache (cluster, ...);
00087 
00088         cache.put (...);
00089         cache.get (...);
00090         cache.invalidate (...);
00091         @endcode
00092 
00093         Please see the cluster clients for additional details. Currently
00094         these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 
00095         NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 
00096         Client base-class.
00097 
00098 *******************************************************************************/
00099 
00100 class Cluster : ICluster, IEventListener
00101 {  
00102         private static HashMap                  groups;
00103         private ILogger                         logger;
00104         private NodeSet                         nodeSet;
00105         private Buffer                          mBuffer;
00106         private ProtocolWriter                  mWriter;
00107         private MulticastSocket                 mSocket;
00108 
00109         private int                             groupTTL = 1;
00110         private int                             groupPort = 3333;
00111         private int                             groupPrefix = 225;
00112      
00113         /***********************************************************************
00114 
00115                 Setup a hashmap for multicast group addresses
00116 
00117         ***********************************************************************/
00118         
00119         static this ()
00120         {
00121                 groups = new HashMap (128, 0.75, 2);
00122         }
00123 
00124         /***********************************************************************
00125 
00126                 Setup a Cluster instance. Currently the buffer & writer
00127                 are shared for all bulletin serialization; this should
00128                 probably change at some point such that we can support 
00129                 multiple threads broadcasting concurrently to different 
00130                 output ports.
00131 
00132         ***********************************************************************/
00133         
00134         this (ILogger logger = null)
00135         {
00136                 this.logger = logger;
00137                 nodeSet = new NodeSet (logger);
00138                 mBuffer = new Buffer (1024 * 4);
00139                 mSocket = new MulticastSocket;
00140                 mWriter = new ProtocolWriter (mBuffer);
00141         }
00142 
00143         /***********************************************************************
00144 
00145                 Setup a Cluster instance. Currently the buffer & writer
00146                 are shared for all bulletin serialization; this should
00147                 probably change at some point such that we can support 
00148                 multiple threads broadcasting concurrently to different 
00149                 output ports.
00150 
00151         ***********************************************************************/
00152         
00153         this (ILogger logger, IConduit conduit)
00154         in {
00155            assert (logger);
00156            assert (conduit);
00157            }
00158         body
00159         {
00160                 this (logger);
00161 
00162                 // callback for loading cluster configuration 
00163                 void loader (char[] name, char[] value)
00164                 {
00165                         logger.info ("cluster config: "~name~" = "~value);
00166                         if (name == "node")
00167                             nodeSet.addNode (new Node (logger, value));
00168                         else
00169                         if (name == "multicast_port")
00170                             groupPort = cast(int) Integer.parse (value);
00171                         else
00172                            if (name == "multicast_prefix")
00173                                groupPrefix = cast(int) Integer.parse (value);
00174                         else
00175                            if (name == "multicast_ttl")
00176                                groupTTL = cast(int) Integer.parse (value);
00177                         else
00178                            throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
00179                 }
00180 
00181 
00182                 // load up the cluster configuration
00183                 Properties.load (conduit, &loader);
00184 
00185                 // finalize nodeSet 
00186                 nodeSet.optimize ();
00187 
00188                 // listen for cluster servers
00189                 IChannel channel = createChannel ("cluster.server.advertise");
00190                 createConsumer (channel, IEvent.Style.Bulletin, this);
00191 
00192                 // ask who's currently running
00193                 logger.trace ("discovering active cluster servers ...");
00194                 broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));
00195 
00196                 // wait for enabled servers to respond ...
00197                 System.sleep (System.Interval.Millisec * 250);
00198         }
00199 
00200         /***********************************************************************
00201 
00202                 Setup a Cluster instance. Currently the buffer & writer
00203                 are shared for all bulletin serialization; this should
00204                 probably change at some point such that we can support 
00205                 multiple threads broadcasting concurrently to different 
00206                 output ports.
00207 
00208         ***********************************************************************/
00209         
00210         this (ILogger logger, uint serverPort)
00211         in {
00212            assert (logger);
00213            assert (serverPort > 1024);
00214            }
00215         body
00216         {
00217                 this (logger);
00218 
00219                 Node node = new Node (logger, "local");
00220                 node.setCache (new InternetAddress ("localhost", serverPort));
00221                 node.setTasks (new InternetAddress ("localhost", serverPort+1));
00222                 node.setEnabled (true);
00223                 nodeSet.addNode (node);
00224                 nodeSet.optimize ();
00225         }
00226 
00227         /***********************************************************************
00228 
00229                 IEventListener interface method for listening to RollCall
00230                 responses. These are sent out by cluster servers both when 
00231                 they get a RollCall request, and when they begin execution.
00232 
00233         ***********************************************************************/
00234 
00235         void notify (IEvent event, IPayload payload)
00236         {
00237                 RollCall rollcall = cast(RollCall) payload;
00238 
00239                 // ignore requests from clients (we're on a common channel)
00240                 if (! rollcall.request)
00241                       nodeSet.enable (rollcall.name, 
00242                                       cast(ushort) rollcall.port1, 
00243                                       cast(ushort) rollcall.port2);
00244         }
00245 
00246         /***********************************************************************
00247 
00248                 Create a channel instance. Our channel implementation 
00249                 includes a number of cached IO helpers (ProtolcolWriter
00250                 and so on) which simplifies and speeds up execution.
00251 
00252         ***********************************************************************/
00253         
00254         IChannel createChannel (char[] channel)
00255         {
00256                 return new Channel (channel);
00257         }
00258 
00259         /***********************************************************************
00260 
00261                 Return the logger instance provided during construction.
00262                 
00263         ***********************************************************************/
00264         
00265         ILogger getLogger ()
00266         {
00267                 return logger;
00268         }
00269 
00270         /***********************************************************************
00271 
00272                 Broadcast a payload on the specified channel. This uses
00273                 IP/Multicast to scatter the payload to all registered
00274                 listeners (on the same multicast group). Note that the
00275                 maximum payload size is limited to that of an Ethernet 
00276                 data frame, minus the IP/UDP header size (1472 bytes).
00277 
00278         ***********************************************************************/
00279         
00280         synchronized void broadcast (IChannel channel, IPayload payload = null)
00281         {
00282                 // serialize content
00283                 mBuffer.clear ();
00284                 mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);
00285 
00286                 // Ethernet data-frame size minus the 28 byte UDP/IP header:
00287                 if (mBuffer.getPosition > 1472)
00288                     throw new ClusterException ("payload is too large to broadcast");
00289 
00290                 // send it to the appropriate multicast group
00291                 mSocket.write (mBuffer, getGroup (channel.getName));
00292         }
00293 
00294         /***********************************************************************
00295 
00296                 Create a listener of the specified type. Listeners are 
00297                 run within their own thread, since they spend the vast 
00298                 majority of their time blocked on a Socket read. Would
00299                 be good to support multiplexed reading instead, such 
00300                 that a thread pool could be applied instead.
00301                  
00302         ***********************************************************************/
00303         
00304         IConsumer createConsumer (IChannel channel, IEvent.Style style, 
00305                                   IEventListener notify)
00306         {
00307                 IEvent event = new ClusterEvent (this, channel, style, notify);
00308                 
00309                 if (logger)
00310                     logger.info ("creating " ~ event.getStyleName ~ 
00311                                  " consumer for channel '" ~ channel.getName ~ "'");
00312 
00313                 switch (style)
00314                        {
00315                        case IEvent.Style.Message:
00316                             return new MessageConsumer (this, event);
00317 
00318                        case IEvent.Style.Bulletin:
00319                             return new BulletinConsumer (this, event);
00320 
00321                        default:
00322                             throw new ClusterException ("Invalid consumer style");
00323                        }  
00324                return null; 
00325         }
00326 
00327         /***********************************************************************
00328 
00329                 Return a entry from the network cache, and optionally
00330                 remove it. This is a synchronous operation as opposed
00331                 to the asynchronous nature of an invalidate broadcast.
00332 
00333         ***********************************************************************/
00334         
00335         IPayload getCache (IChannel channel, char[] key, bool remove)
00336         {
00337                 Channel c = cast(Channel) channel;
00338                 
00339                 c.writer.put (remove ? ProtocolWriter.Command.Remove : 
00340                               ProtocolWriter.Command.Copy, c.getName, null, key);
00341                 return nodeSet.request (c.writer, c.reader, key);
00342         }
00343 
00344         /***********************************************************************
00345 
00346                 Place an entry into the network cache, replacing the
00347                 entry with the identical key. Note that this may cause
00348                 the oldest entry in the cache to be displaced if the
00349                 cache is already full.
00350 
00351         ***********************************************************************/
00352         
00353         IPayload putCache (IChannel channel, char[] key, IPayload payload)
00354         {
00355                 Channel c = cast(Channel) channel;
00356 
00357                 c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
00358                 return nodeSet.request (c.writer, c.reader, key);
00359         }
00360 
00361         /***********************************************************************
00362 
00363                 Add an entry to the specified network queue. May throw a
00364                 QueueFullException if there's no room available.
00365 
00366         ***********************************************************************/
00367         
00368         IPayload putQueue (IChannel channel, IPayload payload)
00369         {
00370                 Channel c = cast(Channel) channel;
00371                 
00372                 c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
00373                 nodeSet.request (c.writer, c.reader);
00374                 return payload;
00375         }
00376 
00377 
00378         /***********************************************************************
00379                 
00380                 Query the cluster for queued entries on the corresponding 
00381                 channel. Returns, and removes, the first matching entry 
00382                 from the cluster. Note that this sweeps the cluster for
00383                 matching entries, and is synchronous in nature. The more
00384                 common approach is to setup a queue listener, which will
00385                 grab and dispatch queue entries asynchronously.
00386 
00387         ***********************************************************************/
00388         
00389         IPayload getQueue (IChannel channel)
00390         {
00391                 IPayload payload;
00392 
00393                 Channel c = cast(Channel) channel;
00394 
00395                 // callback for NodeSet.scan()
00396                 bool scan (Node node)
00397                 {
00398                         // ask this node ...
00399                         c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
00400                         node.request (node.cache, c.writer, c.reader, payload);
00401                         return cast(bool) (payload !is null);
00402                 }
00403 
00404                 // make a pass over each Node, looking for channel entries
00405                 nodeSet.scan (&scan);
00406                 return payload;
00407         }
00408 
00409         /***********************************************************************
00410                 
00411                 Load a network cache entry remotely. This sends the given
00412                 Payload over the network to the cache host, where it will
00413                 be executed locally. The benefit of doing so it that the
00414                 host may deny access to the cache entry for the duration
00415                 of the load operation. This, in turn, provides an elegant 
00416                 mechanism for gating/synchronizing multiple network clients  
00417                 over a given cache entry; handy for those entries that are 
00418                 relatively expensive to construct or access. 
00419 
00420         ***********************************************************************/
00421         
00422         void loadCache (IChannel channel, char[] key, IPayload payload)
00423         {               
00424                 Channel c = cast(Channel) channel;
00425 
00426                 c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
00427                 Node node = nodeSet.selectNode (key);
00428                 node.request (node.tasks, c.writer, c.reader, payload);
00429         }
00430 
00431         /***********************************************************************
00432 
00433                 Return an internet address representing the multicast
00434                 group for the specified channel. We use three of the
00435                 four address segments to represent the channel itself
00436                 (via a hash on the channel name), and set the primary
00437                 segment to be that of the broadcast prefix (above).
00438 
00439         ***********************************************************************/
00440         
00441         synchronized InternetAddress getGroup (char[] channel)
00442         {
00443                 InternetAddress group = cast(InternetAddress) groups.get (channel);
00444 
00445                 if (group is null)
00446                    {
00447                    // construct a group address from the prefix & channel-hash,
00448                    // where the hash is folded down to 24 bits
00449                    uint hash = groups.jhash (channel);
00450                    hash = (hash >> 24) ^ (hash & 0x00ffffff);
00451                   
00452                    char[] address = Integer.format (new char[5], groupPrefix) ~ "." ~
00453                                     Integer.format (new char[5], (hash >> 16) & 0xff) ~ "." ~
00454                                     Integer.format (new char[5], (hash >> 8) & 0xff) ~ "." ~
00455                                     Integer.format (new char[5], hash & 0xff);
00456 
00457                    //printf ("channel '%.*s' == '%.*s'\n", channel, address);
00458         
00459                    // insert InternetAddress into hashmap
00460                    group = new InternetAddress (address, groupPort);
00461                    groups.put (channel, group);
00462                    }
00463                 return group;              
00464         }
00465 }
00466 
00467 
00468 /*******************************************************************************         
00469 
00470         A listener for multicast channel traffic. These are currently used 
00471         for cache coherency, queue publishing, and node discovery activity; 
00472         though could be used for direct messaging also.
00473 
00474 *******************************************************************************/
00475 
00476 private class BulletinConsumer : SocketListener, IConsumer
00477 {
00478         private IEvent                  event;
00479         private Buffer                  buffer;
00480         private ProtocolReader          reader;
00481         private Cluster                 cluster;
00482         private MulticastSocket         consumer;
00483 
00484         /***********************************************************************
00485 
00486                 Construct a multicast consumer for the specified event. The
00487                 event handler will be invoked whenever a message arrives for
00488                 the associated channel.
00489 
00490         ***********************************************************************/
00491         
00492         this (Cluster cluster, IEvent event)
00493         {
00494                 this.event = event;
00495                 this.cluster = cluster;
00496 
00497                 // buffer doesn't need to be larger than Ethernet data-frame
00498                 buffer = new Buffer (1500);
00499 
00500                 // make the reader slice directly from the buffer content
00501                 reader = new ProtocolReader (buffer);
00502                 reader.setAllocator (new BufferAllocator);
00503 
00504                 // configure a listener socket
00505                 consumer = new MulticastSocket;
00506                 consumer.join (cluster.getGroup (event.getChannel.getName));
00507                 
00508                 super (consumer, buffer);
00509 
00510                 // fire up this listener
00511                 start ();
00512         }
00513 
00514         /***********************************************************************
00515 
00516                 Notification callback invoked when we receive a multicast
00517                 packet. Note that we check the packet channel-name against
00518                 the one we're consuming, to check for cases where the group
00519                 address had a hash collision.
00520 
00521         ***********************************************************************/
00522         
00523         override void notify (IBuffer buffer)
00524         {
00525                 ProtocolWriter.Command  cmd;
00526                 char[]                  channel;
00527                 char[]                  element;
00528                 
00529                 IPayload payload = reader.getPayload (channel, element, cmd);
00530                 // printf ("notify '%.*s::%.*s' #'%.*s'\n", channel, element, event.getChannel.getName);
00531 
00532                 // check it's really for us first (might be a hash collision)
00533                 if (channel == event.getChannel.getName)
00534                     invoke (event, payload);
00535         }
00536 
00537         /***********************************************************************
00538 
00539                 Handle error conditions from the listener thread.
00540 
00541         ***********************************************************************/
00542 
00543         override void exception (char [] msg)
00544         {
00545                 cluster.getLogger.error ("BulletinConsumer: "~msg);
00546         }
00547 
00548         /***********************************************************************
00549 
00550                 Overridable mean of notifying the client code.
00551                  
00552         ***********************************************************************/
00553         
00554         protected void invoke (IEvent event, IPayload payload)
00555         {
00556                 event.invoke (payload);
00557         }
00558 
00559         /***********************************************************************
00560 
00561                 Return the cluster instance we're associated with.
00562 
00563         ***********************************************************************/
00564         
00565         Cluster getCluster ()
00566         {
00567                 return cluster;
00568         }
00569 
00570         /***********************************************************************
00571 
00572                 Temporarily halt listening. This can be used to ignore
00573                 multicast messages while, for example, the consumer is
00574                 busy doing other things.
00575 
00576         ***********************************************************************/
00577 
00578         void pauseGroup ()
00579         {
00580                 consumer.pauseGroup ();
00581         }
00582 
00583         /***********************************************************************
00584 
00585                 Resume listening, post-pause.
00586 
00587         ***********************************************************************/
00588 
00589         void resumeGroup ()
00590         {
00591                 consumer.resumeGroup ();
00592         }
00593 
00594         /***********************************************************************
00595 
00596                 Cancel this consumer. The listener is effectively disabled
00597                 from this point forward. The listener thread does not halt
00598                 at this point, but waits until the socket-read returns. 
00599                 Note that the D Interface implementation requires us to 
00600                 "reimplement and dispatch" trivial things like this ~ it's
00601                 a pain in the neck to maintain.
00602 
00603         ***********************************************************************/
00604         
00605         void cancel ()
00606         {
00607                 super.cancel ();
00608         }
00609 }
00610 
00611 
00612 /*******************************************************************************
00613         
00614         A listener for queue events. These events are produced by the 
00615         queue host on a periodic bases when it has available entries.
00616         We listen for them (rather than constantly scanning) and then
00617         begin a sweep to process as many as we can. Note that we will
00618         be in competition with other nodes to process these entries.
00619 
00620 *******************************************************************************/
00621 
00622 private class MessageConsumer : BulletinConsumer
00623 {
00624         /***********************************************************************
00625 
00626                 Construct a multicast consumer for the specified event
00627 
00628         ***********************************************************************/
00629         
00630         this (Cluster cluster, IEvent event)
00631         {
00632                 super (cluster, event);
00633         }
00634 
00635         /***********************************************************************
00636 
00637                 Handle error conditions from the listener thread.
00638 
00639         ***********************************************************************/
00640 
00641         override void exception (char [] msg)
00642         {
00643                 cluster.getLogger.error ("MessageConsumer: "~msg);
00644         }
00645 
00646         /***********************************************************************
00647 
00648                 override the default processing to sweep the cluster for 
00649                 queued entries. Each server node is queried until one is
00650                 found that contains a payload. Note that it is possible 
00651                 to set things up where we are told exactly which node to
00652                 go to; howerver given that we won't be listening whilst
00653                 scanning, and that there's likely to be a group of new
00654                 entries in the cluster, it's just as effective to scan.
00655                 This will be far from ideal for all environments, so we 
00656                 should make the strategy plugable instead.                 
00657 
00658         ***********************************************************************/
00659         
00660         override protected void invoke (IEvent event, IPayload payload)
00661         {
00662                 //temporarilty pause listening
00663                 pauseGroup ();
00664 
00665                 try {
00666                     while ((payload = getCluster.getQueue (event.getChannel)) !is null)
00667                             event.invoke (payload);
00668                     } finally 
00669                             // resume listening
00670                             resumeGroup ();
00671         }
00672 }
00673 
00674 
00675 /*******************************************************************************
00676         
00677         A channel represents something akin to a publish/subscribe topic, 
00678         or a radio station. These are used to segregate cluster operations
00679         into a set of groups, where each group is represented by a channel.
00680         Channel names are whatever you want then to be: use of dot notation 
00681         has proved useful in the past. See Client.createChannel
00682 
00683 *******************************************************************************/
00684 
00685 private class Channel : IChannel
00686 {
00687         char[]                  name;
00688         GrowBuffer              buffer;
00689         ProtocolReader          reader;
00690         ProtocolWriter          writer;
00691 
00692         /***********************************************************************
00693         
00694                 Construct a channel with the specified name. We cache
00695                 a number of session-related constructs here also, in
00696                 order to eliminate runtime overhead
00697 
00698         ***********************************************************************/
00699 
00700         this (char[] name)
00701         in {
00702            assert (name.length);
00703            }
00704         body
00705         {       
00706                 this.name = name;
00707 
00708                 // this buffer will grow as required to house larger payloads
00709                 buffer = new GrowBuffer (1024 * 2);
00710                 writer = new ProtocolWriter (buffer);
00711 
00712                 // make the reader slice directly from the buffer content
00713                 reader = new ProtocolReader (buffer);
00714                 reader.setAllocator (new BufferAllocator);
00715         }
00716 
00717         /***********************************************************************
00718         
00719                 Return the name of this channel. This is the name provided
00720                 when the channel was constructed.
00721 
00722         ***********************************************************************/
00723 
00724         char[] getName ()
00725         {
00726                 return name;
00727         }
00728 
00729         /***********************************************************************
00730         
00731                 Output this channel via the provided IWriter
00732 
00733         ***********************************************************************/
00734 
00735         void write (IWriter writer)
00736         {
00737                 writer.put (name);
00738         }
00739 
00740         /***********************************************************************
00741         
00742                 Input this channel via the provided IReader
00743 
00744         ***********************************************************************/
00745 
00746         void read (IReader reader)
00747         {
00748                 reader.get (name);
00749         }
00750 }
00751 
00752 
00753 /*******************************************************************************
00754         
00755         An abstraction of a socket connection. Used internally by the 
00756         socket-based Cluster. 
00757 
00758 *******************************************************************************/
00759 
00760 private class Connection
00761 {
00762         abstract bool reset();
00763 
00764         abstract void done (ulong time);
00765 
00766         abstract SocketConduit getConduit ();
00767 }
00768 
00769 
00770 /*******************************************************************************
00771         
00772         A pool of socket connections for accessing cluster nodes. Note 
00773         that the entries will timeout after a period of inactivity, and
00774         will subsequently cause a connected host to drop the supporting
00775         session.
00776 
00777 *******************************************************************************/
00778 
00779 private class ConnectionPool
00780 { 
00781         private int             count;
00782         private InternetAddress address;
00783         private PoolConnection  freelist;
00784         private const ulong     timeout = 60_000;
00785 
00786         /***********************************************************************
00787         
00788                 Utility class to provide the basic connection facilities
00789                 provided by the connection pool.
00790 
00791         ***********************************************************************/
00792 
00793         class PoolConnection : Connection
00794         {
00795                 ulong           time;
00796                 PoolConnection  next;   
00797                 ConnectionPool  parent;   
00798                 SocketConduit   conduit;
00799 
00800                 /***************************************************************
00801                 
00802                         Construct a new connection and set its parent
00803 
00804                 ***************************************************************/
00805         
00806                 this (ConnectionPool pool)
00807                 {
00808                         parent = pool;
00809                         reset ();
00810                 }
00811                   
00812                 /***************************************************************
00813 
00814                         Return the socket belonging to this connection
00815 
00816                 ***************************************************************/
00817         
00818                 SocketConduit getConduit ()
00819                 {
00820                         return conduit;
00821                 }
00822                   
00823                 /***************************************************************
00824 
00825                         Create a new socket and connect it to the specified 
00826                         server. This will cause a dedicated thread to start 
00827                         on the server. Said thread will quit when an error
00828                         occurs.
00829 
00830                 ***************************************************************/
00831         
00832                 bool reset ()
00833                 {
00834                         try {
00835                             conduit = new SocketConduit (false);
00836                             conduit.connect (parent.address);
00837 
00838                             // set a 500ms timeout for read operations
00839                             conduit.setTimeout (System.Interval.Millisec * 500);
00840                             return true;
00841 
00842                             } catch (Object)
00843                                     {
00844                                     return false;
00845                                     }
00846                 }
00847                   
00848                 /***************************************************************
00849 
00850                         Close the socket. This will cause any host session
00851                         to be terminated.
00852 
00853                 ***************************************************************/
00854         
00855                 void close ()
00856                 {
00857                         conduit.close ();
00858                 }
00859 
00860                 /***************************************************************
00861 
00862                         Return this connection to the free-list. Note that
00863                         we have to synchronize on the parent-pool itself.
00864 
00865                 ***************************************************************/
00866 
00867                 void done (ulong time)
00868                 {
00869                         synchronized (parent)
00870                                      {
00871                                      next = parent.freelist;
00872                                      parent.freelist = this;
00873                                      this.time = time;
00874                                      }
00875                 }
00876         }
00877 
00878 
00879         /***********************************************************************
00880 
00881                 Create a connection-pool for the specified address.
00882 
00883         ***********************************************************************/
00884 
00885         this (InternetAddress address)
00886         {      
00887                 this.address = address;
00888         }
00889 
00890         /***********************************************************************
00891 
00892                 Allocate a Connection from a list rather than creating a 
00893                 new one. Reap old entries as we go.
00894 
00895         ***********************************************************************/
00896 
00897         synchronized Connection borrow (ulong time)
00898         {  
00899                 if (freelist)
00900                     do {
00901                        PoolConnection c = freelist;
00902 
00903                        freelist = c.next;
00904                        if (freelist && (time - c.time > timeout))
00905                            c.close ();
00906                        else
00907                           return c;
00908                        } while (true);
00909 
00910                 return new PoolConnection (this);
00911         }
00912 
00913         /***********************************************************************
00914 
00915                 Close this pool and drop all existing connections.
00916 
00917         ***********************************************************************/
00918 
00919         synchronized void close ()
00920         {       
00921                 PoolConnection c = freelist;
00922                 freelist = null;
00923                 while (c)
00924                       {
00925                       c.close ();
00926                       c = c.next;
00927                       }
00928         }
00929 }
00930 
00931 
00932 /*******************************************************************************
00933         
00934         Class to represent a cluster node. Each node supports both cache
00935         and queue functionality. Note that the set of available nodes is
00936         configured at startup, simplifying the discovery process in some
00937         significant ways, and causing less thrashing of cache-keys.
00938 
00939 *******************************************************************************/
00940 
00941 private class Node
00942 { 
00943         private char[]                  name,
00944                                         port;
00945         private ILogger                 logger;
00946         private bool                    enabled;
00947 
00948         private ConnectionPool          tasks,
00949                                         cache;
00950 
00951         /***********************************************************************
00952 
00953                 Construct a node with the provided name. This name should
00954                 be the network name of the hosting device.
00955 
00956         ***********************************************************************/
00957         
00958         this (ILogger logger, char[] name)
00959         {
00960                 this.logger = logger;
00961                 this.name = name;
00962         }
00963 
00964         /***********************************************************************
00965 
00966                 Add a cache/queue reference for the remote node
00967                  
00968         ***********************************************************************/
00969 
00970         void setCache (InternetAddress address)
00971         {      
00972                 this.cache = new ConnectionPool (address);
00973                 port = Integer.format (new char[5], address.port);
00974         }
00975 
00976         /***********************************************************************
00977 
00978                 Add a cache-loader reference for the remote node
00979 
00980         ***********************************************************************/
00981 
00982         void setTasks (InternetAddress address)
00983         {      
00984                 this.tasks = new ConnectionPool (address);
00985         }
00986 
00987         /***********************************************************************
00988 
00989                 Return the name of this node (the network name of the device)
00990 
00991         ***********************************************************************/
00992         
00993         override char[] toString ()
00994         {
00995                 return name;
00996         }
00997 
00998         /***********************************************************************
00999 
01000                 Remove this Node from the cluster. The node is disabled
01001                 until it is seen to recover.
01002 
01003         ***********************************************************************/
01004 
01005         void fail ()
01006         {       
01007                 setEnabled (false);
01008                 cache.close ();    
01009                 tasks.close ();    
01010         }
01011 
01012         /***********************************************************************
01013 
01014                 Get the current state of this node
01015 
01016         ***********************************************************************/
01017 
01018         bool isEnabled ()
01019         {      
01020                 volatile  
01021                        return enabled;    
01022         }
01023 
01024         /***********************************************************************
01025 
01026                 Set the enabled state of this node
01027 
01028         ***********************************************************************/
01029 
01030         void setEnabled (bool enabled)
01031         {      
01032                 if (logger && logger.isEnabled (logger.Level.Trace))
01033                    {
01034                    if (enabled)
01035                        logger.trace ("enabling node '"~name~"' on port "~port);
01036                    else
01037                       logger.trace ("disabling node '"~name~"'");
01038                    }
01039                 volatile  
01040                        this.enabled = enabled;    
01041         }
01042 
01043         /***********************************************************************
01044 
01045                 request data; fail this Node if we can't connect. Note
01046                 that we make several attempts to connect before writing
01047                 the node off as a failure.
01048                 
01049         ***********************************************************************/
01050         
01051         bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
01052         {       
01053                 ProtocolWriter.Command  cmd;
01054                 ulong                   time;
01055                 char[]                  channel;
01056                 char[]                  element;
01057                 Connection              connect;
01058 
01059                 // it's possible that the pool may have failed between 
01060                 // the point of selecting it, and the invocation itself
01061                 if (pool is null)
01062                     return false;
01063 
01064                 // get a connection to the server
01065                 connect = pool.borrow (time = System.getMillisecs);
01066 
01067                 // talk to the server (try a few times if necessary)
01068                 for (int attempts=3; attempts--;)
01069                      try {
01070                          // attach connection to reader and writer 
01071                          writer.getBuffer.setConduit (connect.getConduit);
01072                          reader.getBuffer.setConduit (connect.getConduit);
01073         
01074                          // send request
01075                          writer.flush ();
01076 
01077                          // load the returned object (don't retry!)
01078                          attempts = 0;
01079                          payload = reader.getPayload (channel, element, cmd);
01080 
01081                          // return borrowed connection
01082                          connect.done (time);
01083 
01084                          } catch (PickleException x)
01085                                  {
01086                                  connect.done (time);
01087                                  throw x;
01088                                  }
01089                            catch (IOException x)
01090                                   // attempt to reconnect?
01091                                   if (attempts == 0 || !connect.reset)
01092                                      {
01093                                      // that server is offline
01094                                      fail ();
01095   
01096                                      // state that we failed
01097                                      return false;
01098                                      }
01099                     
01100                 // is payload an exception?
01101                 if (cmd != ProtocolWriter.Command.OK)                       
01102                    {
01103                    // is node full?
01104                    if (cmd == ProtocolWriter.Command.Full)                       
01105                        throw new ClusterFullException (channel);
01106 
01107                    // did node barf?
01108                    if (cmd == ProtocolWriter.Command.Exception)                       
01109                        throw new ClusterException (channel);
01110                         
01111                    // bogus response
01112                    throw new ClusterException ("invalid response from cluster server");
01113                    }
01114 
01115                 return true;
01116         }
01117 }
01118 
01119 
01120 /*******************************************************************************
01121         
01122         Models a set of remote cluster nodes.
01123 
01124 *******************************************************************************/
01125 
01126 private class NodeSet
01127 { 
01128         private HashMap         map;
01129         private Node[]          nodes;
01130         private Node[]          random;
01131         private ILogger         logger;
01132 
01133         /***********************************************************************
01134 
01135         ***********************************************************************/
01136         
01137         this (ILogger logger)
01138         {
01139                 this.logger = logger;
01140                 map = new HashMap (128, 0.75, 4);
01141         }
01142 
01143         /***********************************************************************
01144 
01145         ***********************************************************************/
01146         
01147         ILogger getLogger ()
01148         {
01149                 return logger;
01150         }
01151 
01152         /***********************************************************************
01153 
01154                 Add a node to the list of servers, and sort them such that
01155                 all clients will have the same ordered set
01156 
01157         ***********************************************************************/
01158         
01159         synchronized void addNode (Node node)
01160         {
01161                 char[]  name = node.toString;
01162 
01163                 if (map.get (name))
01164                     throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");
01165 
01166                 map.put (name, node);
01167                 nodes ~= node;
01168                 //nodes.sort;
01169         }
01170 
01171         /***********************************************************************
01172 
01173         ***********************************************************************/
01174         
01175         void optimize ()
01176         {
01177                 // nodes are already in the same order across the cluster
01178                 // since the configuration file should be identical ...
01179 
01180                 // nodes.sort;
01181                 
01182                 // copy the node list
01183                 random = nodes.dup;
01184 
01185                 // muddle up the duplicate list. This randomized list
01186                 // is used when scanning the cluster for queued entries
01187                 foreach (int i, Node n; random)
01188                         {
01189                         int j = Random.get (random.length);
01190                         Node tmp = random[i];
01191                         random[i] = random[j];
01192                         random[j] = tmp;
01193                         }
01194         }
01195 
01196         /***********************************************************************
01197 
01198         ***********************************************************************/
01199         
01200         synchronized void enable (char[] name, ushort port1, ushort port2)
01201         {
01202                 Node node = cast(Node) map.get (name);
01203                 if (node is null)
01204                     throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
01205                 else
01206                    if (! node.isEnabled)
01207                       {
01208                       node.setCache (new InternetAddress (name, port1));
01209                       node.setTasks (new InternetAddress (name, port2));
01210                       node.setEnabled (true);
01211                       }
01212         }
01213 
01214         /***********************************************************************
01215 
01216         ***********************************************************************/
01217         
01218         IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
01219         {
01220                 Node     node;
01221                 IPayload payload;
01222 
01223                 do {
01224                    node = selectNode (key);
01225                    } while (! node.request (node.cache, writer, reader, payload));
01226                 return payload;
01227         }
01228 
01229         /***********************************************************************
01230 
01231                 Select a cluster server based on a starting index. If the
01232                 selected server is not currently enabled, we just try the
01233                 next one. This behaviour should be consistent across each
01234                 cluster client.
01235 
01236         ***********************************************************************/
01237         
01238         private final Node selectNode (uint index)
01239         {
01240                 uint i = nodes.length;
01241 
01242                 if (i)
01243                    {
01244                    index %= i;
01245 
01246                    while (i--)
01247                          {
01248                          Node node = nodes[index];
01249                          if (node.isEnabled)
01250                              return node;
01251         
01252                          if (++index >= nodes.length)
01253                              index = 0;
01254                          }
01255                    }
01256                 throw new ClusterEmptyException ("No cluster servers are available"); 
01257         }
01258 
01259         /***********************************************************************
01260 
01261                 Select a cluster server based on the specified key. If the
01262                 selected server is not currently enabled, we just try the
01263                 next one. This behaviour should be consistent across each
01264                 cluster client.
01265 
01266         ***********************************************************************/
01267         
01268         final Node selectNode (char[] key = null)
01269         {
01270                 static uint index;
01271               
01272                 if (key.length)
01273                     return selectNode (HashMap.jhash (key));
01274 
01275                 // no key provided, so just roll the counter
01276                 return selectNode (++index);
01277         }
01278 
01279         /***********************************************************************
01280 
01281                 Sweep the cluster servers. Returns true if the delegate
01282                 returns true, false otherwise. The sweep is aborted when
01283                 the delegate returns true. Note that this scans nodes in
01284                 a randomized pattern, which should tend to avoid 'bursty'
01285                 activity by a set of clients upon any one cluster server.
01286 
01287         ***********************************************************************/
01288         
01289         bool scan (bool delegate (Node) dg)
01290         {
01291                 Node    node;
01292                 int     index = nodes.length;
01293 
01294                 while (index--)
01295                       {
01296                       // lookup the randomized set of server nodes
01297                       node = random [index];
01298 
01299                       // callback on each enabled node
01300                       if (node.isEnabled)
01301                           if (dg (node))
01302                               return true;
01303                       }
01304                 return false;
01305         }
01306 }
01307 
01308  
01309 

Generated on Sat Dec 24 17:28:32 2005 for Mango by  doxygen 1.4.0