00001 /******************************************************************************* 00002 00003 @file Cluster.d 00004 00005 Copyright (c) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 4. Derivative works are permitted, but they must carry this notice 00027 in full and credit the original source. 
00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.qos.socket.Cluster; 00040 00041 private import mango.cache.HashMap; 00042 00043 private import mango.utils.Text, 00044 mango.utils.Random; 00045 00046 private import mango.format.Int; 00047 00048 private import mango.base.System; 00049 00050 private import mango.io.Buffer, 00051 mango.io.Socket, 00052 mango.io.Exception, 00053 mango.io.Properties, 00054 mango.io.ArrayAllocator, 00055 mango.io.SocketConduit, 00056 mango.io.SocketListener, 00057 mango.io.MulticastSocket; 00058 00059 private import mango.io.model.IConduit; 00060 00061 private import mango.log.model.ILogger; 00062 00063 private import mango.cluster.Client; 00064 00065 public import mango.cluster.model.ICluster; 00066 00067 private import mango.cluster.qos.socket.RollCall, 00068 mango.cluster.qos.socket.ClusterEvent, 00069 mango.cluster.qos.socket.ProtocolReader, 00070 mango.cluster.qos.socket.ProtocolWriter; 00071 00072 00073 /******************************************************************************* 00074 00075 QOS implementation for sockets. All cluster-client activity is 00076 gated through here by the higher level classes; NetworkQueue & 00077 NetworkCache for example. You gain access to the cluster by 00078 creating an instance of the QOS (quality of service) you desire 00079 and mapping client classes onto it. For example: 00080 00081 @code 00082 import mango.cluster.NetworkCache; 00083 import mango.cluster.qos.socket.Cluster; 00084 00085 ICluster cluster = new Cluster (...); 00086 NetworkCache cache = new NetworkCache (cluster, ...); 00087 00088 cache.put (...); 00089 cache.get (...); 00090 cache.invalidate (...); 00091 @endcode 00092 00093 Please see the cluster clients for additional details. 
Currently these include CacheInvalidator, CacheInvalidatee, NetworkMessage,
        NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the
        Client base-class.

*******************************************************************************/

class Cluster : ICluster, IEventListener
{
        // multicast group addresses, cached per channel name. This map is
        // static, and is therefore shared by every Cluster instance
        private static HashMap  groups;

        private ILogger         logger;         // may be null; see this(ILogger)
        private NodeSet         nodeSet;        // the configured server nodes
        private Buffer          mBuffer;        // shared broadcast buffer
        private ProtocolWriter  mWriter;        // serializes into mBuffer
        private MulticastSocket mSocket;        // outbound broadcast socket

        // multicast settings, overridable via the configuration conduit.
        // NOTE(review): groupTTL is loaded from "multicast_ttl" but is not
        // applied to mSocket anywhere in this class -- confirm intent
        private int             groupTTL = 1;
        private int             groupPort = 3333;
        private int             groupPrefix = 225;

        /***********************************************************************

                Setup the static hashmap used to cache multicast group
                addresses, keyed by channel name.

        ***********************************************************************/

        static this ()
        {
                groups = new HashMap (128, 0.75, 2);
        }

        /***********************************************************************

                Setup a bare Cluster instance with no configured nodes.
                The logger is optional and may be null.

                Currently the buffer & writer are shared for all bulletin
                serialization; this should probably change at some point
                such that we can support multiple threads broadcasting
                concurrently to different output ports.

        ***********************************************************************/

        this (ILogger logger = null)
        {
                this.logger = logger;
                nodeSet = new NodeSet (logger);
                mBuffer = new Buffer (1024 * 4);
                mSocket = new MulticastSocket;
                mWriter = new ProtocolWriter (mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance from a configuration conduit.
                Each property read from the conduit must be one of:

                    node                adds a Node with the given name
                    multicast_port      port used for multicast groups
                    multicast_prefix    first segment of group addresses
                    multicast_ttl       stored in groupTTL

                Any other property name raises a ClusterException.

                Once loaded, the node set is finalized, a consumer is
                attached to "cluster.server.advertise", and a RollCall
                request is broadcast there. The constructor then sleeps
                for 250ms so that running servers can respond (see the
                notify() method).

        ***********************************************************************/

        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // callback for loading cluster configuration
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);
                        if (name == "node")
                            nodeSet.addNode (new Node (logger, value));
                        else
                        if (name == "multicast_port")
                            groupPort = Int.parse (value);
                        else
                        if (name == "multicast_prefix")
                            groupPrefix = Int.parse (value);
                        else
                        if (name == "multicast_ttl")
                            groupTTL = Int.parse (value);
                        else
                           throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }

