Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

Cluster.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file Cluster.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032 
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.Cluster;
00040 
00041 private import  mango.cache.HashMap;
00042 
00043 private import  mango.utils.Random;
00044 
00045 private import  mango.format.Int;
00046 
00047 private import  mango.sys.System;
00048 
00049 private import  mango.io.Buffer,
00050                 mango.io.Socket,
00051                 mango.io.Exception,
00052                 mango.io.Properties,
00053                 mango.io.ArrayAllocator,
00054                 mango.io.SocketConduit,
00055                 mango.io.SocketListener,
00056                 mango.io.MulticastSocket;
00057 
00058 private import  mango.io.model.IConduit;
00059 
00060 private import  mango.log.model.ILogger;
00061 
00062 private import  mango.cluster.Client;
00063 
00064 public  import  mango.cluster.model.ICluster;
00065 
00066 private import  mango.cluster.qos.socket.RollCall,
00067                 mango.cluster.qos.socket.ClusterEvent,
00068                 mango.cluster.qos.socket.ProtocolReader,
00069                 mango.cluster.qos.socket.ProtocolWriter;
00070 
00071 
00072 /*******************************************************************************
00073         
00074         QOS implementation for sockets. All cluster-client activity is 
00075         gated through here by the higher level classes; NetworkQueue &
00076         NetworkCache for example. You gain access to the cluster by 
00077         creating an instance of the QOS (quality of service) you desire
00078         and mapping client classes onto it. For example:
00079 
00080         @code
00081         import mango.cluster.NetworkCache;
00082         import mango.cluster.qos.socket.Cluster;
00083 
00084         ICluster cluster = new Cluster (...);
00085         NetworkCache cache = new NetworkCache (cluster, ...);
00086 
00087         cache.put (...);
00088         cache.get (...);
00089         cache.invalidate (...);
00090         @endcode
00091 
00092         Please see the cluster clients for additional details. Currently
00093         these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 
00094         NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 
00095         Client base-class.
00096 
00097 *******************************************************************************/
00098 
class Cluster : ICluster, IEventListener
{
        // multicast group addresses keyed by channel name; static, and
        // therefore shared by every Cluster instance in the process
        private static HashMap                  groups;

        // optional logger: the single-argument constructor permits null
        private ILogger                         logger;

        // the set of cluster server nodes that requests are routed to
        private NodeSet                         nodeSet;

        // buffer, writer & socket shared by all broadcast() calls
        private Buffer                          mBuffer;
        private ProtocolWriter                  mWriter;
        private MulticastSocket                 mSocket;

        // multicast defaults; each may be overridden via configuration
        // ("multicast_ttl", "multicast_port" and "multicast_prefix").
        // Note that groupTTL is assigned, but not read, within this class
        private int                             groupTTL = 1;
        private int                             groupPort = 3333;
        private int                             groupPrefix = 225;

        /***********************************************************************

                Setup a hashmap for multicast group addresses

        ***********************************************************************/

        static this ()
        {
                // NOTE(review): arguments are presumably the initial
                // capacity, load factor and concurrency level ~ confirm
                // against mango.cache.HashMap
                groups = new HashMap (128, 0.75, 2);
        }

        /***********************************************************************

                Setup a Cluster instance with an (optional) logger and no
                configured nodes. Currently the buffer & writer are shared
                for all bulletin serialization; this should probably change
                at some point such that we can support multiple threads
                broadcasting concurrently to different output ports.

        ***********************************************************************/

        this (ILogger logger = null)
        {
                this.logger = logger;
                nodeSet = new NodeSet (logger);
                mBuffer = new Buffer (1024 * 4);
                mSocket = new MulticastSocket;
                mWriter = new ProtocolWriter (mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance from the configuration available
                via the provided conduit. Recognized attributes are "node",
                "multicast_port", "multicast_prefix" and "multicast_ttl";
                anything else results in a ClusterException.

                Once configured, we subscribe to the server-advertisement
                channel, broadcast a RollCall request, and then pause for
                250 milliseconds such that enabled servers may respond.

        ***********************************************************************/

        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // callback handed to Properties.load(); it receives one
                // configuration name/value pair per invocation
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);

                        if (name == "node")
                            nodeSet.addNode (new Node (logger, value));
                        else if (name == "multicast_port")
                            groupPort = Int.parse (value);
                        else if (name == "multicast_prefix")
                            groupPrefix = Int.parse (value);
                        else if (name == "multicast_ttl")
                            groupTTL = Int.parse (value);
                        else
                            throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }

