Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

Cluster.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file Cluster.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032 
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.Cluster;
00040 
00041 private import  mango.cache.HashMap;
00042 
00043 private import  mango.utils.Text,
00044                 mango.utils.Random;
00045 
00046 private import  mango.format.Int;
00047 
00048 private import  mango.base.System;
00049 
00050 private import  mango.io.Buffer,
00051                 mango.io.Socket,
00052                 mango.io.Exception,
00053                 mango.io.Properties,
00054                 mango.io.ArrayAllocator,
00055                 mango.io.SocketConduit,
00056                 mango.io.SocketListener,
00057                 mango.io.MulticastSocket;
00058 
00059 private import  mango.io.model.IConduit;
00060 
00061 private import  mango.log.model.ILogger;
00062 
00063 private import  mango.cluster.Client;
00064 
00065 public  import  mango.cluster.model.ICluster;
00066 
00067 private import  mango.cluster.qos.socket.RollCall,
00068                 mango.cluster.qos.socket.ClusterEvent,
00069                 mango.cluster.qos.socket.ProtocolReader,
00070                 mango.cluster.qos.socket.ProtocolWriter;
00071 
00072 
00073 /*******************************************************************************
00074         
00075         QOS implementation for sockets. All cluster-client activity is 
00076         gated through here by the higher level classes; NetworkQueue &
00077         NetworkCache for example. You gain access to the cluster by 
00078         creating an instance of the QOS (quality of service) you desire
00079         and mapping client classes onto it. For example:
00080 
00081         @code
00082         import mango.cluster.NetworkCache;
00083         import mango.cluster.qos.socket.Cluster;
00084 
00085         ICluster cluster = new Cluster (...);
00086         NetworkCache cache = new NetworkCache (cluster, ...);
00087 
00088         cache.put (...);
00089         cache.get (...);
00090         cache.invalidate (...);
00091         @endcode
00092 
00093         Please see the cluster clients for additional details. Currently
00094         these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 
00095         NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 
00096         Client base-class.
00097 
00098 *******************************************************************************/
00099 
00100 class Cluster : ICluster, IEventListener
00101 {  
00102         private static HashMap                  groups;         // channel name -> multicast group address; static, so shared by all instances
00103         private ILogger                         logger;         // may be null: the base constructor defaults it to null
00104         private NodeSet                         nodeSet;        // nodes added from configuration, or the single "local" node
00105         private Buffer                          mBuffer;        // serialization buffer reused by every broadcast()
00106         private ProtocolWriter                  mWriter;        // writer bound to mBuffer
00107         private MulticastSocket                 mSocket;        // socket that broadcast() writes to
00108 
00109         private int                             groupTTL = 1;           // overridden by "multicast_ttl"; NOTE(review): never read in this listing
00110         private int                             groupPort = 3333;       // overridden by "multicast_port"; port of each group address
00111         private int                             groupPrefix = 225;      // overridden by "multicast_prefix"; first octet of each group address
00111         private int                             groupPrefix = 225;
00112      
00113         /***********************************************************************
00114 
00115                 Setup a hashmap for multicast group addresses
00116 
00117         ***********************************************************************/
00118         
00119         static this ()
00120         {
00121                 groups = new HashMap (128, 0.75, 2);    // NOTE(review): presumably capacity, load factor, concurrency level - confirm against mango.cache.HashMap
00122         }
00123 
00124         /***********************************************************************
00125 
00126                 Setup a Cluster instance. Currently the buffer & writer
00127                 are shared for all bulletin serialization; this should
00128                 probably change at some point such that we can support 
00129                 multiple threads broadcasting concurrently to different 
00130                 output ports.
00131 
00132         ***********************************************************************/
00133         
00134         this (ILogger logger = null)
00135         {
00136                 this.logger = logger;
00137                 mSocket = new MulticastSocket;
00138                 mBuffer = new Buffer (4 * 1024);
00139                 mWriter = new ProtocolWriter (mBuffer);
00140                 nodeSet = new NodeSet (logger);
00141         }
00142 
00143         /***********************************************************************
00144 
00145                 Setup a Cluster instance. Currently the buffer & writer
00146                 are shared for all bulletin serialization; this should
00147                 probably change at some point such that we can support 
00148                 multiple threads broadcasting concurrently to different 
00149                 output ports.
00150 
00151         ***********************************************************************/
00152         
00153         this (ILogger logger, IConduit conduit)
00154         in {
00155            assert (logger);
00156            assert (conduit);
00157            }
00158         body
00159         {
00160                 this (logger);
00161                 // NOTE: logger is used unguarded below; the in-contract above is its only null check
00162                 // callback for loading cluster configuration; handed to Properties.load() below
00163                 void loader (char[] name, char[] value)
00164                 {
00165                         logger.info ("cluster config: "~name~" = "~value);
00166                         if (name == "node")
00167                             nodeSet.addNode (new Node (logger, value));
00168                         else
00169                         if (name == "multicast_port")
00170                             groupPort = Int.parse (value);
00171                         else
00172                            if (name == "multicast_prefix")
00173                                groupPrefix = Int.parse (value);
00174                         else
00175                            if (name == "multicast_ttl")
00176                                groupTTL = Int.parse (value);
00177                         else    // any other property name aborts construction
00178                            throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
00179                 }
00180 
00181 
00182                 // load up the cluster configuration
00183                 Properties.load (conduit, &loader);
00184 
00185                 // finalize nodeSet 
00186                 nodeSet.optimize ();
00187 
00188                 // listen for cluster servers; responses are delivered to our own notify()
00189                 IChannel channel = createChannel ("cluster.server.advertise");
00190                 createConsumer (channel, IEvent.Style.Bulletin, this);
00191 
00192                 // ask who's currently running (the final 'true' presumably marks this as a request - cf. rollcall.request in notify)
00193                 logger.trace ("discovering active cluster servers ...");
00194                 broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));
00195 
00196                 // wait for enabled servers to respond ...
00197                 System.sleep (System.Interval.Millisec * 250);
00198         }
00199 
00200         /***********************************************************************
00201 
00202                 Setup a Cluster instance. Currently the buffer & writer
00203                 are shared for all bulletin serialization; this should
00204                 probably change at some point such that we can support 
00205                 multiple threads broadcasting concurrently to different 
00206                 output ports.
00207 
00208         ***********************************************************************/
00209         
00210         this (ILogger logger, uint serverPort)
00211         in {
00212            assert (logger);
00213            assert (serverPort > 1024);
00214            }
00215         body
00216         {
00217                 this (logger);
00218                 // one pre-enabled node on localhost: cache on serverPort, tasks on the port above it
00219                 Node local = new Node (logger, "local");
00220                 local.setCache (new InternetAddress ("localhost", serverPort));
00221                 local.setTasks (new InternetAddress ("localhost", serverPort+1));
00222                 local.setEnabled (true);
00223                 nodeSet.addNode (local);
00224                 nodeSet.optimize ();
00225         }
00226 
00227         /***********************************************************************
00228 
00229                 IEventListener interface method for listening to RollCall
00230                 responses. These are sent out by cluster servers both when 
00231                 they get a RollCall request, and when they begin execution.
00232 
00233         ***********************************************************************/
00234 
00235         void notify (IEvent event, IPayload payload)
00236         {
00237                 RollCall rollcall = cast(RollCall) payload;
00238                 // a failed cast yields null: skip payloads that are not a RollCall,
00239                 // and ignore requests from clients (we're on a common channel)
00240                 if (rollcall !== null && ! rollcall.request)
00241                       nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
00242         }
00243 
00244         /***********************************************************************
00245 
00246                 Create a channel instance. Our channel implementation 
00247                 includes a number of cached IO helpers (ProtocolWriter
00248                 and so on) which simplifies and speeds up execution.
00249 
00250         ***********************************************************************/
00251         
00252         IChannel createChannel (char[] channel)
00253         {
00254                 return new Channel (channel);   // Channel's in-contract asserts that the name is non-empty
00255         }
00256 
00257         /***********************************************************************
00258 
00259                 Return the logger instance provided during construction.
00260                 
00261         ***********************************************************************/
00262         
00263         ILogger getLogger ()
00264         {
00265                 return logger;          // null when the Cluster was constructed without a logger
00266         }
00267 
00268         /***********************************************************************
00269 
00270                 Broadcast a payload on the specified channel. This uses
00271                 IP/Multicast to scatter the payload to all registered
00272                 listeners (on the same multicast group). Note that the
00273                 maximum payload size is limited to that of an Ethernet 
00274                 data frame, minus the IP/UDP header size (1472 bytes).
00275 
00276         ***********************************************************************/
00277         
00278         synchronized void broadcast (IChannel channel, IPayload payload = null)
00279         {
00280                 // serialize content into the buffer/writer pair shared by all broadcasts on this instance
00281                 mBuffer.clear ();
00282                 mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);
00283 
00284                 // Ethernet data-frame size minus the 28 byte UDP/IP header; checked after serialization, so it bounds the whole packet
00285                 if (mBuffer.getPosition > 1472)
00286                     throw new ClusterException ("payload is too large to broadcast");
00287 
00288                 // send it to the appropriate multicast group (derived from the channel name by getGroup)
00289                 mSocket.write (mBuffer, getGroup (channel.getName));
00290         }
00291 
00292         /***********************************************************************
00293 
00294                 Create a listener of the specified type. Listeners are 
00295                 run within their own thread, since they spend the vast 
00296                 majority of their time blocked on a Socket read. It would
00297                 be good to support multiplexed reading, such that a
00298                 thread pool could be applied instead.
00299                  
00300         ***********************************************************************/
00301         
00302         IConsumer createConsumer (IChannel channel, IEvent.Style style, 
00303                                   IEventListener notify)
00304         {
00305                 IEvent event = new ClusterEvent (this, channel, style, notify);
00306                 
00307                 if (logger)     // the logger is optional (it defaults to null)
00308                     logger.info ("creating " ~ event.getStyleName ~ 
00309                                  " consumer for channel '" ~ channel.getName ~ "'");
00310                 switch (style)
00311                        {
00312                        case IEvent.Style.Message:
00313                             return new MessageConsumer (this, event);
00314                        case IEvent.Style.Bulletin:
00315                             return new BulletinConsumer (this, event);
00316                        default:  // fail with a descriptive error rather than fall off the end of the function
00317                             throw new ClusterException ("unsupported consumer style for channel '" ~ channel.getName ~ "'");
00318                        }     
00319         }
00320 
00321         /***********************************************************************
00322 
00323                 Return an entry from the network cache, and optionally
00324                 remove it. This is a synchronous operation as opposed
00325                 to the asynchronous nature of an invalidate broadcast.
00326 
00327         ***********************************************************************/
00328         
00329         IPayload getCache (IChannel channel, char[] key, bool remove)
00330         {
00331                 Channel target = cast(Channel) channel;
00332                 ProtocolWriter.Command op = ProtocolWriter.Command.Copy;
00333                 if (remove) op = ProtocolWriter.Command.Remove;        // fetch-and-remove instead of a plain copy
00334                 target.writer.put (op, target.getName, null, key);
00335                 return nodeSet.request (target.writer, target.reader, key);
00336         }
00337 
00338         /***********************************************************************
00339 
00340                 Place an entry into the network cache, replacing the
00341                 entry with the identical key. Note that this may cause
00342                 the oldest entry in the cache to be displaced if the
00343                 cache is already full.
00344 
00345         ***********************************************************************/
00346         
00347         IPayload putCache (IChannel channel, char[] key, IPayload payload)
00348         {
00349                 Channel target = cast(Channel) channel;
00350                 // serialize an Add request via the channel's writer, then issue it for this key
00351                 target.writer.put (ProtocolWriter.Command.Add, target.getName, payload, key);
00352                 return nodeSet.request (target.writer, target.reader, key);
00353         }
00354 
00355         /***********************************************************************
00356 
00357                 Add an entry to the specified network queue. May throw a
00358                 QueueFullException if there's no room available.
00359 
00360         ***********************************************************************/
00361         
00362         IPayload putQueue (IChannel channel, IPayload payload)
00363         {
00364                 Channel target = cast(Channel) channel;
00365                 // serialize an AddQueue request; the reply is not used, the caller gets its own payload back
00366                 target.writer.put (ProtocolWriter.Command.AddQueue, target.getName, payload);
00367                 nodeSet.request (target.writer, target.reader);
00368                 return payload;
00369         }
00370 
00371 
00372         /***********************************************************************
00373                 
00374                 Query the cluster for queued entries on the corresponding 
00375                 channel. Returns, and removes, the first matching entry 
00376                 from the cluster. Note that this sweeps the cluster for
00377                 matching entries, and is synchronous in nature. The more
00378                 common approach is to setup a queue listener, which will
00379                 grab and dispatch queue entries asynchronously.
00380 
00381         ***********************************************************************/
00382         
00383         IPayload getQueue (IChannel channel)
00384         {
00385                 IPayload payload;       // captured by scan() below; presumably filled in by node.request() - confirm in Node
00386 
00387                 Channel c = cast(Channel) channel;
00388 
00389                 // callback for NodeSet.scan(); returns true once a payload has been obtained
00390                 bool scan (Node node)
00391                 {
00392                         // ask this node ... (the request is re-serialized for every node visited)
00393                         c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
00394                         node.request (node.cache, c.writer, c.reader, payload);
00395                         return payload !== null;
00396                 }
00397 
00398                 // make a pass over each Node, looking for channel entries
00399                 nodeSet.scan (&scan);
00400                 return payload;         // still null when no node produced an entry
00401         }
00402 
00403         /***********************************************************************
00404                 
00405                 Load a network cache entry remotely. This sends the given
00406                 Payload over the network to the cache host, where it will
00407                 be executed locally. The benefit of doing so is that the
00408                 host may deny access to the cache entry for the duration
00409                 of the load operation. This, in turn, provides an elegant 
00410                 mechanism for gating/synchronizing multiple network clients  
00411                 over a given cache entry; handy for those entries that are 
00412                 relatively expensive to construct or access. 
00413 
00414         ***********************************************************************/
00415         
00416         void loadCache (IChannel channel, char[] key, IPayload payload)
00417         {               
00418                 Channel target = cast(Channel) channel;
00419                 // serialize the loader payload, then send it to the task address of the node selected for this key
00420                 target.writer.put (ProtocolWriter.Command.OK, target.getName, payload, key);
00421                 Node host = nodeSet.selectNode (key);
00422                 host.request (host.tasks, target.writer, target.reader, payload);
00423         }
00424 
00425         /***********************************************************************
00426 
00427                 Return an internet address representing the multicast
00428                 group for the specified channel. We use three of the
00429                 four address segments to represent the channel itself
00430                 (via a hash on the channel name), and set the primary
00431                 segment to be that of the broadcast prefix (above).
00432 
00433         ***********************************************************************/
00434         
00435         synchronized InternetAddress getGroup (char[] channel)
00436         {
00437                 InternetAddress group = cast(InternetAddress) groups.get (channel);
00438 
00439                 if (group is null)
00440                    {
00441                    // construct a group address from the prefix & channel-hash,
00442                    // where the 32-bit hash is folded down to 24 bits by XORing
00443                    // its top byte into the remaining three
00444                    uint hash = groups.jhash (channel);
00445                    hash = (hash >> 24) ^ (hash & 0x00ffffff);
00446 
00447                    // dotted quad: the prefix, then one octet per folded hash byte
00448                    char[] address = Int.format (groupPrefix) ~ "." ~
00449                                     Int.format ((hash >> 16) & 0xff) ~ "." ~
00450                                     Int.format ((hash >> 8) & 0xff) ~ "." ~
00451                                     Int.format (hash & 0xff);
00452 
00453                    // insert InternetAddress into hashmap.
00454                    // NOTE(review): the map is static yet keyed by channel name
00455                    // alone, so Cluster instances configured with a different
00456                    // prefix or port would share entries - confirm this is intended
00457                    group = new InternetAddress (address, groupPort);
00458                    groups.put (channel, group);
00459                    }
00460                 return group;              
00461         }
00462 }
00463 
00464 
00465 /*******************************************************************************         
00466 
00467         A listener for multicast channel traffic. These are currently used 
00468         for cache coherency, queue publishing, and node discovery activity; 
00469         though could be used for direct messaging also.
00470 
00471 *******************************************************************************/
00472 
00473 private class BulletinConsumer : SocketListener, IConsumer
00474 {
00475         private IEvent                  event;          // the event dispatched to when a matching packet arrives
00476         private Buffer                  buffer;         // receive buffer handed to the SocketListener base class
00477         private ProtocolReader          reader;         // decodes packets from buffer
00478         private Cluster                 cluster;        // owning cluster; supplies the logger and group addresses
00479         private MulticastSocket         consumer;       // socket joined to the channel's multicast group
00480 
00481         /***********************************************************************
00482 
00483                 Construct a multicast consumer for the specified event. The
00484                 event handler will be invoked whenever a message arrives for
00485                 the associated channel.
00486 
00487         ***********************************************************************/
00488         
00489         this (Cluster cluster, IEvent event)
00490         {
00491                 this.event = event;
00492                 this.cluster = cluster;
00493 
00494                 // buffer doesn't need to be larger than Ethernet data-frame
00495                 buffer = new Buffer (1500);
00496 
00497                 // make the reader slice directly from the buffer content
00498                 reader = new ProtocolReader (buffer);
00499                 reader.setAllocator (new BufferAllocator);
00500 
00501                 // configure a listener socket, joined to the group address derived from the channel name
00502                 consumer = new MulticastSocket;
00503                 consumer.join (cluster.getGroup (event.getChannel.getName));
00504                 // hand the socket and buffer to the SocketListener base class
00505                 super (consumer, buffer);
00506 
00507                 // fire up this listener. NOTE(review): start() runs before any subclass constructor body has executed
00508                 start ();
00509         }
00510 
00511         /***********************************************************************
00512 
00513                 Notification callback invoked when we receive a multicast
00514                 packet. Note that we check the packet channel-name against
00515                 the one we're consuming, to check for cases where the group
00516                 address had a hash collision.
00517 
00518         ***********************************************************************/
00519         
00520         override void notify (IBuffer buffer)
00521         {
00522                 char[]                  name;
00523                 char[]                  item;
00524                 ProtocolWriter.Command  command;
00525 
00526                 // decode the packet through our own reader rather than the argument
00527                 IPayload content = reader.getPayload (name, item, command);
00528                 // group addresses are hashed, so another channel may share ours:
00529                 // dispatch only when the packet names the channel we consume
00530                 if (event.getChannel.getName == name)
00531                     invoke (event, content);
00532         }
00533 
00534         /***********************************************************************
00535 
00536                 Handle error conditions from the listener thread.
00537 
00538         ***********************************************************************/
00539 
00540         override void exception (char [] msg)
00541         {       // the cluster's logger is optional (it defaults to null), so guard before reporting
00542                 if (cluster.getLogger() !== null) cluster.getLogger().error ("BulletinConsumer: "~msg);
00543         }
00544 
00545         /***********************************************************************
00546 
00547                 Overridable means of notifying the client code.
00548                  
00549         ***********************************************************************/
00550         
00551         protected void invoke (IEvent event, IPayload payload)
00552         {
00553                 event.invoke (payload);         // default behaviour: pass the payload straight to the event
00554         }
00555 
00556         /***********************************************************************
00557 
00558                 Return the cluster instance we're associated with.
00559 
00560         ***********************************************************************/
00561         
00562         Cluster getCluster ()
00563         {
00564                 return cluster;         // the instance supplied at construction
00565         }
00566 
00567         /***********************************************************************
00568 
00569                 Temporarily halt listening. This can be used to ignore
00570                 multicast messages while, for example, the consumer is
00571                 busy doing other things.
00572 
00573         ***********************************************************************/
00574 
00575         void pauseGroup ()
00576         {
00577                 consumer.pauseGroup ();         // forwarded to our MulticastSocket
00578         }
00579 
00580         /***********************************************************************
00581 
00582                 Resume listening, post-pause.
00583 
00584         ***********************************************************************/
00585 
00586         void resumeGroup ()
00587         {
00588                 consumer.resumeGroup ();        // forwarded to our MulticastSocket
00589         }
00590 
00591         /***********************************************************************
00592 
00593                 Cancel this consumer. The listener is effectively disabled
00594                 from this point forward. The listener thread does not halt
00595                 at this point, but waits until the socket-read returns. 
00596                 Note that the D Interface implementation requires us to 
00597                 "reimplement and dispatch" trivial things like this ~ it's
00598                 a pain in the neck to maintain.
00599 
00600         ***********************************************************************/
00601         
00602         void cancel ()
00603         {
00604                 super.cancel ();        // plain dispatch to the SocketListener base-class implementation
00605         }
00606 }
00607 
00608 
00609 /*******************************************************************************
00610         
00611         A listener for queue events. These events are produced by the 
00612         queue host on a periodic basis when it has available entries.
00613         We listen for them (rather than constantly scanning) and then
00614         begin a sweep to process as many as we can. Note that we will
00615         be in competition with other nodes to process these entries.
00616 
00617 *******************************************************************************/
00618 
00619 private class MessageConsumer : BulletinConsumer
00620 {
00621         /***********************************************************************
00622 
00623                 Construct a multicast consumer for the specified event
00624 
00625         ***********************************************************************/
00626         
00627         this (Cluster cluster, IEvent event)
00628         {
00629                 super (cluster, event);         // BulletinConsumer joins the multicast group and calls start()
00630         }
00631 
00632         /***********************************************************************
00633 
00634                 Handle error conditions from the listener thread.
00635 
00636         ***********************************************************************/
00637 
00638         override void exception (char [] msg)
00639         {       // the cluster's logger is optional (it defaults to null), so guard before reporting
00640                 if (cluster.getLogger() !== null) cluster.getLogger().error ("MessageConsumer: "~msg);
00641         }
00642 
00643         /***********************************************************************
00644 
00645                 override the default processing to sweep the cluster for 
00646                 queued entries. Each server node is queried until one is
00647                 found that contains a payload. Note that it is possible 
00648                 to set things up where we are told exactly which node to
00649                 go to; however given that we won't be listening whilst
00650                 scanning, and that there's likely to be a group of new
00651                 entries in the cluster, it's just as effective to scan.
00652                 This will be far from ideal for all environments, so we 
00653                 should make the strategy pluggable instead.                 
00654 
00655         ***********************************************************************/
00656         
00657         override protected void invoke (IEvent event, IPayload payload)
00658         {
00659                 // temporarily pause listening; the incoming payload is ignored and the parameter reused below
00660                 pauseGroup ();
00661 
00662                 try {
00663                     while ((payload = getCluster.getQueue (event.getChannel)) !== null)
00664                             event.invoke (payload);
00665                     } finally 
00666                             // resume listening, even if getQueue or the event handler threw
00667                             resumeGroup ();
00668         }
00669 }
00670 
00671 
00672 /*******************************************************************************
00673         
00674         A channel represents something akin to a publish/subscribe topic, 
00675         or a radio station. These are used to segregate cluster operations
00676         into a set of groups, where each group is represented by a channel.
00677         Channel names are whatever you want them to be: use of dot notation 
00678         has proved useful in the past. See Client.createChannel
00679 
00680 *******************************************************************************/
00681 
00682 private class Channel : IChannel
00683 {
00684         char[]                  name;           // channel name, as returned by getName()
00685         Buffer                  buffer;         // growable buffer shared by the reader and writer below
00686         ProtocolReader          reader;         // bound to buffer; accessed directly by Cluster (c.reader)
00687         ProtocolWriter          writer;         // bound to buffer; accessed directly by Cluster (c.writer)
00688 
00689         /***********************************************************************
00690         
00691                 Construct a channel with the specified name. We cache
00692                 a number of session-related constructs here also, in
00693                 order to eliminate runtime overhead
00694 
00695         ***********************************************************************/
00696 
00697         this (char[] name)
00698         in {
00699            assert (name.length);
00700            }
00701         body
00702         {       
00703                 this.name = name;
00704 
00705                 // this buffer will grow as required to house larger payloads
00706                 buffer = new GrowableBuffer (1024 * 2);
00707                 writer = new ProtocolWriter (buffer);
00708 
00709                 // make the reader slice directly from the buffer content
00710                 reader = new ProtocolReader (buffer);
00711                 reader.setAllocator (new BufferAllocator);
00712         }
00713 
00714         /***********************************************************************
00715         
00716                 Return the name of this channel. This is the name provided
00717                 when the channel was constructed.
00718 
00719         ***********************************************************************/
00720 
00721         char[] getName ()
00722         {
00723                 return name;
00724         }
00725 
00726         /***********************************************************************
00727         
00728                 Output this channel via the provided IWriter
00729 
00730         ***********************************************************************/
00731 
00732         void write (IWriter writer)
00733         {
00734                 writer.put (name);
00735         }
00736 
00737         /***********************************************************************
00738         
00739                 Input this channel via the provided IReader
00740 
00741         ***********************************************************************/
00742 
00743         void read (IReader reader)
00744         {
00745                 reader.get (name);
00746         }
00747 }
00748 
00749 
00750 /*******************************************************************************
00751         
00752         An abstraction of a socket connection. Used internally by the 
00753         socket-based Cluster. 
00754 
00755 *******************************************************************************/
00756 
private class Connection
{
        /***********************************************************************

                Answer the socket conduit backing this connection.

        ***********************************************************************/