                // load up the cluster configuration
                Properties.load (conduit, &loader);

                // finalize nodeSet
                nodeSet.optimize ();

                // listen for cluster servers
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ask who's currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // wait for enabled servers to respond ...
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance talking to a single server on
                "localhost". The cache is reached on serverPort and the
                task (cache-loader) service on serverPort+1. The node is
                enabled immediately, without any discovery broadcast.

        ***********************************************************************/

        this (ILogger logger, uint serverPort)
        in {
           assert (logger);
           assert (serverPort > 1024);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress ("localhost", serverPort));
                node.setTasks (new InternetAddress ("localhost", serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when
                they get a RollCall request, and when they begin execution.

                Payloads that are not RollCall instances, including a null
                payload, are ignored rather than dereferenced.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                // a class downcast yields null for a null or foreign payload
                RollCall rollcall = cast(RollCall) payload;

                // ignore requests from clients (we're on a common channel)
                if (rollcall && ! rollcall.request)
                    nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplifies and speeds up execution.

        ***********************************************************************/

        IChannel createChannel (char[] channel)
        {
                return new Channel (channel);
        }

        /***********************************************************************

                Return the logger instance provided during construction.
                This may be null when the default constructor was used.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Broadcast a payload on the specified channel. This uses
                IP/Multicast to scatter the payload to all registered
                listeners (on the same multicast group). Note that the
                maximum payload size is limited to that of an Ethernet
                data frame, minus the IP/UDP header size (1472 bytes).

                Throws ClusterException when the serialized payload
                exceeds that limit.

        ***********************************************************************/

        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // serialize content
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                // Ethernet data-frame size minus the 28 byte UDP/IP header:
                if (mBuffer.getPosition > 1472)
                    throw new ClusterException ("payload is too large to broadcast");

                // send it to the appropriate multicast group
                mSocket.write (mBuffer, getGroup (channel.getName));
        }

        /***********************************************************************

                Create a consumer of the specified style. Consumers are
                started within their own thread, since they spend the vast
                majority of their time blocked on a Socket read. It would
                be good to support multiplexed reading instead, such that
                a thread pool could be applied.

                Throws ClusterException for a style other than Message
                or Bulletin.

        ***********************************************************************/

        IConsumer createConsumer (IChannel channel, IEvent.Style style,
                                  IEventListener notify)
        {
                IEvent event = new ClusterEvent (this, channel, style, notify);

                if (logger)
                    logger.info ("creating " ~ event.getStyleName ~
                                 " consumer for channel '" ~ channel.getName ~ "'");

                switch (style)
                       {
                       case IEvent.Style.Message:
                            return new MessageConsumer (this, event);

                       case IEvent.Style.Bulletin:
                            return new BulletinConsumer (this, event);

                       default:
                            // explicit failure rather than falling off the
                            // end of the switch with no return value
                            throw new ClusterException ("unsupported consumer style for channel '" ~ channel.getName ~ "'");
                       }
        }

        /***********************************************************************

                Return an entry from the network cache, and optionally
                remove it. This is a synchronous operation as opposed
                to the asynchronous nature of an invalidate broadcast.

        ***********************************************************************/

        IPayload getCache (IChannel channel, char[] key, bool remove)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (remove ? ProtocolWriter.Command.Remove :
                              ProtocolWriter.Command.Copy, c.getName, null, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Place an entry into the network cache, replacing the
                entry with the identical key. Note that this may cause
                the oldest entry in the cache to be displaced if the
                cache is already full.

        ***********************************************************************/

        IPayload putCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Add an entry to the specified network queue. May throw a
                QueueFullException if there's no room available. Returns
                the payload that was provided.

        ***********************************************************************/

        IPayload putQueue (IChannel channel, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
                nodeSet.request (c.writer, c.reader);
                return payload;
        }


        /***********************************************************************

                Query the cluster for queued entries on the corresponding
                channel. Returns, and removes, the first matching entry
                from the cluster. Note that this sweeps the cluster for
                matching entries, and is synchronous in nature. The more
                common approach is to setup a queue listener, which will
                grab and dispatch queue entries asynchronously.

        ***********************************************************************/

        IPayload getQueue (IChannel channel)
        {
                IPayload payload;

                Channel c = cast(Channel) channel;

                // callback for NodeSet.scan(); returning true stops the
                // scan once some node has handed back a payload
                bool scan (Node node)
                {
                        // ask this node ...
                        c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
                        node.request (node.cache, c.writer, c.reader, payload);
                        return payload !== null;
                }

                // make a pass over each Node, looking for channel entries
                nodeSet.scan (&scan);
                return payload;
        }

        /***********************************************************************

                Load a network cache entry remotely. This sends the given
                Payload over the network to the cache host, where it will
                be executed locally. The benefit of doing so is that the
                host may deny access to the cache entry for the duration
                of the load operation. This, in turn, provides an elegant
                mechanism for gating/synchronizing multiple network clients
                over a given cache entry; handy for those entries that are
                relatively expensive to construct or access.

        ***********************************************************************/

        void loadCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
                Node node = nodeSet.selectNode (key);
                node.request (node.tasks, c.writer, c.reader, payload);
        }

        /***********************************************************************

                Return an internet address representing the multicast
                group for the specified channel. We use three of the
                four address segments to represent the channel itself
                (via a hash on the channel name), and set the primary
                segment to be that of the broadcast prefix (above).

                NOTE(review): the cache is static and keyed only by the
                channel name, while the address depends on this instance's
                groupPrefix and groupPort. Two Cluster instances configured
                differently would therefore share whichever address was
                cached first -- confirm this is acceptable.

        ***********************************************************************/

        synchronized InternetAddress getGroup (char[] channel)
        {
                InternetAddress group = cast(InternetAddress) groups.get (channel);

                if (group is null)
                   {
                   // construct a group address from the prefix & channel-hash,
                   // where the hash is folded down to 24 bits
                   uint hash = groups.jhash (channel);
                   hash = (hash >> 24) ^ (hash & 0x00ffffff);

                   char[] address = Int.format (groupPrefix) ~ "." ~
                                    Int.format ((hash >> 16) & 0xff) ~ "." ~
                                    Int.format ((hash >> 8) & 0xff) ~ "." ~
                                    Int.format (hash & 0xff);

                   // insert InternetAddress into hashmap
                   group = new InternetAddress (address, groupPort);
                   groups.put (channel, group);
                   }
                return group;
        }
}


/*******************************************************************************

        A listener for multicast channel traffic. These are currently used
        for cache coherency, queue publishing, and node discovery activity;
        though could be used for direct messaging also.

*******************************************************************************/

private class BulletinConsumer : SocketListener, IConsumer
{
        private IEvent                  event;
        private Buffer                  buffer;
        private ProtocolReader          reader;
        private Cluster                 cluster;
        private MulticastSocket         consumer;