                // load up the cluster configuration
                Properties.load (conduit, &loader);

                // finalize nodeSet
                nodeSet.optimize ();

                // listen for cluster servers
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ask who's currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // wait for enabled servers to respond ...
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance with a single, enabled node named
                "local". The node cache address is set to localhost at
                serverPort, and the node task address to localhost at
                serverPort+1. No configuration is loaded, and no RollCall
                is broadcast.

        ***********************************************************************/

        this (ILogger logger, uint serverPort)
        in {
           assert (logger);
           assert (serverPort > 1024);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress ("localhost", serverPort));
                node.setTasks (new InternetAddress ("localhost", serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when
                they get a RollCall request, and when they begin execution.

                Payloads that are not RollCall instances are ignored: the
                cast yields null for those, and dereferencing it would
                otherwise take down the listener.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                RollCall rollcall = cast(RollCall) payload;

                // ignore foreign payloads, along with requests from other
                // clients (we're on a common channel)
                if (rollcall !is null && ! rollcall.request)
                    nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplifies and speeds up execution.

        ***********************************************************************/

        IChannel createChannel (char[] channel)
        {
                return new Channel (channel);
        }

        /***********************************************************************

                Return the logger instance provided during construction.
                This may be null where no logger was supplied.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Broadcast a payload on the specified channel. This uses
                IP/Multicast to scatter the payload to all registered
                listeners (on the same multicast group). Note that the
                maximum payload size is limited to that of an Ethernet
                data frame, minus the IP/UDP header size (1472 bytes);
                a ClusterException is thrown where the serialized content
                exceeds that limit.

        ***********************************************************************/

        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // serialize content into the shared buffer
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                // Ethernet data-frame size minus the 28 byte UDP/IP header:
                if (mBuffer.getPosition > 1472)
                    throw new ClusterException ("payload is too large to broadcast");

                // send it to the appropriate multicast group
                mSocket.write (mBuffer, getGroup (channel.getName));
        }

        /***********************************************************************

                Create a listener of the specified type. Listeners are
                run within their own thread, since they spend the vast
                majority of their time blocked on a Socket read. Would
                be good to support multiplexed reading instead, such
                that a thread pool could be applied.

                Throws a ClusterException where the requested style is
                neither Message nor Bulletin.

        ***********************************************************************/

        IConsumer createConsumer (IChannel channel, IEvent.Style style,
                                  IEventListener notify)
        {
                IEvent event = new ClusterEvent (this, channel, style, notify);

                if (logger)
                    logger.info ("creating " ~ event.getStyleName ~
                                 " consumer for channel '" ~ channel.getName ~ "'");

                IConsumer consumer;

                switch (style)
                       {
                       case IEvent.Style.Message:
                            consumer = new MessageConsumer (this, event);
                            break;

                       case IEvent.Style.Bulletin:
                            consumer = new BulletinConsumer (this, event);
                            break;

                       default:
                            // fail with the cluster exception type rather
                            // than dropping off the end of the switch
                            throw new ClusterException ("Unrecognized consumer style for channel '" ~ channel.getName ~ "'");
                       }

                return consumer;
        }

        /***********************************************************************

                Return an entry from the network cache, and optionally
                remove it. This is a synchronous operation as opposed
                to the asynchronous nature of an invalidate broadcast.

                The channel is expected to be one created via this class
                (see createChannel), since the cached reader & writer of
                our Channel implementation are used for the exchange.

        ***********************************************************************/

        IPayload getCache (IChannel channel, char[] key, bool remove)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (remove ? ProtocolWriter.Command.Remove :
                              ProtocolWriter.Command.Copy, c.getName, null, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Place an entry into the network cache, replacing the
                entry with the identical key. Note that this may cause
                the oldest entry in the cache to be displaced if the
                cache is already full.

        ***********************************************************************/

        IPayload putCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Add an entry to the specified network queue, and return
                the payload that was provided. May throw a
                QueueFullException if there's no room available.

        ***********************************************************************/

        IPayload putQueue (IChannel channel, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);

                // the response is discarded; the caller gets its payload back
                nodeSet.request (c.writer, c.reader);
                return payload;
        }


        /***********************************************************************

                Query the cluster for queued entries on the corresponding
                channel. Returns, and removes, the first matching entry
                from the cluster. Note that this sweeps the cluster for
                matching entries, and is synchronous in nature. The more
                common approach is to setup a queue listener, which will
                grab and dispatch queue entries asynchronously.

                Returns null where no node provided an entry.

        ***********************************************************************/

        IPayload getQueue (IChannel channel)
        {
                IPayload payload;

                Channel c = cast(Channel) channel;

                // callback for NodeSet.scan(): returns true once a payload
                // has been retrieved (presumably ending the scan ~ confirm
                // against NodeSet)
                bool scan (Node node)
                {
                        // ask this node ...
                        c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);

                        // NOTE(review): payload is presumably populated by
                        // reference here ~ confirm against Node.request
                        node.request (node.cache, c.writer, c.reader, payload);
                        return payload !is null;
                }

                // make a pass over each Node, looking for channel entries
                nodeSet.scan (&scan);
                return payload;
        }

