Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

Cluster.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file Cluster.d
00004         
00005         Copyright (C) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026 
00027                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00028 
00029 
00030         @version        Initial version, July 2004      
00031         @author         Kris
00032 
00033 
00034 *******************************************************************************/
00035 
00036 module mango.cluster.qos.socket.Cluster;
00037 
00038 private import  mango.cache.HashMap;
00039 
00040 private import  mango.utils.Text,
00041                 mango.utils.Random;
00042 
00043 private import  mango.base.System;
00044 
00045 private import  mango.io.Buffer,
00046                 mango.io.Socket,
00047                 mango.io.Exception,
00048                 mango.io.Properties,
00049                 mango.io.ArrayAllocator,
00050                 mango.io.SocketConduit,
00051                 mango.io.SocketListener,
00052                 mango.io.MulticastSocket;
00053 
00054 private import  mango.io.model.IConduit;
00055 
00056 private import  mango.log.model.ILogger;
00057 
00058 private import  mango.cluster.Client;
00059 
00060 public  import  mango.cluster.model.ICluster;
00061 
00062 private import  mango.cluster.qos.socket.RollCall,
00063                 mango.cluster.qos.socket.ClusterEvent,
00064                 mango.cluster.qos.socket.ProtocolReader,
00065                 mango.cluster.qos.socket.ProtocolWriter;
00066 
00067 
00068 /*******************************************************************************
00069         
00070         QOS implementation for sockets. All cluster-client activity is 
00071         gated through here by the higher level classes; NetworkQueue &
00072         NetworkCache for example. You gain access to the cluster by 
00073         creating an instance of the QOS (quality of service) you desire
00074         and mapping client classes onto it. For example:
00075 
00076         @code
00077         import mango.cluster.NetworkCache;
00078         import mango.cluster.qos.socket.Cluster;
00079 
00080         ICluster cluster = new Cluster (...);
00081         NetworkCache cache = new NetworkCache (cluster, ...);
00082 
00083         cache.put (...);
00084         cache.get (...);
00085         cache.invalidate (...);
00086         @endcode
00087 
00088         Please see the cluster clients for additional details. Currently
00089         these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 
00090         NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 
00091         Client base-class.
00092 
00093 *******************************************************************************/
00094 
00095 class Cluster : ICluster, IEventListener
00096 {  
00097         private static HashMap                  groups;
00098         private ILogger                         logger;
00099         private NodeSet                         nodeSet;
00100         private Buffer                          mBuffer;
00101         private ProtocolWriter                  mWriter;
00102         private MulticastSocket                 mSocket;
00103 
00104         private ushort                          groupTTL = 1;
00105         private ushort                          groupPort = 3333;
00106         private ubyte                           groupPrefix = 225;
00107      
00108         /***********************************************************************
00109 
00110                 Setup a hashmap for multicast group addresses
00111 
00112         ***********************************************************************/
00113         
00114         static this ()
00115         {
00116                 groups = new HashMap (128, 0.75, 2);
00117         }
00118 
00119         /***********************************************************************
00120 
00121                 Setup a Cluster instance. Currently the buffer & writer
00122                 are shared for all bulletin serialization; this should
00123                 probably change at some point such that we can support 
00124                 multiple threads broadcasting concurrently to different 
00125                 output ports.
00126 
00127         ***********************************************************************/
00128         
00129         this (ILogger logger = null)
00130         {
00131                 this.logger = logger;
00132                 nodeSet = new NodeSet (logger);
00133                 mBuffer = new Buffer (1024 * 4);
00134                 mSocket = new MulticastSocket;
00135                 mWriter = new ProtocolWriter (mBuffer);
00136         }
00137 
00138         /***********************************************************************
00139 
00140                 Setup a Cluster instance. Currently the buffer & writer
00141                 are shared for all bulletin serialization; this should
00142                 probably change at some point such that we can support 
00143                 multiple threads broadcasting concurrently to different 
00144                 output ports.
00145 
00146         ***********************************************************************/
00147         
00148         this (ILogger logger, IConduit conduit)
00149         in {
00150            assert (logger);
00151            assert (conduit);
00152            }
00153         body
00154         {
00155                 this (logger);
00156 
00157                 // callback for loading cluster configuration 
00158                 void loader (char[] name, char[] value)
00159                 {
00160                         logger.info ("cluster config: "~name~" = "~value);
00161                         if (name == "node")
00162                             nodeSet.addNode (new Node (logger, value));
00163                         else
00164                         if (name == "multicast_port")
00165                             groupPort = Text.atoi (value);
00166                         else
00167                            if (name == "multicast_prefix")
00168                                groupPrefix = Text.atoi (value);
00169                         else
00170                            if (name == "multicast_ttl")
00171                                groupTTL = Text.atoi (value);
00172                         else
00173                            throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
00174                 }
00175 
00176 
00177                 // load up the cluster configuration
00178                 Properties.load (conduit, &loader);
00179 
00180                 // finalize nodeSet 
00181                 nodeSet.optimize ();
00182 
00183                 // listen for cluster servers
00184                 IChannel channel = createChannel ("cluster.server.advertise");
00185                 createConsumer (channel, IEvent.Style.Bulletin, this);
00186 
00187                 // ask who's currently running
00188                 logger.trace ("discovering active cluster servers ...");
00189                 broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));
00190 
00191                 // wait for enabled servers to respond ...
00192                 System.sleep (System.Interval.Millisec * 250);
00193         }
00194 
00195         /***********************************************************************
00196 
00197                 Setup a Cluster instance. Currently the buffer & writer
00198                 are shared for all bulletin serialization; this should
00199                 probably change at some point such that we can support 
00200                 multiple threads broadcasting concurrently to different 
00201                 output ports.
00202 
00203         ***********************************************************************/
00204         
00205         this (ILogger logger, uint serverPort)
00206         in {
00207            assert (logger);
00208            assert (serverPort > 1024);
00209            }
00210         body
00211         {
00212                 this (logger);
00213 
00214                 Node node = new Node (logger, "local");
00215                 node.setCache (new InternetAddress ("lucy", serverPort));
00216                 node.setTasks (new InternetAddress ("lucy", serverPort+1));
00217                 node.setEnabled (true);
00218                 nodeSet.addNode (node);
00219                 nodeSet.optimize ();
00220         }
00221 
00222         /***********************************************************************
00223 
00224                 IEventListener interface method for listening to RollCall
00225                 responses. These are sent out by cluster servers both when 
00226                 they get a RollCall request, and when they begin execution.
00227 
00228         ***********************************************************************/
00229 
00230         void notify (IEvent event, IPayload payload)
00231         {
00232                 RollCall rollcall = cast(RollCall) payload;
00233 
00234                 // ignore requests from clients (we're on a common channel)
00235                 if (! rollcall.request)
00236                       nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
00237         }
00238 
00239         /***********************************************************************
00240 
00241                 Create a channel instance. Our channel implementation 
00242                 includes a number of cached IO helpers (ProtolcolWriter
00243                 and so on) which simplifies and speeds up execution.
00244 
00245         ***********************************************************************/
00246         
00247         IChannel createChannel (char[] channel)
00248         {
00249                 return new Channel (channel);
00250         }
00251 
00252         /***********************************************************************
00253 
00254                 Return the logger instance provided during construction.
00255                 
00256         ***********************************************************************/
00257         
00258         ILogger getLogger ()
00259         {
00260                 return logger;
00261         }
00262 
00263         /***********************************************************************
00264 
00265                 Broadcast a payload on the specified channel. This uses
00266                 IP/Multicast to scatter the payload to all registered
00267                 listeners (on the same multicast group). Note that the
00268                 maximum payload size is limited to that of an Ethernet 
00269                 data frame, minus the IP/UDP header size (1472 bytes).
00270 
00271         ***********************************************************************/
00272         
00273         synchronized void broadcast (IChannel channel, IPayload payload = null)
00274         {
00275                 // serialize content
00276                 mBuffer.clear ();
00277                 mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);
00278 
00279                 // Ethernet data-frame size minus the 28 byte UDP/IP header:
00280                 if (mBuffer.getPosition > 1472)
00281                     throw new ClusterException ("payload is too large to broadcast");
00282 
00283                 // send it to the appropriate multicast group
00284                 mSocket.write (mBuffer, getGroup (channel.getName));
00285         }
00286 
00287         /***********************************************************************
00288 
00289                 Create a listener of the specified type. Listeners are 
00290                 run within their own thread, since they spend the vast 
00291                 majority of their time blocked on a Socket read. Would
00292                 be good to support multiplexed reading instead, such 
00293                 that a thread pool could be applied instead.
00294                  
00295         ***********************************************************************/
00296         
00297         IConsumer createConsumer (IChannel channel, IEvent.Style style, 
00298                                   IEventListener notify)
00299         {
00300                 IEvent event = new ClusterEvent (this, channel, style, notify);
00301                 
00302                 if (logger)
00303                     logger.info ("creating " ~ event.getStyleName ~ 
00304                                  " consumer for channel '" ~ channel.getName ~ "'");
00305 
00306                 switch (style)
00307                        {
00308                        case IEvent.Style.Message:
00309                             return new MessageConsumer (this, event);
00310 
00311                        case IEvent.Style.Bulletin:
00312                             return new BulletinConsumer (this, event);
00313                        }     
00314         }
00315 
00316         /***********************************************************************
00317 
00318                 Return a entry from the network cache, and optionally
00319                 remove it. This is a synchronous operation as opposed
00320                 to the asynchronous nature of an invalidate broadcast.
00321 
00322         ***********************************************************************/
00323         
00324         IPayload getCache (IChannel channel, char[] key, bool remove)
00325         {
00326                 Channel c = cast(Channel) channel;
00327                 
00328                 c.writer.put (remove ? ProtocolWriter.Command.Remove : 
00329                               ProtocolWriter.Command.Copy, c.getName, null, key);
00330                 return nodeSet.request (c.writer, c.reader, key);
00331         }
00332 
00333         /***********************************************************************
00334 
00335                 Place an entry into the network cache, replacing the
00336                 entry with the identical key. Note that this may cause
00337                 the oldest entry in the cache to be displaced if the
00338                 cache is already full.
00339 
00340         ***********************************************************************/
00341         
00342         IPayload putCache (IChannel channel, char[] key, IPayload payload)
00343         {
00344                 Channel c = cast(Channel) channel;
00345 
00346                 c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
00347                 return nodeSet.request (c.writer, c.reader, key);
00348         }
00349 
00350         /***********************************************************************
00351 
00352                 Add an entry to the specified network queue. May throw a
00353                 QueueFullException if there's no room available.
00354 
00355         ***********************************************************************/
00356         
00357         IPayload putQueue (IChannel channel, IPayload payload)
00358         {
00359                 Channel c = cast(Channel) channel;
00360                 
00361                 c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
00362                 nodeSet.request (c.writer, c.reader);
00363                 return payload;
00364         }
00365 
00366 
00367         /***********************************************************************
00368                 
00369                 Query the cluster for queued entries on the corresponding 
00370                 channel. Returns, and removes, the first matching entry 
00371                 from the cluster. Note that this sweeps the cluster for
00372                 matching entries, and is synchronous in nature. The more
00373                 common approach is to setup a queue listener, which will
00374                 grab and dispatch queue entries asynchronously.
00375 
00376         ***********************************************************************/
00377         
00378         IPayload getQueue (IChannel channel)
00379         {
00380                 IPayload payload;
00381 
00382                 Channel c = cast(Channel) channel;
00383 
00384                 // callback for NodeSet.scan()
00385                 bool scan (Node node)
00386                 {
00387                         // ask this node ...
00388                         c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
00389                         node.request (node.cache, c.writer, c.reader, payload);
00390                         return payload !== null;
00391                 }
00392 
00393                 // make a pass over each Node, looking for channel entries
00394                 nodeSet.scan (&scan);
00395                 return payload;
00396         }
00397 
00398         /***********************************************************************
00399                 
00400                 Load a network cache entry remotely. This sends the given
00401                 Payload over the network to the cache host, where it will
00402                 be executed locally. The benefit of doing so it that the
00403                 host may deny access to the cache entry for the duration
00404                 of the load operation. This, in turn, provides an elegant 
00405                 mechanism for gating/synchronizing multiple network clients  
00406                 over a given cache entry; handy for those entries that are 
00407                 relatively expensive to construct or access. 
00408 
00409         ***********************************************************************/
00410         
00411         void loadCache (IChannel channel, char[] key, IPayload payload)
00412         {               
00413                 Channel c = cast(Channel) channel;
00414 
00415                 c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
00416                 Node node = nodeSet.selectNode (key);
00417                 node.request (node.tasks, c.writer, c.reader, payload);
00418         }
00419 
00420         /***********************************************************************
00421 
00422                 Return an internet address representing the multicast
00423                 group for the specified channel. We use three of the
00424                 four address segments to represent the channel itself
00425                 (via a hash on the channel name), and set the primary
00426                 segment to be that of the broadcast prefix (above).
00427 
00428         ***********************************************************************/
00429         
00430         synchronized InternetAddress getGroup (char[] channel)
00431         {
00432                 InternetAddress group = cast(InternetAddress) groups.get (channel);
00433 
00434                 if (group is null)
00435                    {
00436                    char[] address;
00437 
00438                    // construct a group address from the prefix & channel-hash,
00439                    // where the hash is folded down to 24 bits
00440                    uint hash = groups.jhash (channel);
00441                    hash = (hash >> 24) ^ (hash & 0x00ffffff);
00442 
00443                    address = Text.toString (groupPrefix) ~ "." ~
00444                              Text.toString ((hash >> 16) & 0xff) ~ "." ~
00445                              Text.toString ((hash >> 8) & 0xff) ~ "." ~
00446                              Text.toString (hash & 0xff);
00447 
00448                    //printf ("channel '%.*s' == '%.*s'\n", channel, address);
00449         
00450                    // insert InternetAddress into hashmap
00451                    group = new InternetAddress (address, groupPort);
00452                    groups.put (channel, group);
00453                    }
00454                 return group;              
00455         }
00456 }
00457 
00458 
00459 /*******************************************************************************         
00460 
00461         A listener for multicast channel traffic. These are currently used 
00462         for cache coherency, queue publishing, and node discovery activity; 
00463         though could be used for direct messaging also.
00464 
00465 *******************************************************************************/
00466 
00467 private class BulletinConsumer : SocketListener, IConsumer
00468 {
00469         private IEvent                  event;
00470         private Buffer                  buffer;
00471         private ProtocolReader          reader;
00472         private Cluster                 cluster;
00473         private MulticastSocket         consumer;
00474 
00475         /***********************************************************************
00476 
00477                 Construct a multicast consumer for the specified event. The
00478                 event handler will be invoked whenever a message arrives for
00479                 the associated channel.
00480 
00481         ***********************************************************************/
00482         
00483         this (Cluster cluster, IEvent event)
00484         {
00485                 this.event = event;
00486                 this.cluster = cluster;
00487 
00488                 // buffer doesn't need to be larger than Ethernet data-frame
00489                 buffer = new Buffer (1500);
00490 
00491                 // make the reader slice directly from the buffer content
00492                 reader = new ProtocolReader (buffer);
00493                 reader.setAllocator (new BufferAllocator (reader));
00494 
00495                 // configure a listener socket
00496                 consumer = new MulticastSocket;
00497                 consumer.join (cluster.getGroup (event.getChannel.getName));
00498                 
00499                 super (consumer, buffer);
00500 
00501                 // fire up this listener
00502                 start ();
00503         }
00504 
00505         /***********************************************************************
00506 
00507                 Notification callback invoked when we receive a multicast
00508                 packet. Note that we check the packet channel-name against
00509                 the one we're consuming, to check for cases where the group
00510                 address had a hash collision.
00511 
00512         ***********************************************************************/
00513         
00514         override void notify (IBuffer buffer)
00515         {
00516                 ProtocolWriter.Command  cmd;
00517                 char[]                  channel;
00518                 char[]                  element;
00519                 
00520                 IPayload payload = reader.getPayload (channel, element, cmd);
00521                 // printf ("notify '%.*s::%.*s' #'%.*s'\n", channel, element, event.getChannel.getName);
00522 
00523                 // check it's really for us first (might be a hash collision)
00524                 if (channel == event.getChannel.getName)
00525                     invoke (event, payload);
00526         }
00527 
00528         /***********************************************************************
00529 
00530                 Handle error conditions from the listener thread.
00531 
00532         ***********************************************************************/
00533 
00534         override void exception (char [] msg)
00535         {
00536                 cluster.getLogger.error ("BulletinConsumer: "~msg);
00537         }
00538 
00539         /***********************************************************************
00540 
00541                 Overridable mean of notifying the client code.
00542                  
00543         ***********************************************************************/
00544         
00545         protected void invoke (IEvent event, IPayload payload)
00546         {
00547                 event.invoke (payload);
00548         }
00549 
00550         /***********************************************************************
00551 
00552                 Return the cluster instance we're associated with.
00553 
00554         ***********************************************************************/
00555         
00556         Cluster getCluster ()
00557         {
00558                 return cluster;
00559         }
00560 
00561         /***********************************************************************
00562 
00563                 Temporarily halt listening. This can be used to ignore
00564                 multicast messages while, for example, the consumer is
00565                 busy doing other things.
00566 
00567         ***********************************************************************/
00568 
00569         void pauseGroup ()
00570         {
00571                 consumer.pauseGroup ();
00572         }
00573 
00574         /***********************************************************************
00575 
00576                 Resume listening, post-pause.
00577 
00578         ***********************************************************************/
00579 
00580         void resumeGroup ()
00581         {
00582                 consumer.resumeGroup ();
00583         }
00584 
00585         /***********************************************************************
00586 
00587                 Cancel this consumer. The listener is effectively disabled
00588                 from this point forward. The listener thread does not halt
00589                 at this point, but waits until the socket-read returns. 
00590                 Note that the D Interface implementation requires us to 
00591                 "reimplement and dispatch" trivial things like this ~ it's
00592                 a pain in the neck to maintain.
00593 
00594         ***********************************************************************/
00595         
00596         void cancel ()
00597         {
00598                 super.cancel ();
00599         }
00600 }
00601 
00602 
00603 /*******************************************************************************
00604         
00605         A listener for queue events. These events are produced by the 
00606         queue host on a periodic bases when it has available entries.
00607         We listen for them (rather than constantly scanning) and then
00608         begin a sweep to process as many as we can. Note that we will
00609         be in competition with other nodes to process these entries.
00610 
00611 *******************************************************************************/
00612 
00613 private class MessageConsumer : BulletinConsumer
00614 {
00615         /***********************************************************************
00616 
00617                 Construct a multicast consumer for the specified event
00618 
00619         ***********************************************************************/
00620         
00621         this (Cluster cluster, IEvent event)
00622         {
00623                 super (cluster, event);
00624         }
00625 
00626         /***********************************************************************
00627 
00628                 Handle error conditions from the listener thread.
00629 
00630         ***********************************************************************/
00631 
00632         override void exception (char [] msg)
00633         {
00634                 cluster.getLogger.error ("MessageConsumer: "~msg);
00635         }
00636 
00637         /***********************************************************************
00638 
00639                 override the default processing to sweep the cluster for 
00640                 queued entries. Each server node is queried until one is
00641                 found that contains a payload. Note that it is possible 
00642                 to set things up where we are told exactly which node to
00643                 go to; howerver given that we won't be listening whilst
00644                 scanning, and that there's likely to be a group of new
00645                 entries in the cluster, it's just as effective to scan.
00646                 This will be far from ideal for all environments, so we 
00647                 should make the strategy plugable instead.                 
00648 
00649         ***********************************************************************/
00650         
00651         override protected void invoke (IEvent event, IPayload payload)
00652         {
00653                 //temporarilty pause listening
00654                 pauseGroup ();
00655 
00656                 try {
00657                     while ((payload = getCluster.getQueue (event.getChannel)) !== null)
00658                             event.invoke (payload);
00659                     } finally 
00660                             // resume listening
00661                             resumeGroup ();
00662         }
00663 }
00664 
00665 
00666 /*******************************************************************************
00667         
00668         A channel represents something akin to a publish/subscribe topic, 
00669         or a radio station. These are used to segregate cluster operations
00670         into a set of groups, where each group is represented by a channel.
00671         Channel names are whatever you want then to be: use of dot notation 
00672         has proved useful in the past. See Client.createChannel
00673 
00674 *******************************************************************************/
00675 
00676 private class Channel : IChannel
00677 {
00678         char[]                  name;
00679         Buffer                  buffer;
00680         ProtocolReader          reader;
00681         ProtocolWriter          writer;
00682 
00683         /***********************************************************************
00684         
00685                 Construct a channel with the specified name. We cache
00686                 a number of session-related constructs here also, in
00687                 order to eliminate runtime overhead
00688 
00689         ***********************************************************************/
00690 
00691         this (char[] name)
00692         in {
00693            assert (name.length);
00694            }
00695         body
00696         {       
00697                 this.name = name;
00698 
00699                 // this buffer will grow as required to house larger payloads
00700                 buffer = new GrowableBuffer (1024 * 2);
00701                 writer = new ProtocolWriter (buffer);
00702 
00703                 // make the reader slice directly from the buffer content
00704                 reader = new ProtocolReader (buffer);
00705                 reader.setAllocator (new BufferAllocator (reader));
00706         }
00707 
00708         /***********************************************************************
00709         
00710                 Return the name of this channel. This is the name provided
00711                 when the channel was constructed.
00712 
00713         ***********************************************************************/
00714 
00715         char[] getName ()
00716         {
00717                 return name;
00718         }
00719 
00720         /***********************************************************************
00721         
00722                 Output this channel via the provided IWriter
00723 
00724         ***********************************************************************/
00725 
00726         void write (IWriter writer)
00727         {
00728                 writer.put (name.length).put(name);
00729         }
00730 
00731         /***********************************************************************
00732         
00733                 Input this channel via the provided IReader
00734 
00735         ***********************************************************************/
00736 
00737         void read (IReader reader)
00738         {
00739                 name.length = reader.length ();
00740                 reader.get (name);
00741         }
00742 }
00743 
00744 
00745 /*******************************************************************************
00746         
00747         An abstraction of a socket connection. Used internally by the 
00748         socket-based Cluster. 
00749 
00750 *******************************************************************************/
00751 
00752 private class Connection
00753 {
00754         abstract bool reset();
00755 
00756         abstract void done (ulong time);
00757 
00758         abstract SocketConduit getConduit ();
00759 }
00760 
00761 
00762 /*******************************************************************************
00763         
00764         A pool of socket connections for accessing cluster nodes. Note 
00765         that the entries will timeout after a period of inactivity, and
00766         will subsequently cause a connected host to drop the supporting
00767         session.
00768 
00769 *******************************************************************************/
00770 
00771 private class ConnectionPool
00772 { 
00773         private int             count;
00774         private InternetAddress address;
00775         private PoolConnection  freelist;
00776         private const ulong     timeout = 60_000;
00777 
00778         /***********************************************************************
00779         
00780                 Utility class to provide the basic connection facilities
00781                 provided by the connection pool.
00782 
00783         ***********************************************************************/
00784 
00785         class PoolConnection : Connection
00786         {
00787                 ulong           time;
00788                 PoolConnection  next;   
00789                 ConnectionPool  parent;   
00790                 SocketConduit   conduit;
00791 
00792                 /***************************************************************
00793                 
00794                         Construct a new connection and set its parent
00795 
00796                 ***************************************************************/
00797         
00798                 this (ConnectionPool pool)
00799                 {
00800                         parent = pool;
00801                         reset ();
00802                 }
00803                   
00804                 /***************************************************************
00805 
00806                         Return the socket belonging to this connection
00807 
00808                 ***************************************************************/
00809         
00810                 SocketConduit getConduit ()
00811                 {
00812                         return conduit;
00813                 }
00814                   
00815                 /***************************************************************
00816 
00817                         Create a new socket and connect it to the specified 
00818                         server. This will cause a dedicated thread to start 
00819                         on the server. Said thread will quit when an error
00820                         occurs.
00821 
00822                 ***************************************************************/
00823         
00824                 bool reset ()
00825                 {
00826                         try {
00827                             conduit = new SocketConduit;
00828                             conduit.connect (parent.address);
00829 
00830                             // set a 500ms timeout for read operations
00831                             conduit.setTimeout (System.Interval.Millisec * 500);
00832                             return true;
00833 
00834                             } catch (Object)
00835                                     {
00836                                     return false;
00837                                     }
00838                 }
00839                   
00840                 /***************************************************************
00841 
00842                         Close the socket. This will cause any host session
00843                         to be terminated.
00844 
00845                 ***************************************************************/
00846         
00847                 void close ()
00848                 {
00849                         conduit.close ();
00850                 }
00851 
00852                 /***************************************************************
00853 
00854                         Return this connection to the free-list. Note that
00855                         we have to synchronize on the parent-pool itself.
00856 
00857                 ***************************************************************/
00858 
00859                 void done (ulong time)
00860                 {
00861                         synchronized (parent)
00862                                      {
00863                                      next = parent.freelist;
00864                                      parent.freelist = this;
00865                                      this.time = time;
00866                                      }
00867                 }
00868         }
00869 
00870 
00871         /***********************************************************************
00872 
00873                 Create a connection-pool for the specified address.
00874 
00875         ***********************************************************************/
00876 
00877         this (InternetAddress address)
00878         {      
00879                 this.address = address;
00880         }
00881 
00882         /***********************************************************************
00883 
00884                 Allocate a Connection from a list rather than creating a 
00885                 new one. Reap old entries as we go.
00886 
00887         ***********************************************************************/
00888 
00889         synchronized Connection borrow (ulong time)
00890         {  
00891                 if (freelist)
00892                     do {
00893                        PoolConnection c = freelist;
00894 
00895                        freelist = c.next;
00896                        if (freelist && (time - c.time > timeout))
00897                            c.close ();
00898                        else
00899                           return c;
00900                        } while (true);
00901 
00902                 return new PoolConnection (this);
00903         }
00904 
00905         /***********************************************************************
00906 
00907                 Close this pool and drop all existing connections.
00908 
00909         ***********************************************************************/
00910 
00911         synchronized void close ()
00912         {       
00913                 PoolConnection c = freelist;
00914                 freelist = null;
00915                 while (c)
00916                       {
00917                       c.close ();
00918                       c = c.next;
00919                       }
00920         }
00921 }
00922 
00923 
00924 /*******************************************************************************
00925         
00926         Class to represent a cluster node. Each node supports both cache
00927         and queue functionality. Note that the set of available nodes is
00928         configured at startup, simplifying the discovery process in some
00929         significant ways, and causing less thrashing of cache-keys.
00930 
00931 *******************************************************************************/
00932 
00933 private class Node
00934 { 
00935         private char[]                  name,
00936                                         port;
00937         private ILogger                 logger;
00938         private bool                    enabled;
00939 
00940         private ConnectionPool          tasks,
00941                                         cache;
00942 
00943         /***********************************************************************
00944 
00945                 Construct a node with the provided name. This name should
00946                 be the network name of the hosting device.
00947 
00948         ***********************************************************************/
00949         
00950         this (ILogger logger, char[] name)
00951         {
00952                 this.logger = logger;
00953                 this.name = name;
00954         }
00955 
00956         /***********************************************************************
00957 
00958                 Add a cache/queue reference for the remote node
00959                  
00960         ***********************************************************************/
00961 
00962         void setCache (InternetAddress address)
00963         {      
00964                 this.cache = new ConnectionPool (address);
00965                 port = Text.toString (address.port);
00966         }
00967 
00968         /***********************************************************************
00969 
00970                 Add a cache-loader reference for the remote node
00971 
00972         ***********************************************************************/
00973 
00974         void setTasks (InternetAddress address)
00975         {      
00976                 this.tasks = new ConnectionPool (address);
00977         }
00978 
00979         /***********************************************************************
00980 
00981                 Return the name of this node (the network name of the device)
00982 
00983         ***********************************************************************/
00984         
00985         override char[] toString ()
00986         {
00987                 return name;
00988         }
00989 
00990         /***********************************************************************
00991 
00992                 Remove this Node from the cluster. The node is disabled
00993                 until it is seen to recover.
00994 
00995         ***********************************************************************/
00996 
00997         void fail ()
00998         {       
00999                 setEnabled (false);
01000                 cache.close ();    
01001                 tasks.close ();    
01002         }
01003 
01004         /***********************************************************************
01005 
01006                 Get the current state of this node
01007 
01008         ***********************************************************************/
01009 
01010         bool isEnabled ()
01011         {      
01012                 volatile  
01013                        return enabled;    
01014         }
01015 
01016         /***********************************************************************
01017 
01018                 Set the enabled state of this node
01019 
01020         ***********************************************************************/
01021 
01022         void setEnabled (bool enabled)
01023         {      
01024                 if (logger && logger.isEnabled (logger.Level.Trace))
01025                    {
01026                    if (enabled)
01027                        logger.trace ("enabling node '"~name~"' on port "~port);
01028                    else
01029                       logger.trace ("disabling node '"~name~"'");
01030                    }
01031                 volatile  
01032                        this.enabled = enabled;    
01033         }
01034 
01035         /***********************************************************************
01036 
01037                 request data; fail this Node if we can't connect. Note
01038                 that we make several attempts to connect before writing
01039                 the node off as a failure.
01040                 
01041         ***********************************************************************/
01042         
01043         bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
01044         {       
01045                 ProtocolWriter.Command  cmd;
01046                 ulong                   time;
01047                 char[]                  channel;
01048                 char[]                  element;
01049                 Connection              connect;
01050 
01051                 // it's possible that the pool may have failed between 
01052                 // the point of selecting it, and the invocation itself
01053                 if (pool is null)
01054                     return false;
01055 
01056                 // get a connection to the server
01057                 connect = pool.borrow (time = System.getMillisecs);
01058 
01059                 // talk to the server (try a few times if necessary)
01060                 for (int attempts=3; attempts--;)
01061                      try {
01062                          // attach connection to reader and writer 
01063                          writer.getBuffer.setConduit (connect.getConduit);
01064                          reader.getBuffer.setConduit (connect.getConduit);
01065         
01066                          // send request
01067                          writer.flush ();
01068 
01069                          // load the returned object (don't retry!)
01070                          attempts = 0;
01071                          payload = reader.getPayload (channel, element, cmd);
01072 
01073                          // return borrowed connection
01074                          connect.done (time);
01075 
01076                          } catch (PickleException x)
01077                                  {
01078                                  connect.done (time);
01079                                  throw x;
01080                                  }
01081                            catch (IOException x)
01082                                   // attempt to reconnect?
01083                                   if (attempts == 0 || !connect.reset)
01084                                      {
01085                                      // that server is offline
01086                                      fail ();
01087   
01088                                      // state that we failed
01089                                      return false;
01090                                      }
01091                     
01092                 // is payload an exception?
01093                 if (cmd != ProtocolWriter.Command.OK)                       
01094                    {
01095                    // is node full?
01096                    if (cmd == ProtocolWriter.Command.Full)                       
01097                        throw new ClusterFullException (channel);
01098 
01099                    // did node barf?
01100                    if (cmd == ProtocolWriter.Command.Exception)                       
01101                        throw new ClusterException (channel);
01102                         
01103                    // bogus response
01104                    throw new ClusterException ("invalid response from cluster server");
01105                    }
01106 
01107                 return true;
01108         }
01109 }
01110 
01111 
01112 /*******************************************************************************
01113         
01114         Models a set of remote cluster nodes.
01115 
01116 *******************************************************************************/
01117 
01118 private class NodeSet
01119 { 
01120         private HashMap         map;
01121         private Node[]          nodes;
01122         private Node[]          random;
01123         private ILogger         logger;
01124 
01125         /***********************************************************************
01126 
01127         ***********************************************************************/
01128         
01129         this (ILogger logger)
01130         {
01131                 this.logger = logger;
01132                 map = new HashMap (128, 0.75, 4);
01133         }
01134 
01135         /***********************************************************************
01136 
01137         ***********************************************************************/
01138         
01139         ILogger getLogger ()
01140         {
01141                 return logger;
01142         }
01143 
01144         /***********************************************************************
01145 
01146                 Add a node to the list of servers, and sort them such that
01147                 all clients will have the same ordered set
01148 
01149         ***********************************************************************/
01150         
01151         synchronized void addNode (Node node)
01152         {
01153                 char[]  name = node.toString;
01154 
01155                 if (map.get (name))
01156                     throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");
01157 
01158                 map.put (name, node);
01159                 nodes ~= node;
01160                 //nodes.sort;
01161         }
01162 
01163         /***********************************************************************
01164 
01165         ***********************************************************************/
01166         
01167         void optimize ()
01168         {
01169                 // nodes are already in the same order across the cluster
01170                 // since the configuration file should be identical ...
01171 
01172                 // nodes.sort;
01173                 
01174                 // copy the node list
01175                 random = nodes.dup;
01176 
01177                 // muddle up the duplicate list. This randomized list
01178                 // is used when scanning the cluster for queued entries
01179                 foreach (int i, Node n; random)
01180                         {
01181                         int j = Random.get (random.length);
01182                         Node tmp = random[i];
01183                         random[i] = random[j];
01184                         random[j] = tmp;
01185                         }
01186         }
01187 
01188         /***********************************************************************
01189 
01190         ***********************************************************************/
01191         
01192         synchronized void enable (char[] name, ushort port1, ushort port2)
01193         {
01194                 Node node = cast(Node) map.get (name);
01195                 if (node is null)
01196                     throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
01197                 else
01198                    if (! node.isEnabled)
01199                       {
01200                       node.setCache (new InternetAddress (name, port1));
01201                       node.setTasks (new InternetAddress (name, port2));
01202                       node.setEnabled (true);
01203                       }
01204         }
01205 
01206         /***********************************************************************
01207 
01208         ***********************************************************************/
01209         
01210         IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
01211         {
01212                 Node     node;
01213                 IPayload payload;
01214 
01215                 do {
01216                    node = selectNode (key);
01217                    } while (! node.request (node.cache, writer, reader, payload));
01218                 return payload;
01219         }
01220 
01221         /***********************************************************************
01222 
01223                 Select a cluster server based on a starting index. If the
01224                 selected server is not currently enabled, we just try the
01225                 next one. This behaviour should be consistent across each
01226                 cluster client.
01227 
01228         ***********************************************************************/
01229         
01230         private final Node selectNode (uint index)
01231         {
01232                 uint i = nodes.length;
01233 
01234                 if (i)
01235                    {
01236                    index %= i;
01237 
01238                    while (i--)
01239                          {
01240                          Node node = nodes[index];
01241                          if (node.isEnabled)
01242                              return node;
01243         
01244                          if (++index >= nodes.length)
01245                              index = 0;
01246                          }
01247                    }
01248                 throw new ClusterEmptyException ("No cluster servers are available"); 
01249         }
01250 
01251         /***********************************************************************
01252 
01253                 Select a cluster server based on the specified key. If the
01254                 selected server is not currently enabled, we just try the
01255                 next one. This behaviour should be consistent across each
01256                 cluster client.
01257 
01258         ***********************************************************************/
01259         
01260         final Node selectNode (char[] key = null)
01261         {
01262                 static uint index;
01263               
01264                 if (key.length)
01265                     return selectNode (HashMap.jhash (key));
01266 
01267                 // no key provided, so just roll the counter
01268                 return selectNode (++index);
01269         }
01270 
01271         /***********************************************************************
01272 
01273                 Sweep the cluster servers. Returns true if the delegate
01274                 returns true, false otherwise. The sweep is aborted when
01275                 the delegate returns true. Note that this scans nodes in
01276                 a randomized pattern, which should tend to avoid 'bursty'
01277                 activity by a set of clients upon any one cluster server.
01278 
01279         ***********************************************************************/
01280         
01281         bool scan (bool delegate (Node) dg)
01282         {
01283                 Node    node;
01284                 int     index = nodes.length;
01285 
01286                 while (index--)
01287                       {
01288                       // lookup the randomized set of server nodes
01289                       node = random [index];
01290 
01291                       // callback on each enabled node
01292                       if (node.isEnabled)
01293                           if (dg (node))
01294                               return true;
01295                       }
01296                 return false;
01297         }
01298 }
01299 
01300  
01301 

Generated on Sun Nov 7 19:06:50 2004 for Mango by doxygen 1.3.6