        /***********************************************************************

                Construct a multicast consumer for the specified event. The
                event handler will be invoked whenever a message arrives for
                the associated channel. The listener thread is started
                before the constructor returns.

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                this.event = event;
                this.cluster = cluster;

                // buffer doesn't need to be larger than Ethernet data-frame
                buffer = new Buffer (1500);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                // configure a listener socket
                consumer = new MulticastSocket;
                consumer.join (cluster.getGroup (event.getChannel.getName));

                super (consumer, buffer);

                // fire up this listener
                start ();
        }

        /***********************************************************************

                Notification callback invoked when we receive a multicast
                packet. Note that we check the packet channel-name against
                the one we're consuming, to check for cases where the group
                address had a hash collision.

        ***********************************************************************/

        override void notify (IBuffer buffer)
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                IPayload payload = reader.getPayload (channel, element, cmd);

                // check it's really for us first (might be a hash collision)
                if (channel == event.getChannel.getName)
                    invoke (event, payload);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                Cluster may have been constructed without a logger, in
                which case there is nowhere to report the error.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger log = cluster.getLogger;
                if (log)
                    log.error ("BulletinConsumer: "~msg);
        }

        /***********************************************************************

                Overridable means of notifying the client code.

        ***********************************************************************/

        protected void invoke (IEvent event, IPayload payload)
        {
                event.invoke (payload);
        }

        /***********************************************************************

                Return the cluster instance we're associated with.

        ***********************************************************************/

        Cluster getCluster ()
        {
                return cluster;
        }