        /***********************************************************************

                Load a network cache entry remotely. This sends the given
                Payload over the network to the cache host, where it will
                be executed locally. The benefit of doing so is that the
                host may deny access to the cache entry for the duration
                of the load operation. This, in turn, provides an elegant
                mechanism for gating/synchronizing multiple network clients
                over a given cache entry; handy for those entries that are
                relatively expensive to construct or access.

                The target node is selected via the key, and the request
                is directed at the task address of that node.

        ***********************************************************************/

        void loadCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
                Node node = nodeSet.selectNode (key);
                node.request (node.tasks, c.writer, c.reader, payload);
        }

        /***********************************************************************

                Return an internet address representing the multicast
                group for the specified channel. We use three of the
                four address segments to represent the channel itself
                (via a hash on the channel name), and set the primary
                segment to be that of the broadcast prefix (above).

                Addresses are cached by channel name within the static
                hashmap. NOTE(review): that cache is shared across all
                instances whereas the prefix & port are per instance, so
                instances configured differently would presumably see
                whichever address was cached first ~ confirm this is
                acceptable.

        ***********************************************************************/

        synchronized InternetAddress getGroup (char[] channel)
        {
                InternetAddress group = cast(InternetAddress) groups.get (channel);

                if (group is null)
                   {
                   // construct a group address from the prefix & channel-hash,
                   // where the hash is folded down to 24 bits
                   uint hash = groups.jhash (channel);
                   hash = (hash >> 24) ^ (hash & 0x00ffffff);

                   char[] address = Int.format (groupPrefix) ~ "." ~
                                    Int.format ((hash >> 16) & 0xff) ~ "." ~
                                    Int.format ((hash >> 8) & 0xff) ~ "." ~
                                    Int.format (hash & 0xff);

                   // insert InternetAddress into hashmap
                   group = new InternetAddress (address, groupPort);
                   groups.put (channel, group);
                   }
                return group;
        }
}
00462 
00463 
00464 /*******************************************************************************         
00465 
00466         A listener for multicast channel traffic. These are currently used 
00467         for cache coherency, queue publishing, and node discovery activity; 
00468         though could be used for direct messaging also.
00469 
00470 *******************************************************************************/
00471 
private class BulletinConsumer : SocketListener, IConsumer
{
        // the event dispatched upon arrival of a matching packet
        private IEvent                  event;

        // receive buffer, and the reader which decodes from it
        private Buffer                  buffer;
        private ProtocolReader          reader;

        // owning cluster: provides the group address & the logger
        private Cluster                 cluster;

        // the multicast socket we listen upon
        private MulticastSocket         consumer;

        /***********************************************************************

                Construct a multicast consumer for the specified event. The
                event handler will be invoked whenever a message arrives for
                the associated channel. Note that the listener is started
                from within this constructor.

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                this.event = event;
                this.cluster = cluster;

                // buffer doesn't need to be larger than Ethernet data-frame
                buffer = new Buffer (1500);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                // configure a listener socket
                consumer = new MulticastSocket;
                consumer.join (cluster.getGroup (event.getChannel.getName));

                super (consumer, buffer);

                // fire up this listener
                start ();
        }

        /***********************************************************************

                Notification callback invoked when we receive a multicast
                packet. Note that we check the packet channel-name against
                the one we're consuming, to check for cases where the group
                address had a hash collision.

                The content is decoded via our own reader, which was bound
                to the receive buffer during construction; the argument is
                not otherwise referenced.

        ***********************************************************************/