        abstract SocketConduit getConduit ();

        /***********************************************************************

                Re-establish the underlying socket. Returns true when the
                connection is usable again, false otherwise.

        ***********************************************************************/

        abstract bool reset();

        /***********************************************************************

                Hand this connection back to its owner once the caller is 
                finished with it. The time argument is the timestamp to
                associate with the connection.

        ***********************************************************************/

        abstract void done (ulong time);
}
00765 
00766 
00767 /*******************************************************************************
00768         
00769         A pool of socket connections for accessing cluster nodes. Note 
00770         that the entries will timeout after a period of inactivity, and
00771         will subsequently cause a connected host to drop the supporting
00772         session.
00773 
00774 *******************************************************************************/
00775 
private class ConnectionPool
{ 
        private int             count;          // not referenced within this class
        private InternetAddress address;        // where every pooled socket connects to
        private PoolConnection  freelist;       // head of the singly-linked idle list

        // idle period after which a pooled connection may be reaped; it
        // is compared against differences of the 'time' values passed to
        // borrow() and done()
        private const ulong     timeout = 60_000;

        /***********************************************************************
        
                Utility class to provide the basic connection facilities
                provided by the connection pool.

        ***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;           // stamp recorded by done()
                PoolConnection  next;           // free-list link
                ConnectionPool  parent;         // owning pool
                SocketConduit   conduit;        // current socket (may be null)

                /***************************************************************
                
                        Construct a new connection, set its parent, and
                        make an initial connection attempt. The outcome
                        of that attempt is not reported here: a failed
                        connect leaves an unconnected conduit behind.

                ***************************************************************/
        
                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }
                  
                /***************************************************************

                        Return the socket belonging to this connection.
                        This is null only when no socket could be 
                        constructed at all.

                ***************************************************************/
        
                SocketConduit getConduit ()
                {
                        return conduit;
                }
                  
                /***************************************************************

                        Create a new socket and connect it to the specified 
                        server. This will cause a dedicated thread to start 
                        on the server. Said thread will quit when an error
                        occurs.

                        Any socket held from a previous attempt is closed
                        first, so that repeated reconnects do not abandon
                        open sockets.

                        Returns true when the new socket is connected and
                        configured; false if anything at all went wrong.

                ***************************************************************/
        
                bool reset ()
                {
                        // release the socket we are about to replace
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object)
                                       {
                                       // best effort only: that socket is 
                                       // being discarded regardless
                                       }
                           conduit = null;
                           }

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                                    {
                                    return false;
                                    }
                }
                  
                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated. Does nothing when there is no
                        socket to close.

                ***************************************************************/
        
                void close ()
                {
                        if (conduit)
                            conduit.close ();
                }

                /***************************************************************

                        Return this connection to the free-list, recording
                        the provided timestamp. Note that we have to 
                        synchronize on the parent-pool itself.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address. No
                connection is made until one is borrowed.

        ***********************************************************************/

        this (InternetAddress address)
        {      
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from the free-list rather than 
                creating a new one, reaping old entries as we go. 

                An idle entry is closed and skipped when it has been 
                unused for longer than the timeout, but only while there 
                are further entries behind it: the last entry on the list 
                is always handed out, whatever its age. A brand new 
                connection is created only when the list is empty to 
                begin with.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {  
                while (freelist)
                      {
                      // pop the head of the list
                      PoolConnection c = freelist;
                      freelist = c.next;

                      // usable if it's the last one left, or still fresh
                      if (freelist is null || time - c.time <= timeout)
                          return c;

                      // stale, and there are more to choose from
                      c.close ();
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all idle connections. Connections
                that are currently borrowed are not on the free-list, and
                are therefore not touched.

        ***********************************************************************/

        synchronized void close ()
        {       
                PoolConnection c = freelist;

                // detach the whole list before walking it
                freelist = null;
                for (; c; c = c.next)
                     c.close ();
        }
}
00927 
00928 
00929 /*******************************************************************************
00930         
00931         Class to represent a cluster node. Each node supports both cache
00932         and queue functionality. Note that the set of available nodes is
00933         configured at startup, simplifying the discovery process in some
00934         significant ways, and causing less thrashing of cache-keys.
00935 
00936 *******************************************************************************/
00937 
private class Node
{ 
        private char[]                  name,           // network name of the device
                                        port;           // cache port as text (for tracing)
        private ILogger                 logger;
        private bool                    enabled;

        private ConnectionPool          tasks,          // set via setTasks()
                                        cache;          // set via setCache()

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device. The node has
                no connection pools, and is not enabled, until configured
                otherwise.

        ***********************************************************************/
        
        this (ILogger logger, char[] name)
        {
                this.logger = logger;
                this.name = name;
        }

        /***********************************************************************

                Add a cache/queue reference for the remote node. The port
                of the address is also retained, in text form, for use in 
                trace messages.
                 
        ***********************************************************************/

        void setCache (InternetAddress address)
        {      
                this.cache = new ConnectionPool (address);
                port = Int.format (address.port);
        }

        /***********************************************************************

                Add a cache-loader reference for the remote node

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {      
                this.tasks = new ConnectionPool (address);
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/
        
        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover, and the idle connections of
                whichever pools have been configured are closed.

        ***********************************************************************/

        void fail ()
        {       
                setEnabled (false);

                // either pool may be absent if it was never configured
                if (cache)
                    cache.close ();    
                if (tasks)
                    tasks.close ();    
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {      
                volatile  
                       return enabled;    
        }

        /***********************************************************************

                Set the enabled state of this node, tracing the change
                when a logger is present and has trace-level enabled.

        ***********************************************************************/

        void setEnabled (bool enabled)
        {      
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                volatile  
                       this.enabled = enabled;    
        }

        /***********************************************************************

                Close the socket of a connection that is being abandoned
                rather than returned to its pool. Strictly best-effort:
                the socket is already considered dead, so any error is 
                ignored.

        ***********************************************************************/

        private void discard (Connection connect)
        {
                try {
                    SocketConduit conduit = connect.getConduit;
                    if (conduit)
                        conduit.close ();
                    } catch (Object)
                            {
                            // nothing useful can be done about it here
                            }
        }

        /***********************************************************************

                request data; fail this Node if we can't connect. Note
                that we make several attempts to connect before writing
                the node off as a failure.

                The content previously placed in the writer is flushed to 
                a connection borrowed from the given pool, and the reply 
                is read into 'payload'. Sending is tried up to three 
                times, reconnecting in between; once the request has been 
                sent, a failure to read the reply is not retried.

                Returns true when a reply flagged OK was received. Returns 
                false when the pool is null, or when the node had to be 
                failed; in the latter case the connection is closed rather 
                than being returned to the pool.

                Throws ClusterFullException or ClusterException where the
                reply is flagged as something other than OK. A 
                PickleException raised while exchanging data is passed on 
                to the caller, after the connection has been returned to 
                the pool.
                
        ***********************************************************************/
        
        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {       
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between 
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                     try {
                         // attach connection to reader and writer 
                         writer.getBuffer.setConduit (connect.getConduit);
                         reader.getBuffer.setConduit (connect.getConduit);
        
                         // send request
                         writer.flush ();

                         // load the returned object (don't retry!)
                         attempts = 0;
                         payload = reader.getPayload (channel, element, cmd);

                         // return borrowed connection
                         connect.done (time);

                         } catch (PickleException x)
                                 {
                                 connect.done (time);
                                 throw x;
                                 }
                           catch (IOException x)
                                 {
                                 // attempt to reconnect?
                                 if (attempts == 0 || !connect.reset)
                                    {
                                    // this connection goes no further: it is
                                    // not returned to the pool, so release 
                                    // its socket here rather than leaking it
                                    discard (connect);

                                    // that server is offline
                                    fail ();
  
                                    // state that we failed
                                    return false;
                                    }
                                 }
                    
                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)                       
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)                       
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)                       
                       throw new ClusterException (channel);
                        
                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}
01115 
01116 
01117 /*******************************************************************************
01118         
01119         Models a set of remote cluster nodes.
01120 
01121 *******************************************************************************/
01122 
private class NodeSet
{ 
        private HashMap         map;            // node name -> Node
        private Node[]          nodes;          // in order of registration
        private Node[]          random;         // shuffled copy, built by optimize()
        private ILogger         logger;

        /***********************************************************************

                Construct an empty set of nodes, retaining the provided
                logger.

        ***********************************************************************/
        
        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger provided at construction

        ***********************************************************************/
        
        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers. Nodes are retained in
                the order they are added; no sorting is applied here. A
                ClusterException is thrown where a node of the same name
                has been added already.

        ***********************************************************************/
        
        synchronized void addNode (Node node)
        {
                char[]  name = node.toString;

                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);
                nodes ~= node;
        }

        /***********************************************************************

                Build the randomized node list used by scan(). This should
                be invoked after the full set of nodes has been added.

        ***********************************************************************/
        
        void optimize ()
        {
                // nodes are already in the same order across the cluster
                // since the configuration file should be identical, so 
                // they are not sorted here.
                
                // copy the node list
                random = nodes.dup;

                // muddle up the duplicate list. This randomized list
                // is used when scanning the cluster for queued entries.
                // Walk down from the top, swapping each slot with one
                // picked from those at or below it (Fisher-Yates), such
                // that every ordering is equally likely
                for (int i = random.length; i > 1; --i)
                    {
                    int j = Random.get (i);
                    Node tmp = random[i-1];
                    random[i-1] = random[j];
                    random[j] = tmp;
                    }
        }

        /***********************************************************************

                Enable the named node, giving it a cache pool on port1 
                and a tasks pool on port2. Nothing happens where the node
                is enabled already. A ClusterException is thrown where 
                the name does not match any added node.

        ***********************************************************************/
        
        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
                else
                   if (! node.isEnabled)
                      {
                      node.setCache (new InternetAddress (name, port1));
                      node.setTasks (new InternetAddress (name, port2));
                      node.setEnabled (true);
                      }
        }

        /***********************************************************************

                Issue a request against the cache pool of a selected node, 
                and return the resulting payload. Where the chosen node 
                cannot service the request, another selection is made; 
                this continues until a node succeeds, or selectNode() 
                throws for lack of any enabled node.

        ***********************************************************************/
        
        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node     node;
                IPayload payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client. Throws ClusterEmptyException where there
                are no nodes, or none of them is enabled.

        ***********************************************************************/
        
        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   index %= i;

                   // visit each node once at most, wrapping at the end
                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;
        
                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available"); 
        }

        /***********************************************************************

                Select a cluster server based on the specified key. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client. Where the key is empty, a shared counter
                is advanced and used as the starting index instead.

        ***********************************************************************/
        
        final Node selectNode (char[] key = null)
        {
                static uint index;
              
                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.

                Where the randomized list does not match the current set
                of nodes (optimize() was not invoked, or nodes were added
                since), the plain node list is scanned instead.

        ***********************************************************************/
        
        bool scan (bool delegate (Node) dg)
        {
                // prefer the randomized set of server nodes, but never
                // index it by the length of a different array
                Node[] set = random;
                if (set.length != nodes.length)
                    set = nodes;

                int index = set.length;
                while (index--)
                      {
                      Node node = set [index];

                      // callback on each enabled node
                      if (node.isEnabled)
                          if (dg (node))
                              return true;
                      }
                return false;
        }
}
01304 
01305  
01306 

Generated on Sat Apr 9 20:11:25 2005 for Mango by doxygen 1.3.6