        /***********************************************************************

                Temporarily halt listening. This can be used to ignore
                multicast messages while, for example, the consumer is
                busy doing other things.

        ***********************************************************************/

        void pauseGroup ()
        {
                consumer.pauseGroup ();
        }

        /***********************************************************************

                Resume listening, post-pause.

        ***********************************************************************/

        void resumeGroup ()
        {
                consumer.resumeGroup ();
        }

        /***********************************************************************

                Cancel this consumer. The listener is effectively disabled
                from this point forward. The listener thread does not halt
                at this point, but waits until the socket-read returns.
                Note that the D Interface implementation requires us to
                "reimplement and dispatch" trivial things like this ~ it's
                a pain in the neck to maintain.

        ***********************************************************************/

        void cancel ()
        {
                super.cancel ();
        }
}


/*******************************************************************************

        A listener for queue events. These events are produced by the
        queue host on a periodic basis when it has available entries.
        We listen for them (rather than constantly scanning) and then
        begin a sweep to process as many as we can. Note that we will
        be in competition with other nodes to process these entries.

*******************************************************************************/

private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a multicast consumer for the specified event

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                super (cluster, event);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                Cluster may have been constructed without a logger, in
                which case there is nowhere to report the error.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger log = cluster.getLogger;
                if (log)
                    log.error ("MessageConsumer: "~msg);
        }

        /***********************************************************************

                Override the default processing to sweep the cluster for
                queued entries. Each server node is queried until one is
                found that contains a payload. Note that it is possible
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we
                should make the strategy pluggable instead.

        ***********************************************************************/

        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    // the multicast payload is only a trigger; drain the
                    // queue until no node returns anything further
                    while ((payload = getCluster.getQueue (event.getChannel)) !== null)
                            event.invoke (payload);
                    } finally
                            // resume listening
                            resumeGroup ();
        }
}


/*******************************************************************************

        A channel represents something akin to a publish/subscribe topic,
        or a radio station. These are used to segregate cluster operations
        into a set of groups, where each group is represented by a channel.
        Channel names are whatever you want them to be: use of dot notation
        has proved useful in the past. See Client.createChannel

*******************************************************************************/

private class Channel : IChannel
{
        char[]                  name;
        Buffer                  buffer;
        ProtocolReader          reader;
        ProtocolWriter          writer;

        /***********************************************************************

                Construct a channel with the specified (non-empty) name.
                We cache a number of session-related constructs here also,
                in order to eliminate runtime overhead.

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length);
           }
        body
        {
                this.name = name;

                // this buffer will grow as required to house larger payloads
                buffer = new GrowableBuffer (1024 * 2);
                writer = new ProtocolWriter (buffer);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);
        }

        /***********************************************************************

                Return the name of this channel. This is the name provided
                when the channel was constructed.

        ***********************************************************************/

        char[] getName ()
        {
                return name;
        }

        /***********************************************************************

                Output this channel via the provided IWriter

        ***********************************************************************/

        void write (IWriter writer)
        {
                writer.put (name);
        }

        /***********************************************************************

                Input this channel via the provided IReader

        ***********************************************************************/

        void read (IReader reader)
        {
                reader.get (name);
        }
}


/*******************************************************************************

        An abstraction of a socket connection. Used internally by the
        socket-based Cluster.
*******************************************************************************/

private class Connection
{
        // (re)establish the underlying socket; false when that fails
        abstract bool reset();

        // hand this connection back to its owner, stamped with 'time'
        abstract void done (ulong time);

        // the socket currently backing this connection
        abstract SocketConduit getConduit ();
}


/*******************************************************************************

        A pool of socket connections for accessing cluster nodes. Note
        that the entries will timeout after a period of inactivity, and
        will subsequently cause a connected host to drop the supporting
        session.

*******************************************************************************/

private class ConnectionPool
{
        private int                     count;          // NOTE(review): not referenced in this class
        private InternetAddress         address;        // server every connection targets
        private PoolConnection          freelist;       // idle connections, most recent first
        private const ulong             timeout = 60_000;  // idle limit, same units as borrow(time)