        override void notify (IBuffer buffer)
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                IPayload payload = reader.getPayload (channel, element, cmd);

                // check it's really for us first (might be a hash collision)
                if (channel == event.getChannel.getName)
                    invoke (event, payload);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                message is logged where the cluster has a logger; a
                Cluster may be constructed without one, in which case
                there is nowhere to report the message and it is dropped
                rather than dereferencing a null logger.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger log = cluster.getLogger;

                if (log)
                    log.error ("BulletinConsumer: "~msg);
        }

        /***********************************************************************

                Overridable means of notifying the client code.

        ***********************************************************************/

        protected void invoke (IEvent event, IPayload payload)
        {
                event.invoke (payload);
        }

        /***********************************************************************

                Return the cluster instance we're associated with.

        ***********************************************************************/

        Cluster getCluster ()
        {
                return cluster;
        }

        /***********************************************************************

                Temporarily halt listening. This can be used to ignore
                multicast messages while, for example, the consumer is
                busy doing other things.

        ***********************************************************************/

        void pauseGroup ()
        {
                consumer.pauseGroup ();
        }

        /***********************************************************************

                Resume listening, post-pause.

        ***********************************************************************/

        void resumeGroup ()
        {
                consumer.resumeGroup ();
        }

        /***********************************************************************

                Cancel this consumer. The listener is effectively disabled
                from this point forward. The listener thread does not halt
                at this point, but waits until the socket-read returns.
                Note that the D Interface implementation requires us to
                "reimplement and dispatch" trivial things like this ~ it's
                a pain in the neck to maintain.

        ***********************************************************************/

        void cancel ()
        {
                super.cancel ();
        }
}
00606 
00607 
00608 /*******************************************************************************
00609         
00610         A listener for queue events. These events are produced by the 
00611         queue host on a periodic basis when it has available entries.
00612         We listen for them (rather than constantly scanning) and then
00613         begin a sweep to process as many as we can. Note that we will
00614         be in competition with other nodes to process these entries.
00615 
00616 *******************************************************************************/
00617 
private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a multicast consumer for the specified event

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                super (cluster, event);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                message is logged where the cluster has a logger; a
                Cluster may be constructed without one, in which case
                the message is dropped rather than dereferencing a null
                logger.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger log = cluster.getLogger;

                if (log)
                    log.error ("MessageConsumer: "~msg);
        }

        /***********************************************************************

                Override the default processing to sweep the cluster for
                queued entries. Each server node is queried until one is
                found that contains a payload. Note that it is possible
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we
                should make the strategy pluggable instead.

                The incoming payload acts purely as a trigger: it is
                replaced by each entry retrieved from the queue.

        ***********************************************************************/

        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    // drain the queue, dispatching each entry in turn
                    while ((payload = getCluster.getQueue (event.getChannel)) !is null)
                            event.invoke (payload);
                    }
                finally
                    {
                    // resume listening, even where a handler threw
                    resumeGroup ();
                    }
        }
}
00669 
00670 
00671 /*******************************************************************************
00672         
00673         A channel represents something akin to a publish/subscribe topic, 
00674         or a radio station. These are used to segregate cluster operations
00675         into a set of groups, where each group is represented by a channel.
00676         Channel names are whatever you want them to be: use of dot notation 
00677         has proved useful in the past. See Client.createChannel
00678 
00679 *******************************************************************************/
00680 
private class Channel : IChannel
{
        // the channel name, as given at construction time
        char[]                  name;

        // IO helpers cached per channel; reader & writer share the buffer
        Buffer                  buffer;
        ProtocolReader          reader;
        ProtocolWriter          writer;

        /***********************************************************************

                Build a named channel, along with the buffer, writer
                and reader that are cached for use by cluster requests
                made upon it. The name must not be empty.

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length);
           }
        body
        {
                this.name = name;

                // growable, such that larger payloads can be housed
                this.buffer = new GrowableBuffer (1024 * 2);
                this.writer = new ProtocolWriter (this.buffer);

                // the reader slices directly from the buffer content
                this.reader = new ProtocolReader (this.buffer);
                this.reader.setAllocator (new BufferAllocator);
        }

        /***********************************************************************

                Return the channel name, exactly as it was provided to
                the constructor.

        ***********************************************************************/

