/*******************************************************************************

        @file Cluster.d

        Copyright (c) 2004 Kris Bell

        This software is provided 'as-is', without any express or implied
        warranty. In no event will the authors be held liable for damages
        of any kind arising from the use of this software.

        Permission is hereby granted to anyone to use this software for any
        purpose, including commercial applications, and to alter it and/or
        redistribute it freely, subject to the following restrictions:

        1. The origin of this software must not be misrepresented; you must
           not claim that you wrote the original software. If you use this
           software in a product, an acknowledgment within documentation of
           said product would be appreciated but is not required.

        2. Altered source versions must be plainly marked as such, and must
           not be misrepresented as being the original software.

        3. This notice may not be removed or altered from any distribution
           of the source.

        4. Derivative works are permitted, but they must carry this notice
           in full and credit the original source.
00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.qos.socket.Cluster; 00040 00041 private import mango.cache.HashMap; 00042 00043 private import mango.utils.Random; 00044 00045 private import mango.convert.Integer; 00046 00047 private import mango.sys.System; 00048 00049 private import mango.io.Buffer, 00050 mango.io.Socket, 00051 mango.io.Exception, 00052 mango.io.Properties, 00053 mango.io.GrowBuffer, 00054 mango.io.ArrayAllocator, 00055 mango.io.SocketConduit, 00056 mango.io.SocketListener, 00057 mango.io.MulticastSocket; 00058 00059 private import mango.io.model.IConduit; 00060 00061 private import mango.log.model.ILogger; 00062 00063 private import mango.cluster.Client; 00064 00065 public import mango.cluster.model.ICluster; 00066 00067 private import mango.cluster.qos.socket.RollCall, 00068 mango.cluster.qos.socket.ClusterEvent, 00069 mango.cluster.qos.socket.ProtocolReader, 00070 mango.cluster.qos.socket.ProtocolWriter; 00071 00072 00073 /******************************************************************************* 00074 00075 QOS implementation for sockets. All cluster-client activity is 00076 gated through here by the higher level classes; NetworkQueue & 00077 NetworkCache for example. You gain access to the cluster by 00078 creating an instance of the QOS (quality of service) you desire 00079 and mapping client classes onto it. For example: 00080 00081 @code 00082 import mango.cluster.NetworkCache; 00083 import mango.cluster.qos.socket.Cluster; 00084 00085 ICluster cluster = new Cluster (...); 00086 NetworkCache cache = new NetworkCache (cluster, ...); 00087 00088 cache.put (...); 00089 cache.get (...); 00090 cache.invalidate (...); 00091 @endcode 00092 00093 Please see the cluster clients for additional details. 
Currently 00094 these include CacheInvalidator, CacheInvalidatee, NetworkMessage, 00095 NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the 00096 Client base-class. 00097 00098 *******************************************************************************/ 00099 00100 class Cluster : ICluster, IEventListener 00101 { 00102 private static HashMap groups; 00103 private ILogger logger; 00104 private NodeSet nodeSet; 00105 private Buffer mBuffer; 00106 private ProtocolWriter mWriter; 00107 private MulticastSocket mSocket; 00108 00109 private int groupTTL = 1; 00110 private int groupPort = 3333; 00111 private int groupPrefix = 225; 00112 00113 /*********************************************************************** 00114 00115 Setup a hashmap for multicast group addresses 00116 00117 ***********************************************************************/ 00118 00119 static this () 00120 { 00121 groups = new HashMap (128, 0.75, 2); 00122 } 00123 00124 /*********************************************************************** 00125 00126 Setup a Cluster instance. Currently the buffer & writer 00127 are shared for all bulletin serialization; this should 00128 probably change at some point such that we can support 00129 multiple threads broadcasting concurrently to different 00130 output ports. 00131 00132 ***********************************************************************/ 00133 00134 this (ILogger logger = null) 00135 { 00136 this.logger = logger; 00137 nodeSet = new NodeSet (logger); 00138 mBuffer = new Buffer (1024 * 4); 00139 mSocket = new MulticastSocket; 00140 mWriter = new ProtocolWriter (mBuffer); 00141 } 00142 00143 /*********************************************************************** 00144 00145 Setup a Cluster instance. 
Currently the buffer & writer 00146 are shared for all bulletin serialization; this should 00147 probably change at some point such that we can support 00148 multiple threads broadcasting concurrently to different 00149 output ports. 00150 00151 ***********************************************************************/ 00152 00153 this (ILogger logger, IConduit conduit) 00154 in { 00155 assert (logger); 00156 assert (conduit); 00157 } 00158 body 00159 { 00160 this (logger); 00161 00162 // callback for loading cluster configuration 00163 void loader (char[] name, char[] value) 00164 { 00165 logger.info ("cluster config: "~name~" = "~value); 00166 if (name == "node") 00167 nodeSet.addNode (new Node (logger, value)); 00168 else 00169 if (name == "multicast_port") 00170 groupPort = cast(int) Integer.parse (value); 00171 else 00172 if (name == "multicast_prefix") 00173 groupPrefix = cast(int) Integer.parse (value); 00174 else 00175 if (name == "multicast_ttl") 00176 groupTTL = cast(int) Integer.parse (value); 00177 else 00178 throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration"); 00179 } 00180 00181 00182 // load up the cluster configuration 00183 Properties.load (conduit, &loader); 00184 00185 // finalize nodeSet 00186 nodeSet.optimize (); 00187 00188 // listen for cluster servers 00189 IChannel channel = createChannel ("cluster.server.advertise"); 00190 createConsumer (channel, IEvent.Style.Bulletin, this); 00191 00192 // ask who's currently running 00193 logger.trace ("discovering active cluster servers ..."); 00194 broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true)); 00195 00196 // wait for enabled servers to respond ... 00197 System.sleep (System.Interval.Millisec * 250); 00198 } 00199 00200 /*********************************************************************** 00201 00202 Setup a Cluster instance. 
Currently the buffer & writer 00203 are shared for all bulletin serialization; this should 00204 probably change at some point such that we can support 00205 multiple threads broadcasting concurrently to different 00206 output ports. 00207 00208 ***********************************************************************/ 00209 00210 this (ILogger logger, uint serverPort) 00211 in { 00212 assert (logger); 00213 assert (serverPort > 1024); 00214 } 00215 body 00216 { 00217 this (logger); 00218 00219 Node node = new Node (logger, "local"); 00220 node.setCache (new InternetAddress ("localhost", serverPort)); 00221 node.setTasks (new InternetAddress ("localhost", serverPort+1)); 00222 node.setEnabled (true); 00223 nodeSet.addNode (node); 00224 nodeSet.optimize (); 00225 } 00226 00227 /*********************************************************************** 00228 00229 IEventListener interface method for listening to RollCall 00230 responses. These are sent out by cluster servers both when 00231 they get a RollCall request, and when they begin execution. 00232 00233 ***********************************************************************/ 00234 00235 void notify (IEvent event, IPayload payload) 00236 { 00237 RollCall rollcall = cast(RollCall) payload; 00238 00239 // ignore requests from clients (we're on a common channel) 00240 if (! rollcall.request) 00241 nodeSet.enable (rollcall.name, 00242 cast(ushort) rollcall.port1, 00243 cast(ushort) rollcall.port2); 00244 } 00245 00246 /*********************************************************************** 00247 00248 Create a channel instance. Our channel implementation 00249 includes a number of cached IO helpers (ProtolcolWriter 00250 and so on) which simplifies and speeds up execution. 
00251 00252 ***********************************************************************/ 00253 00254 IChannel createChannel (char[] channel) 00255 { 00256 return new Channel (channel); 00257 } 00258 00259 /*********************************************************************** 00260 00261 Return the logger instance provided during construction. 00262 00263 ***********************************************************************/ 00264 00265 ILogger getLogger () 00266 { 00267 return logger; 00268 } 00269 00270 /*********************************************************************** 00271 00272 Broadcast a payload on the specified channel. This uses 00273 IP/Multicast to scatter the payload to all registered 00274 listeners (on the same multicast group). Note that the 00275 maximum payload size is limited to that of an Ethernet 00276 data frame, minus the IP/UDP header size (1472 bytes). 00277 00278 ***********************************************************************/ 00279 00280 synchronized void broadcast (IChannel channel, IPayload payload = null) 00281 { 00282 // serialize content 00283 mBuffer.clear (); 00284 mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload); 00285 00286 // Ethernet data-frame size minus the 28 byte UDP/IP header: 00287 if (mBuffer.getPosition > 1472) 00288 throw new ClusterException ("payload is too large to broadcast"); 00289 00290 // send it to the appropriate multicast group 00291 mSocket.write (mBuffer, getGroup (channel.getName)); 00292 } 00293 00294 /*********************************************************************** 00295 00296 Create a listener of the specified type. Listeners are 00297 run within their own thread, since they spend the vast 00298 majority of their time blocked on a Socket read. Would 00299 be good to support multiplexed reading instead, such 00300 that a thread pool could be applied instead. 
00301 00302 ***********************************************************************/ 00303 00304 IConsumer createConsumer (IChannel channel, IEvent.Style style, 00305 IEventListener notify) 00306 { 00307 IEvent event = new ClusterEvent (this, channel, style, notify); 00308 00309 if (logger) 00310 logger.info ("creating " ~ event.getStyleName ~ 00311 " consumer for channel '" ~ channel.getName ~ "'"); 00312 00313 switch (style) 00314 { 00315 case IEvent.Style.Message: 00316 return new MessageConsumer (this, event); 00317 00318 case IEvent.Style.Bulletin: 00319 return new BulletinConsumer (this, event); 00320 00321 default: 00322 throw new ClusterException ("Invalid consumer style"); 00323 } 00324 return null; 00325 } 00326 00327 /*********************************************************************** 00328 00329 Return a entry from the network cache, and optionally 00330 remove it. This is a synchronous operation as opposed 00331 to the asynchronous nature of an invalidate broadcast. 00332 00333 ***********************************************************************/ 00334 00335 IPayload getCache (IChannel channel, char[] key, bool remove) 00336 { 00337 Channel c = cast(Channel) channel; 00338 00339 c.writer.put (remove ? ProtocolWriter.Command.Remove : 00340 ProtocolWriter.Command.Copy, c.getName, null, key); 00341 return nodeSet.request (c.writer, c.reader, key); 00342 } 00343 00344 /*********************************************************************** 00345 00346 Place an entry into the network cache, replacing the 00347 entry with the identical key. Note that this may cause 00348 the oldest entry in the cache to be displaced if the 00349 cache is already full. 
00350 00351 ***********************************************************************/ 00352 00353 IPayload putCache (IChannel channel, char[] key, IPayload payload) 00354 { 00355 Channel c = cast(Channel) channel; 00356 00357 c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key); 00358 return nodeSet.request (c.writer, c.reader, key); 00359 } 00360 00361 /*********************************************************************** 00362 00363 Add an entry to the specified network queue. May throw a 00364 QueueFullException if there's no room available. 00365 00366 ***********************************************************************/ 00367 00368 IPayload putQueue (IChannel channel, IPayload payload) 00369 { 00370 Channel c = cast(Channel) channel; 00371 00372 c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload); 00373 nodeSet.request (c.writer, c.reader); 00374 return payload; 00375 } 00376 00377 00378 /*********************************************************************** 00379 00380 Query the cluster for queued entries on the corresponding 00381 channel. Returns, and removes, the first matching entry 00382 from the cluster. Note that this sweeps the cluster for 00383 matching entries, and is synchronous in nature. The more 00384 common approach is to setup a queue listener, which will 00385 grab and dispatch queue entries asynchronously. 00386 00387 ***********************************************************************/ 00388 00389 IPayload getQueue (IChannel channel) 00390 { 00391 IPayload payload; 00392 00393 Channel c = cast(Channel) channel; 00394 00395 // callback for NodeSet.scan() 00396 bool scan (Node node) 00397 { 00398 // ask this node ... 
00399 c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName); 00400 node.request (node.cache, c.writer, c.reader, payload); 00401 return cast(bool) (payload !is null); 00402 } 00403 00404 // make a pass over each Node, looking for channel entries 00405 nodeSet.scan (&scan); 00406 return payload; 00407 } 00408 00409 /*********************************************************************** 00410 00411 Load a network cache entry remotely. This sends the given 00412 Payload over the network to the cache host, where it will 00413 be executed locally. The benefit of doing so it that the 00414 host may deny access to the cache entry for the duration 00415 of the load operation. This, in turn, provides an elegant 00416 mechanism for gating/synchronizing multiple network clients 00417 over a given cache entry; handy for those entries that are 00418 relatively expensive to construct or access. 00419 00420 ***********************************************************************/ 00421 00422 void loadCache (IChannel channel, char[] key, IPayload payload) 00423 { 00424 Channel c = cast(Channel) channel; 00425 00426 c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key); 00427 Node node = nodeSet.selectNode (key); 00428 node.request (node.tasks, c.writer, c.reader, payload); 00429 } 00430 00431 /*********************************************************************** 00432 00433 Return an internet address representing the multicast 00434 group for the specified channel. We use three of the 00435 four address segments to represent the channel itself 00436 (via a hash on the channel name), and set the primary 00437 segment to be that of the broadcast prefix (above). 
00438 00439 ***********************************************************************/ 00440 00441 synchronized InternetAddress getGroup (char[] channel) 00442 { 00443 InternetAddress group = cast(InternetAddress) groups.get (channel); 00444 00445 if (group is null) 00446 { 00447 // construct a group address from the prefix & channel-hash, 00448 // where the hash is folded down to 24 bits 00449 uint hash = groups.jhash (channel); 00450 hash = (hash >> 24) ^ (hash & 0x00ffffff); 00451 00452 char[] address = Integer.format (new char[5], groupPrefix) ~ "." ~ 00453 Integer.format (new char[5], (hash >> 16) & 0xff) ~ "." ~ 00454 Integer.format (new char[5], (hash >> 8) & 0xff) ~ "." ~ 00455 Integer.format (new char[5], hash & 0xff); 00456 00457 //printf ("channel '%.*s' == '%.*s'\n", channel, address); 00458 00459 // insert InternetAddress into hashmap 00460 group = new InternetAddress (address, groupPort); 00461 groups.put (channel, group); 00462 } 00463 return group; 00464 } 00465 } 00466 00467 00468 /******************************************************************************* 00469 00470 A listener for multicast channel traffic. These are currently used 00471 for cache coherency, queue publishing, and node discovery activity; 00472 though could be used for direct messaging also. 00473 00474 *******************************************************************************/ 00475 00476 private class BulletinConsumer : SocketListener, IConsumer 00477 { 00478 private IEvent event; 00479 private Buffer buffer; 00480 private ProtocolReader reader; 00481 private Cluster cluster; 00482 private MulticastSocket consumer; 00483 00484 /*********************************************************************** 00485 00486 Construct a multicast consumer for the specified event. The 00487 event handler will be invoked whenever a message arrives for 00488 the associated channel. 
00489 00490 ***********************************************************************/ 00491 00492 this (Cluster cluster, IEvent event) 00493 { 00494 this.event = event; 00495 this.cluster = cluster; 00496 00497 // buffer doesn't need to be larger than Ethernet data-frame 00498 buffer = new Buffer (1500); 00499 00500 // make the reader slice directly from the buffer content 00501 reader = new ProtocolReader (buffer); 00502 reader.setAllocator (new BufferAllocator); 00503 00504 // configure a listener socket 00505 consumer = new MulticastSocket; 00506 consumer.join (cluster.getGroup (event.getChannel.getName)); 00507 00508 super (consumer, buffer); 00509 00510 // fire up this listener 00511 start (); 00512 } 00513 00514 /*********************************************************************** 00515 00516 Notification callback invoked when we receive a multicast 00517 packet. Note that we check the packet channel-name against 00518 the one we're consuming, to check for cases where the group 00519 address had a hash collision. 00520 00521 ***********************************************************************/ 00522 00523 override void notify (IBuffer buffer) 00524 { 00525 ProtocolWriter.Command cmd; 00526 char[] channel; 00527 char[] element; 00528 00529 IPayload payload = reader.getPayload (channel, element, cmd); 00530 // printf ("notify '%.*s::%.*s' #'%.*s'\n", channel, element, event.getChannel.getName); 00531 00532 // check it's really for us first (might be a hash collision) 00533 if (channel == event.getChannel.getName) 00534 invoke (event, payload); 00535 } 00536 00537 /*********************************************************************** 00538 00539 Handle error conditions from the listener thread. 
00540 00541 ***********************************************************************/ 00542 00543 override void exception (char [] msg) 00544 { 00545 cluster.getLogger.error ("BulletinConsumer: "~msg); 00546 } 00547 00548 /*********************************************************************** 00549 00550 Overridable mean of notifying the client code. 00551 00552 ***********************************************************************/ 00553 00554 protected void invoke (IEvent event, IPayload payload) 00555 { 00556 event.invoke (payload); 00557 } 00558 00559 /*********************************************************************** 00560 00561 Return the cluster instance we're associated with. 00562 00563 ***********************************************************************/ 00564 00565 Cluster getCluster () 00566 { 00567 return cluster; 00568 } 00569 00570 /*********************************************************************** 00571 00572 Temporarily halt listening. This can be used to ignore 00573 multicast messages while, for example, the consumer is 00574 busy doing other things. 00575 00576 ***********************************************************************/ 00577 00578 void pauseGroup () 00579 { 00580 consumer.pauseGroup (); 00581 } 00582 00583 /*********************************************************************** 00584 00585 Resume listening, post-pause. 00586 00587 ***********************************************************************/ 00588 00589 void resumeGroup () 00590 { 00591 consumer.resumeGroup (); 00592 } 00593 00594 /*********************************************************************** 00595 00596 Cancel this consumer. The listener is effectively disabled 00597 from this point forward. The listener thread does not halt 00598 at this point, but waits until the socket-read returns. 
00599 Note that the D Interface implementation requires us to 00600 "reimplement and dispatch" trivial things like this ~ it's 00601 a pain in the neck to maintain. 00602 00603 ***********************************************************************/ 00604 00605 void cancel () 00606 { 00607 super.cancel (); 00608 } 00609 } 00610 00611 00612 /******************************************************************************* 00613 00614 A listener for queue events. These events are produced by the 00615 queue host on a periodic bases when it has available entries. 00616 We listen for them (rather than constantly scanning) and then 00617 begin a sweep to process as many as we can. Note that we will 00618 be in competition with other nodes to process these entries. 00619 00620 *******************************************************************************/ 00621 00622 private class MessageConsumer : BulletinConsumer 00623 { 00624 /*********************************************************************** 00625 00626 Construct a multicast consumer for the specified event 00627 00628 ***********************************************************************/ 00629 00630 this (Cluster cluster, IEvent event) 00631 { 00632 super (cluster, event); 00633 } 00634 00635 /*********************************************************************** 00636 00637 Handle error conditions from the listener thread. 00638 00639 ***********************************************************************/ 00640 00641 override void exception (char [] msg) 00642 { 00643 cluster.getLogger.error ("MessageConsumer: "~msg); 00644 } 00645 00646 /*********************************************************************** 00647 00648 override the default processing to sweep the cluster for 00649 queued entries. Each server node is queried until one is 00650 found that contains a payload. 
Note that it is possible 00651 to set things up where we are told exactly which node to 00652 go to; howerver given that we won't be listening whilst 00653 scanning, and that there's likely to be a group of new 00654 entries in the cluster, it's just as effective to scan. 00655 This will be far from ideal for all environments, so we 00656 should make the strategy plugable instead. 00657 00658 ***********************************************************************/ 00659 00660 override protected void invoke (IEvent event, IPayload payload) 00661 { 00662 //temporarilty pause listening 00663 pauseGroup (); 00664 00665 try { 00666 while ((payload = getCluster.getQueue (event.getChannel)) !is null) 00667 event.invoke (payload); 00668 } finally 00669 // resume listening 00670 resumeGroup (); 00671 } 00672 } 00673 00674 00675 /******************************************************************************* 00676 00677 A channel represents something akin to a publish/subscribe topic, 00678 or a radio station. These are used to segregate cluster operations 00679 into a set of groups, where each group is represented by a channel. 00680 Channel names are whatever you want then to be: use of dot notation 00681 has proved useful in the past. See Client.createChannel 00682 00683 *******************************************************************************/ 00684 00685 private class Channel : IChannel 00686 { 00687 char[] name; 00688 GrowBuffer buffer; 00689 ProtocolReader reader; 00690 ProtocolWriter writer; 00691 00692 /*********************************************************************** 00693 00694 Construct a channel with the specified name. 
We cache 00695 a number of session-related constructs here also, in 00696 order to eliminate runtime overhead 00697 00698 ***********************************************************************/ 00699 00700 this (char[] name) 00701 in { 00702 assert (name.length); 00703 } 00704 body 00705 { 00706 this.name = name; 00707 00708 // this buffer will grow as required to house larger payloads 00709 buffer = new GrowBuffer (1024 * 2); 00710 writer = new ProtocolWriter (buffer); 00711 00712 // make the reader slice directly from the buffer content 00713 reader = new ProtocolReader (buffer); 00714 reader.setAllocator (new BufferAllocator); 00715 } 00716 00717 /*********************************************************************** 00718 00719 Return the name of this channel. This is the name provided 00720 when the channel was constructed. 00721 00722 ***********************************************************************/ 00723 00724 char[] getName () 00725 { 00726 return name; 00727 } 00728 00729 /*********************************************************************** 00730 00731 Output this channel via the provided IWriter 00732 00733 ***********************************************************************/ 00734 00735 void write (IWriter writer) 00736 { 00737 writer.put (name); 00738 } 00739 00740 /*********************************************************************** 00741 00742 Input this channel via the provided IReader 00743 00744 ***********************************************************************/ 00745 00746 void read (IReader reader) 00747 { 00748 reader.get (name); 00749 } 00750 } 00751 00752 00753 /******************************************************************************* 00754 00755 An abstraction of a socket connection. Used internally by the 00756 socket-based Cluster. 
00757 00758 *******************************************************************************/ 00759 00760 private class Connection 00761 { 00762 abstract bool reset(); 00763 00764 abstract void done (ulong time); 00765 00766 abstract SocketConduit getConduit (); 00767 } 00768 00769 00770 /******************************************************************************* 00771 00772 A pool of socket connections for accessing cluster nodes. Note 00773 that the entries will timeout after a period of inactivity, and 00774 will subsequently cause a connected host to drop the supporting 00775 session. 00776 00777 *******************************************************************************/ 00778 00779 private class ConnectionPool 00780 { 00781 private int count; 00782 private InternetAddress address; 00783 private PoolConnection freelist; 00784 private const ulong timeout = 60_000; 00785 00786 /*********************************************************************** 00787 00788 Utility class to provide the basic connection facilities 00789 provided by the connection pool. 
00790 00791 ***********************************************************************/ 00792 00793 class PoolConnection : Connection 00794 { 00795 ulong time; 00796 PoolConnection next; 00797 ConnectionPool parent; 00798 SocketConduit conduit; 00799 00800 /*************************************************************** 00801 00802 Construct a new connection and set its parent 00803 00804 ***************************************************************/ 00805 00806 this (ConnectionPool pool) 00807 { 00808 parent = pool; 00809 reset (); 00810 } 00811 00812 /*************************************************************** 00813 00814 Return the socket belonging to this connection 00815 00816 ***************************************************************/ 00817 00818 SocketConduit getConduit () 00819 { 00820 return conduit; 00821 } 00822 00823 /*************************************************************** 00824 00825 Create a new socket and connect it to the specified 00826 server. This will cause a dedicated thread to start 00827 on the server. Said thread will quit when an error 00828 occurs. 00829 00830 ***************************************************************/ 00831 00832 bool reset () 00833 { 00834 try { 00835 conduit = new SocketConduit (false); 00836 conduit.connect (parent.address); 00837 00838 // set a 500ms timeout for read operations 00839 conduit.setTimeout (System.Interval.Millisec * 500); 00840 return true; 00841 00842 } catch (Object) 00843 { 00844 return false; 00845 } 00846 } 00847 00848 /*************************************************************** 00849 00850 Close the socket. This will cause any host session 00851 to be terminated. 00852 00853 ***************************************************************/ 00854 00855 void close () 00856 { 00857 conduit.close (); 00858 } 00859 00860 /*************************************************************** 00861 00862 Return this connection to the free-list. 
Note that 00863 we have to synchronize on the parent-pool itself. 00864 00865 ***************************************************************/ 00866 00867 void done (ulong time) 00868 { 00869 synchronized (parent) 00870 { 00871 next = parent.freelist; 00872 parent.freelist = this; 00873 this.time = time; 00874 } 00875 } 00876 } 00877 00878 00879 /*********************************************************************** 00880 00881 Create a connection-pool for the specified address. 00882 00883 ***********************************************************************/ 00884 00885 this (InternetAddress address) 00886 { 00887 this.address = address; 00888 } 00889 00890 /*********************************************************************** 00891 00892 Allocate a Connection from a list rather than creating a 00893 new one. Reap old entries as we go. 00894 00895 ***********************************************************************/ 00896 00897 synchronized Connection borrow (ulong time) 00898 { 00899 if (freelist) 00900 do { 00901 PoolConnection c = freelist; 00902 00903 freelist = c.next; 00904 if (freelist && (time - c.time > timeout)) 00905 c.close (); 00906 else 00907 return c; 00908 } while (true); 00909 00910 return new PoolConnection (this); 00911 } 00912 00913 /*********************************************************************** 00914 00915 Close this pool and drop all existing connections. 00916 00917 ***********************************************************************/ 00918 00919 synchronized void close () 00920 { 00921 PoolConnection c = freelist; 00922 freelist = null; 00923 while (c) 00924 { 00925 c.close (); 00926 c = c.next; 00927 } 00928 } 00929 } 00930 00931 00932 /******************************************************************************* 00933 00934 Class to represent a cluster node. Each node supports both cache 00935 and queue functionality. 
Note that the set of available nodes is 00936 configured at startup, simplifying the discovery process in some 00937 significant ways, and causing less thrashing of cache-keys. 00938 00939 *******************************************************************************/ 00940 00941 private class Node 00942 { 00943 private char[] name, 00944 port; 00945 private ILogger logger; 00946 private bool enabled; 00947 00948 private ConnectionPool tasks, 00949 cache; 00950 00951 /*********************************************************************** 00952 00953 Construct a node with the provided name. This name should 00954 be the network name of the hosting device. 00955 00956 ***********************************************************************/ 00957 00958 this (ILogger logger, char[] name) 00959 { 00960 this.logger = logger; 00961 this.name = name; 00962 } 00963 00964 /*********************************************************************** 00965 00966 Add a cache/queue reference for the remote node 00967 00968 ***********************************************************************/ 00969 00970 void setCache (InternetAddress address) 00971 { 00972 this.cache = new ConnectionPool (address); 00973 port = Integer.format (new char[5], address.port); 00974 } 00975 00976 /*********************************************************************** 00977 00978 Add a cache-loader reference for the remote node 00979 00980 ***********************************************************************/ 00981 00982 void setTasks (InternetAddress address) 00983 { 00984 this.tasks = new ConnectionPool (address); 00985 } 00986 00987 /*********************************************************************** 00988 00989 Return the name of this node (the network name of the device) 00990 00991 ***********************************************************************/ 00992 00993 override char[] toString () 00994 { 00995 return name; 00996 } 00997 00998 
        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover.

        ***********************************************************************/

        void fail ()
        {
                setEnabled (false);
                // NOTE(review): assumes setCache/setTasks were both invoked
                // beforehand (presumably via NodeSet.enable) -- a null pool
                // here would fault; confirm against the startup sequence
                cache.close ();
                tasks.close ();
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {
                // volatile read: the flag is toggled from other threads
                volatile
                         return enabled;
        }

        /***********************************************************************

                Set the enabled state of this node

        ***********************************************************************/

        void setEnabled (bool enabled)
        {
                // emit a trace entry only when trace-level logging is active
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                // volatile write, paired with the read in isEnabled()
                volatile
                         this.enabled = enabled;
        }

        /***********************************************************************

                request data; fail this Node if we can't connect. Note
                that we make several attempts to connect before writing
                the node off as a failure.

        ***********************************************************************/

        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server (noting the borrow-time,
                // which is also used when handing the connection back)
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                     try {
                         // attach connection to reader and writer
                         writer.getBuffer.setConduit (connect.getConduit);
                         reader.getBuffer.setConduit (connect.getConduit);

                         // send request
                         writer.flush ();

                         // load the returned object; zeroing 'attempts'
                         // ensures we drop out of the retry loop (don't retry!)
                         attempts = 0;
                         payload = reader.getPayload (channel, element, cmd);

                         // return borrowed connection to the pool
                         connect.done (time);

                         } catch (PickleException x)
                                {
                                // hand the connection back before propagating
                                connect.done (time);
                                throw x;
                                }
                           catch (IOException x)
                                 // attempt to reconnect? (the catch body is
                                 // this single 'if'; when reset succeeds and
                                 // attempts remain, the loop simply retries)
                                 if (attempts == 0 || !connect.reset)
                                    {
                                    // that server is offline
                                    fail ();

                                    // state that we failed
                                    return false;
                                    }

                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}


/*******************************************************************************

        Models a set of remote cluster nodes.

*******************************************************************************/

private class NodeSet
{
        private HashMap         map;            // name -> Node lookup
        private Node[]          nodes;          // configured node list
        private Node[]          random;         // shuffled copy, used by scan()
        private ILogger         logger;

        /***********************************************************************

                Construct the node-set with the given logger.

        ***********************************************************************/

        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger instance shared by this set.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers, and sort them such that
                all clients will have the same ordered set

        ***********************************************************************/

        synchronized void addNode (Node node)
        {
                char[] name = node.toString;

                // reject duplicate registrations by name
                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);
                nodes ~= node;
                //nodes.sort;
        }

        /***********************************************************************

                Build the randomized scan-list from the configured nodes.

        ***********************************************************************/

        void optimize ()
        {
                // nodes are already
                // in the same order across the cluster
                // since the configuration file should be identical ...

                // nodes.sort;

                // copy the node list
                random = nodes.dup;

                // muddle up the duplicate list. This randomized list
                // is used when scanning the cluster for queued entries
                foreach (int i, Node n; random)
                        {
                        int j = Random.get (random.length);
                        Node tmp = random[i];
                        random[i] = random[j];
                        random[j] = tmp;
                        }
        }

        /***********************************************************************

                Enable the named node, binding its cache and task services
                to the given ports. Throws when the name was never added.

        ***********************************************************************/

        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
                else
                   // (re)bind the pools only on a disabled -> enabled edge
                   if (! node.isEnabled)
                      {
                      node.setCache (new InternetAddress (name, port1));
                      node.setTasks (new InternetAddress (name, port2));
                      node.setEnabled (true);
                      }
        }

        /***********************************************************************

                Issue a request via a selected node, moving on to another
                node whenever the chosen one fails to respond. Throws
                (via selectNode) once no enabled node remains.

        ***********************************************************************/

        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node node;
                IPayload payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

        ***********************************************************************/

        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   // wrap the starting point into range, then walk the
                   // list (circularly) looking for an enabled node
                   index %= i;

                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available");
        }

        /***********************************************************************

                Select a cluster server based on the specified key. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

        ***********************************************************************/

        final Node selectNode (char[] key = null)
        {
                // rolling counter shared across callers; an unsynchronized
                // increment here is presumably tolerable since any value
                // yields a valid node selection -- confirm intent
                static uint index;

                // keyed selection hashes to a stable index, so every
                // client maps the same key onto the same node
                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.

        ***********************************************************************/

        bool scan (bool delegate (Node) dg)
        {
                Node node;
                int index = nodes.length;

                // walk the shuffled list built by optimize(); assumes
                // optimize() has run, so 'random' mirrors 'nodes'
                while (index--)
                      {
                      // lookup the randomized set of server nodes
                      node = random [index];

                      // callback on each enabled node; abort on first true
                      if (node.isEnabled)
                          if (dg (node))
                              return true;
                      }
                return false;
        }
}