        /***********************************************************************

                Utility class to provide the basic connection facilities
                provided by the connection pool.

        ***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;
                PoolConnection  next;
                ConnectionPool  parent;
                SocketConduit   conduit;

                /***************************************************************

                        Construct a new connection and set its parent.
                        A failed connect is not reported here; it will
                        surface when the connection is first used.

                ***************************************************************/

                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }

                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/

                SocketConduit getConduit ()
                {
                        return conduit;
                }

                /***************************************************************

                        Create a new socket and connect it to the specified
                        server. This will cause a dedicated thread to start
                        on the server. Said thread will quit when an error
                        occurs. Any previous socket is closed first, so a
                        reconnect does not leak the old one. Returns false
                        if the connection could not be established.

                ***************************************************************/

                bool reset ()
                {
                        // release the socket being replaced (if any)
                        close ();

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                                    {
                                    return false;
                                    }
                }

                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated. This is best-effort: a failure to
                        close a socket we are discarding is ignored, so
                        that callers sweeping several connections (see
                        ConnectionPool.close) still reach all of them.

                ***************************************************************/

                void close ()
                {
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object)
                                       {
                                       // deliberately ignored; see above
                                       }
                           }
                }

                /***************************************************************

                        Return this connection to the free-list. Note that
                        we have to synchronize on the parent-pool itself.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address)
        {
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from the free-list rather than
                creating a new one. Entries idle for longer than the
                timeout are closed and discarded as we go, including the
                last entry on the list; a fresh connection is created
                when no usable entry remains.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {
                while (freelist)
                      {
                      PoolConnection c = freelist;
                      freelist = c.next;

                      if (time - c.time > timeout)
                          c.close ();
                      else
                         return c;
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all idle connections. Connections
                currently borrowed are not affected.

        ***********************************************************************/

        synchronized void close ()
        {
                PoolConnection c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close ();
                      c = c.next;
                      }
        }
}


/*******************************************************************************

        Class to represent a cluster node. Each node supports both cache
        and queue functionality.
Note that the set of available nodes is configured at startup,
        simplifying the discovery process in some significant ways, and
        causing less thrashing of cache-keys.

*******************************************************************************/

private class Node
{
        private char[]          name,
                                port;
        private ILogger         logger;
        private bool            enabled;

        private ConnectionPool  tasks,
                                cache;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device.

        ***********************************************************************/

        this (ILogger logger, char[] name)
        {
                this.logger = logger;
                this.name = name;
        }

        /***********************************************************************

                Attach the cache/queue connection-pool for the remote node,
                and remember its port (formatted as text) for use in trace
                messages.

        ***********************************************************************/

        void setCache (InternetAddress address)
        {
                this.cache = new ConnectionPool (address);
                port = Int.format (address.port);
        }

        /***********************************************************************

                Attach the cache-loader (task) connection-pool for the
                remote node.

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {
                this.tasks = new ConnectionPool (address);
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/

        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled and
                both of its connection-pools are closed. It stays disabled
                until the owning NodeSet re-enables it via enable(), which
                attaches fresh pools.

        ***********************************************************************/

        void fail ()
        {
                setEnabled (false);
                cache.close ();
                tasks.close ();
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {
                volatile
                       return enabled;
        }

        /***********************************************************************

                Set the enabled state of this node, tracing the transition
                when the logger has Trace enabled.

        ***********************************************************************/

        void setEnabled (bool enabled)
        {
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                volatile
                       this.enabled = enabled;
        }

        /***********************************************************************

                Send the request held in 'writer' over a connection borrowed
                from 'pool', and read the reply into 'payload'.

                Returns false, without touching the network, when 'pool' is
                null (the node may have failed since it was selected). An
                IOException is retried, after resetting the connection, for
                up to three attempts in total, but never once the reply has
                started to be read. When a retry is not possible, this node
                is failed and false is returned. A PickleException returns
                the connection to the pool and is rethrown.

                Throws ClusterFullException when the server replies 'Full',
                and ClusterException for an 'Exception' reply or any other
                non-OK response.

        ***********************************************************************/

        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary).
                // NOTE(review): a retry flushes 'writer' again; confirm its
                // buffer still holds the request after a failed flush.
                for (int attempts=3; attempts--;)
                {
                        try {
                            // attach connection to reader and writer
                            writer.getBuffer.setConduit (connect.getConduit);
                            reader.getBuffer.setConduit (connect.getConduit);

                            // send request
                            writer.flush ();

                            // load the returned object (don't retry!)
                            attempts = 0;
                            payload = reader.getPayload (channel, element, cmd);

                            // return borrowed connection
                            connect.done (time);

                            } catch (PickleException x)
                            {
                                    connect.done (time);
                                    throw x;
                            }
                            catch (IOException x)
                            {
                                    // attempt to reconnect?
                                    if (attempts == 0 || !connect.reset)
                                       {
                                       // that server is offline
                                       fail ();

                                       // state that we failed
                                       return false;
                                       }
                            }
                }

                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}