        char[] getName ()
        {
                return this.name;
        }

        /***********************************************************************

                Emit the channel name onto the provided IWriter

        ***********************************************************************/

        void write (IWriter writer)
        {
                // the argument shadows our cached writer of the same name
                writer.put (this.name);
        }

        /***********************************************************************

                Populate the channel name from the provided IReader

        ***********************************************************************/

        void read (IReader reader)
        {
                // the argument shadows our cached reader of the same name
                reader.get (this.name);
        }
}
00747 
00748 
00749 /*******************************************************************************
00750         
00751         An abstraction of a socket connection. Used internally by the 
00752         socket-based Cluster. 
00753 
00754 *******************************************************************************/
00755 
00756 private class Connection
00757 {
00758         abstract bool reset();                  // (re)connect; PoolConnection returns true on success
00759 
00760         abstract void done (ulong time);        // hand the connection back, stamped with 'time'
00761 
00762         abstract SocketConduit getConduit ();   // the socket conduit used by this connection
00763 }
00764 
00765 
00766 /*******************************************************************************
00767         
00768         A pool of socket connections for accessing cluster nodes. Note 
00769         that the entries will time out after a period of inactivity, and
00770         will subsequently cause a connected host to drop the supporting
00771         session.
00772 
00773 *******************************************************************************/
00774 
00775 private class ConnectionPool
00776 { 
00777         private int             count;
00778         private InternetAddress address;        // where new connections are made to
00779         private PoolConnection  freelist;       // head of the list of idle connections
00780         private const ulong     timeout = 60_000;
00781 
00782         /***********************************************************************
00783         
00784                 A pooled connection. It owns a socket conduit, and links
00785                 into the free-list of its parent pool while idle.
00786 
00787         ***********************************************************************/
00788 
00789         class PoolConnection : Connection
00790         {
00791                 ulong           time;           // stamp supplied to done()
00792                 PoolConnection  next;           // following free-list entry
00793                 ConnectionPool  parent;   
00794                 SocketConduit   conduit;
00795 
00796                 /***************************************************************
00797                 
00798                         Bind to the owning pool, then attempt to connect
00799 
00800                 ***************************************************************/
00801         
00802                 this (ConnectionPool pool)
00803                 {
00804                         this.parent = pool;
00805                         reset ();       // the outcome is not checked here
00806                 }
00807                   
00808                 /***************************************************************
00809 
00810                         Expose the socket conduit held by this connection
00811 
00812                 ***************************************************************/
00813         
00814                 SocketConduit getConduit ()
00815                 {
00816                         return this.conduit;
00817                 }
00818                   
00819                 /***************************************************************
00820 
00821                         Create a new socket and connect it to the address
00822                         of the parent pool. This will cause a dedicated 
00823                         thread to start on the server; said thread quits
00824                         when an error occurs. Returns false if anything
00825                         at all was thrown along the way.
00826                 ***************************************************************/
00827         
00828                 bool reset ()
00829                 {
00830                         bool connected = false;
00831                         try {
00832                             conduit = new SocketConduit;
00833                             conduit.connect (parent.address);
00834                             // set a 500ms timeout for read operations
00835                             conduit.setTimeout (System.Interval.Millisec * 500);
00836                             connected = true;
00837                             } catch (Object)
00838                                     {
00839                                     // whatever was thrown, we are not connected
00840                                     }
00841                         return connected;
00842                 }
00843                   
00844                 /***************************************************************
00845 
00846                         Close the socket, which in turn causes any host 
00847                         session to be terminated.
00848 
00849                 ***************************************************************/
00850         
00851                 void close ()
00852                 {
00853                         this.conduit.close ();
00854                 }
00855 
00856                 /***************************************************************
00857 
00858                         Push this connection onto the parent's free-list,
00859                         recording 'time'. We synchronize on the parent.
00860 
00861                 ***************************************************************/
00862 
00863                 void done (ulong time)
00864                 {
00865                         synchronized (parent)
00866                                      {
00867                                      this.time = time;
00868                                      this.next = parent.freelist;
00869                                      parent.freelist = this;
00870                                      }
00871                 }
00872         }
00873 
00874 
00875         /***********************************************************************
00876 
00877                 Create an (initially empty) pool for the specified address.
00878 
00879         ***********************************************************************/
00880 
00881         this (InternetAddress address)
00882         {      
00883                 this.address = address;
00884         }
00885 
00886         /***********************************************************************
00887 
00888                 Take a Connection off the free-list, closing entries idle
00889                 for longer than the timeout as we go; the final entry is
00890                 returned whatever its age. Creates one if the list is empty.
00891         ***********************************************************************/
00892 
00893         synchronized Connection borrow (ulong time)
00894         {  
00895                 while (freelist !is null)
00896                       {
00897                       PoolConnection head = freelist;
00898                       freelist = head.next;
00899 
00900                       // the last remaining entry is handed out regardless
00901                       if (freelist is null || time - head.time <= timeout)
00902                           return head;
00903                       head.close ();
00904                       }
00905 
00906                 return new PoolConnection (this);
00907         }
00908 
00909         /***********************************************************************
00910 
00911                 Empty the free-list, closing each connection found upon it.
00912 
00913         ***********************************************************************/
00914 
00915         synchronized void close ()
00916         {       
00917                 // detach the whole list first, leaving the pool empty
00918                 PoolConnection entry = freelist;
00919                 freelist = null;
00920 
00921                 // then close every entry that was on it
00922                 for (; entry !is null; entry = entry.next)
00923                        entry.close ();
00924         }
00925 }
00926 
00927 
00928 /*******************************************************************************
00929         
00930         Class to represent a cluster node. Each node supports both cache
00931         and queue functionality. Note that the set of available nodes is
00932         configured at startup, simplifying the discovery process in some
00933         significant ways, and causing less thrashing of cache-keys.
00934 
00935 *******************************************************************************/
00936 
00937 private class Node
00938 { 
00939         private char[]                  name,
00940                                         port;           // text of the cache port, for tracing
00941         private ILogger                 logger;
00942         private bool                    enabled;
00943 
00944         private ConnectionPool          tasks,
00945                                         cache;
00946 
00947         /***********************************************************************
00948 
00949                 Construct a node with the provided name. This name should
00950                 be the network name of the hosting device. No connection
00951                 pools exist until setCache() and setTasks() are called.
00952         ***********************************************************************/
00953         
00954         this (ILogger logger, char[] name)
00955         {
00956                 this.logger = logger;
00957                 this.name = name;
00958         }
00959 
00960         /***********************************************************************
00961 
00962                 Add a cache/queue reference for the remote node, and keep
00963                 the port (as text) for use in trace messages
00964         ***********************************************************************/
00965 
00966         void setCache (InternetAddress address)
00967         {      
00968                 this.cache = new ConnectionPool (address);
00969                 port = Int.format (address.port);
00970         }
00971 
00972         /***********************************************************************
00973 
00974                 Add a cache-loader reference for the remote node
00975 
00976         ***********************************************************************/
00977 
00978         void setTasks (InternetAddress address)
00979         {      
00980                 this.tasks = new ConnectionPool (address);
00981         }
00982 
00983         /***********************************************************************
00984 
00985                 Return the name of this node (the network name of the device)
00986 
00987         ***********************************************************************/
00988         
00989         override char[] toString ()
00990         {
00991                 return name;
00992         }
00993 
00994         /***********************************************************************
00995 
00996                 Remove this Node from the cluster: the node is disabled,
00997                 and both connection pools are closed. NOTE(review): neither
00998                 pool is null-checked; presumably both are always set first.
00999         ***********************************************************************/
01000 
01001         void fail ()
01002         {       
01003                 setEnabled (false);
01004                 cache.close ();    
01005                 tasks.close ();    
01006         }
01007 
01008         /***********************************************************************
01009 
01010                 Get the current state of this node
01011 
01012         ***********************************************************************/
01013 
01014         bool isEnabled ()
01015         {      
01016                 volatile  
01017                        return enabled;    
01018         }
01019 
01020         /***********************************************************************
01021 
01022                 Set the enabled state of this node, tracing the change
01023                 first when a logger is present with Trace level enabled
01024         ***********************************************************************/
01025 
01026         void setEnabled (bool enabled)
01027         {      
01028                 if (logger && logger.isEnabled (logger.Level.Trace))
01029                    {
01030                    if (enabled)
01031                        logger.trace ("enabling node '"~name~"' on port "~port);
01032                    else
01033                       logger.trace ("disabling node '"~name~"'");
01034                    }
01035                 volatile  
01036                        this.enabled = enabled;    
01037         }
01038 
01039         /***********************************************************************
01040 
01041                 Flush the request held by 'writer' over a connection taken
01042                 from 'pool', and load the reply into 'payload'. Returns
01043                 false if the pool is null or this Node had to be failed;
01044                 throws when the reply carries anything other than OK.
01045         ***********************************************************************/
01046         
01047         bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
01048         {       
01049                 ProtocolWriter.Command  cmd;
01050                 ulong                   time;
01051                 char[]                  channel;
01052                 char[]                  element;
01053                 Connection              connect;
01054 
01055                 // it's possible that the pool may have failed between 
01056                 // the point of selecting it, and the invocation itself
01057                 if (pool is null)
01058                     return false;
01059 
01060                 // get a connection to the server, stamped with the current time
01061                 connect = pool.borrow (time = System.getMillisecs);
01062 
01063                 // talk to the server (up to three attempts at sending)
01064                 for (int attempts=3; attempts--;)
01065                      try {
01066                          // attach connection to reader and writer 
01067                          writer.getBuffer.setConduit (connect.getConduit);
01068                          reader.getBuffer.setConduit (connect.getConduit);
01069         
01070                          // send request
01071                          writer.flush ();
01072 
01073                          // the request is sent: zero the counter so that a failed read is not retried
01074                          attempts = 0;
01075                          payload = reader.getPayload (channel, element, cmd);
01076 
01077                          // return borrowed connection
01078                          connect.done (time);
01079 
01080                          } catch (PickleException x)
01081                                  {
01082                                  connect.done (time);
01083                                  throw x;
01084                                  }
01085                            catch (IOException x)
01086                                   // give up when out of attempts, or when reconnecting fails
01087                                   if (attempts == 0 || !connect.reset)
01088                                      {
01089                                      // that server is offline
01090                                      fail ();
01091                                      // NOTE(review): 'connect' is neither returned nor closed here - confirm
01092                                      // state that we failed
01093                                      return false;
01094                                      }
01095                     
01096                 // is payload an exception?
01097                 if (cmd != ProtocolWriter.Command.OK)                       
01098                    {
01099                    // is node full?
01100                    if (cmd == ProtocolWriter.Command.Full)                       
01101                        throw new ClusterFullException (channel);
01102 
01103                    // did node barf?
01104                    if (cmd == ProtocolWriter.Command.Exception)                       
01105                        throw new ClusterException (channel);
01106                         
01107                    // bogus response
01108                    throw new ClusterException ("invalid response from cluster server");
01109                    }
01110 
01111                 return true;
01112         }
01113 }
01114 
01115 
01116 /*******************************************************************************
01117         
01118         Models a set of remote cluster nodes.
01119 
01120 *******************************************************************************/
01121 
01122 private class NodeSet
01123 { 
01124         private HashMap         map;            // node name -> Node
01125         private Node[]          nodes;          // in the order they were added
01126         private Node[]          random;         // shuffled copy, built by optimize()
01127         private ILogger         logger;
01128 
01129         /***********************************************************************
01130                 Construct an empty node set which retains the given logger
01131         ***********************************************************************/
01132         
01133         this (ILogger logger)
01134         {
01135                 this.logger = logger;
01136                 map = new HashMap (128, 0.75, 4);
01137         }
01138 
01139         /***********************************************************************
01140                 Return the logger provided at construction
01141         ***********************************************************************/
01142         
01143         ILogger getLogger ()
01144         {
01145                 return logger;
01146         }
01147 
01148         /***********************************************************************
01149 
01150                 Add a node to the list of servers. Nodes keep the order in
01151                 which they are added (no sorting is applied); adding the
01152                 same name twice throws a ClusterException
01153         ***********************************************************************/
01154         
01155         synchronized void addNode (Node node)
01156         {
01157                 char[]  name = node.toString;
01158 
01159                 if (map.get (name))
01160                     throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");
01161 
01162                 map.put (name, node);
01163                 nodes ~= node;
01164                 // the shuffled copy is not refreshed here; see optimize()
01165         }
01166 
01167         /***********************************************************************
01168                 Rebuild the shuffled copy of the node list used by scan()
01169         ***********************************************************************/
01170         
01171         void optimize ()
01172         {
01173                 // nodes are already in the same order across the cluster
01174                 // since the configuration file should be identical, so
01175                 // no sorting is applied here
01176 
01177                 // copy the node list
01178                 random = nodes.dup;
01179 
01180                 // muddle up the duplicate list (used when scanning the
01181                 // cluster for queued entries). Each slot swaps only with
01182                 // itself or a later one, which avoids favouring any order
01183                 for (int i = 0; i < random.length; ++i)
01184                         {
01185                         int j = i + Random.get (random.length - i);
01186                         Node tmp = random[i];
01187                         random[i] = random[j];
01188                         random[j] = tmp;
01189                         }
01190         }
01191 
01192         /***********************************************************************
01193                 Enable the named node, giving it cache and task pools
01194         ***********************************************************************/
01195         
01196         synchronized void enable (char[] name, ushort port1, ushort port2)
01197         {
01198                 Node node = cast(Node) map.get (name);
01199                 if (node is null)
01200                     throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
01201 
01202                 // a node which is already enabled keeps its existing pools
01203                 if (! node.isEnabled)
01204                    {
01205                    node.setCache (new InternetAddress (name, port1));
01206                    node.setTasks (new InternetAddress (name, port2));
01207                    node.setEnabled (true);
01208         }          }
01209 
01210         /***********************************************************************
01211                 Issue a cache request, moving on while nodes report failure
01212         ***********************************************************************/
01213         
01214         IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
01215         {
01216                 Node     node;
01217                 IPayload payload;
01218 
01219                 do {
01220                    node = selectNode (key);
01221                    } while (! node.request (node.cache, writer, reader, payload));
01222                 return payload;
01223         }
01224 
01225         /***********************************************************************
01226 
01227                 Select a cluster server based on a starting index. If the
01228                 selected server is not currently enabled, we just try the
01229                 next one. This behaviour should be consistent across each
01230                 cluster client. Throws ClusterEmptyException if none is.
01231 
01232         ***********************************************************************/
01233         
01234         private final Node selectNode (uint index)
01235         {
01236                 uint i = nodes.length;
01237 
01238                 if (i)
01239                    {
01240                    index %= i;
01241 
01242                    while (i--)
01243                          {
01244                          Node node = nodes[index];
01245                          if (node.isEnabled)
01246                              return node;
01247         
01248                          if (++index >= nodes.length)
01249                              index = 0;
01250                          }
01251                    }
01252                 throw new ClusterEmptyException ("No cluster servers are available"); 
01253         }
01254 
01255         /***********************************************************************
01256 
01257                 Select a cluster server based on the specified key. If the
01258                 selected server is not currently enabled, we just try the
01259                 next one. This behaviour should be consistent across each
01260                 cluster client. Without a key, a rolling counter is used.
01261 
01262         ***********************************************************************/
01263         
01264         final Node selectNode (char[] key = null)
01265         {
01266                 static uint index;
01267               
01268                 if (key.length)
01269                     return selectNode (HashMap.jhash (key));
01270 
01271                 // no key provided, so just roll the counter
01272                 return selectNode (++index);
01273         }
01274 
01275         /***********************************************************************
01276 
01277                 Sweep the cluster servers. Returns true if the delegate
01278                 returns true, false otherwise. The sweep is aborted when
01279                 the delegate returns true. Note that this scans nodes in
01280                 a randomized pattern, which should tend to avoid 'bursty'
01281                 activity by a set of clients upon any one cluster server.
01282 
01283         ***********************************************************************/
01284         
01285         bool scan (bool delegate (Node) dg)
01286         {
01287                 // 'random' is only rebuilt by optimize(), so it may lag behind
01288                 // 'nodes'; fall back to the ordered list rather than overrun
01289                 Node[]  sweep = (random.length == nodes.length) ? random : nodes;
01290 
01291                 for (int index = sweep.length; index--;)
01292                       {
01293                       Node node = sweep [index];
01294 
01295                       // callback on each enabled node
01296                       if (node.isEnabled)
01297                           if (dg (node))
01298                               return true;
01299                       }
01300                 return false;
01301         }
01302 }
01303 
01304  
01305 

Generated on Fri Nov 11 18:44:18 2005 for Mango by  doxygen 1.4.0