/*******************************************************************************

        Models a set of remote cluster nodes.

*******************************************************************************/

private class NodeSet
{
        private HashMap         map;
        private Node[]          nodes;
        private Node[]          random;
        private ILogger         logger;

        /***********************************************************************

                Create an empty node set. Nodes are indexed by name in a
                HashMap, and also held in insertion order.

        ***********************************************************************/

        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger supplied at construction.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers. The list is kept in
                insertion order. Because every client is expected to load
                an identical configuration, all clients end up with the
                same ordered set. Throws ClusterException if a node with
                the same name was already added.

        ***********************************************************************/

        synchronized void addNode (Node node)
        {
                char[] name = node.toString;

                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);

                // deliberately not sorted: configuration order is already
                // identical across the cluster
                nodes ~= node;
        }

        /***********************************************************************

                Rebuild the randomized copy of the node list that scan()
                walks. Call this after all nodes have been added; scan()
                only visits the nodes captured here.

        ***********************************************************************/

        void optimize ()
        {
                // nodes are already in the same order across the cluster
                // since the configuration file should be identical, so the
                // primary list is left as-is.

                // copy the node list
                random = nodes.dup;

                // shuffle the duplicate with Fisher-Yates, so that every
                // permutation is equally likely (the former "swap each i
                // with any j" loop was biased). This randomized list is
                // used when scanning the cluster for queued entries.
                // Assumes Random.get(n) yields a value in [0, n), as the
                // previous indexing by Random.get(random.length) required.
                for (int i = random.length; i > 1; --i)
                    {
                    int j = Random.get (i);
                    Node tmp = random[i-1];
                    random[i-1] = random[j];
                    random[j] = tmp;
                    }
        }

        /***********************************************************************

                Bring the named node online at the given cache (port1) and
                task (port2) ports. This is a no-op if the node is already
                enabled. Throws ClusterException for an unknown name.

        ***********************************************************************/

        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
                else
                   if (! node.isEnabled)
                      {
                      node.setCache (new InternetAddress (name, port1));
                      node.setTasks (new InternetAddress (name, port2));
                      node.setEnabled (true);
                      }
        }

        /***********************************************************************

                Send the request held in 'writer' to the cache pool of the
                node selected for 'key' (round-robin when 'key' is empty),
                and return the reply. A node that fails is disabled by
                Node.request, so the next selection moves on to another
                enabled node. ClusterEmptyException propagates from
                selectNode once none remain.

        ***********************************************************************/

        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node            node;
                IPayload        payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index, taken
                modulo the node count. If the selected server is not
                currently enabled, we just try the next one (wrapping
                around). This behaviour should be consistent across each
                cluster client. Throws ClusterEmptyException when no node
                is enabled.

        ***********************************************************************/

        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   index %= i;

                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available");
        }

        /***********************************************************************

                Select a cluster server based on the specified key. When a
                key is supplied, its hash picks the starting node.
                Otherwise a round-robin counter is advanced. If the selected
                server is not currently enabled, we just try the next one.
                This behaviour should be consistent across each cluster
                client.

        ***********************************************************************/

        final Node selectNode (char[] key = null)
        {
                // function-level static: shared across NodeSet instances.
                // NOTE(review): incremented without synchronization.
                static uint index;

                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Only enabled nodes are offered
                to the delegate.

                Note that this scans nodes in the randomized pattern built
                by optimize(), which should tend to avoid 'bursty' activity
                by a set of clients upon any one cluster server. Nodes
                added after the last optimize() call are not visited.

        ***********************************************************************/

        bool scan (bool delegate (Node) dg)
        {
                // bound the walk by the randomized list itself: 'nodes' may
                // have grown since optimize() last ran (or it never ran),
                // and indexing 'random' by nodes.length would overrun it
                int index = random.length;

                while (index--)
                      {
                      // lookup the randomized set of server nodes
                      Node node = random [index];

                      // callback on each enabled node
                      if (node.isEnabled && dg (node))
                          return true;
                      }
                return false;
        }
}