00001 /******************************************************************************* 00002 00003 @file Cluster.d 00004 00005 Copyright (c) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 4. Derivative works are permitted, but they must carry this notice 00027 in full and credit the original source. 
00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.qos.socket.Cluster; 00040 00041 private import mango.cache.HashMap; 00042 00043 private import mango.utils.Random; 00044 00045 private import mango.format.Int; 00046 00047 private import mango.sys.System; 00048 00049 private import mango.io.Buffer, 00050 mango.io.Socket, 00051 mango.io.Exception, 00052 mango.io.Properties, 00053 mango.io.ArrayAllocator, 00054 mango.io.SocketConduit, 00055 mango.io.SocketListener, 00056 mango.io.MulticastSocket; 00057 00058 private import mango.io.model.IConduit; 00059 00060 private import mango.log.model.ILogger; 00061 00062 private import mango.cluster.Client; 00063 00064 public import mango.cluster.model.ICluster; 00065 00066 private import mango.cluster.qos.socket.RollCall, 00067 mango.cluster.qos.socket.ClusterEvent, 00068 mango.cluster.qos.socket.ProtocolReader, 00069 mango.cluster.qos.socket.ProtocolWriter; 00070 00071 00072 /******************************************************************************* 00073 00074 QOS implementation for sockets. All cluster-client activity is 00075 gated through here by the higher level classes; NetworkQueue & 00076 NetworkCache for example. You gain access to the cluster by 00077 creating an instance of the QOS (quality of service) you desire 00078 and mapping client classes onto it. For example: 00079 00080 @code 00081 import mango.cluster.NetworkCache; 00082 import mango.cluster.qos.socket.Cluster; 00083 00084 ICluster cluster = new Cluster (...); 00085 NetworkCache cache = new NetworkCache (cluster, ...); 00086 00087 cache.put (...); 00088 cache.get (...); 00089 cache.invalidate (...); 00090 @endcode 00091 00092 Please see the cluster clients for additional details. 
Currently these include CacheInvalidator, CacheInvalidatee, NetworkMessage,
        NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the
        Client base-class.

*******************************************************************************/

class Cluster : ICluster, IEventListener
{
        // Multicast group addresses, keyed by channel name. Held per
        // instance (rather than statically) because each address is
        // derived from this instance's groupPrefix and groupPort, which
        // may be configured differently by different Cluster instances
        private HashMap                 groups;
        private ILogger                 logger;
        private NodeSet                 nodeSet;
        private Buffer                  mBuffer;
        private ProtocolWriter          mWriter;
        private MulticastSocket         mSocket;

        // NOTE(review): groupTTL is loaded from "multicast_ttl" but is not
        // applied to mSocket anywhere in this class; confirm whether it
        // should be handed to the MulticastSocket
        private int                     groupTTL = 1;
        private int                     groupPort = 3333;
        private int                     groupPrefix = 225;

        /***********************************************************************

                Setup a Cluster instance. The logger is optional and may
                be null. Currently the buffer & writer are shared for all
                bulletin serialization; this should probably change at
                some point such that we can support multiple threads
                broadcasting concurrently to different output ports.

        ***********************************************************************/

        this (ILogger logger = null)
        {
                this.logger = logger;
                groups  = new HashMap (128, 0.75, 2);
                nodeSet = new NodeSet (logger);
                mBuffer = new Buffer (1024 * 4);
                mSocket = new MulticastSocket;
                mWriter = new ProtocolWriter (mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance from a configuration stream of
                "name = value" properties:

                        node              add a Node with the given name
                        multicast_port    multicast group port
                        multicast_prefix  first segment of every group address
                        multicast_ttl     stored in groupTTL

                Any other name causes a ClusterException. Once configured,
                a RollCall request is broadcast on the channel
                "cluster.server.advertise", and we then wait 250ms for the
                active servers to respond (responses arrive via notify).

                As with the other constructors, the buffer & writer are
                shared for all bulletin serialization.

        ***********************************************************************/

        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // callback for loading cluster configuration
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);
                        if (name == "node")
                            nodeSet.addNode (new Node (logger, value));
                        else
                        if (name == "multicast_port")
                            groupPort = Int.parse (value);
                        else
                        if (name == "multicast_prefix")
                            groupPrefix = Int.parse (value);
                        else
                        if (name == "multicast_ttl")
                            groupTTL = Int.parse (value);
                        else
                           throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }

                // load up the cluster configuration
                Properties.load (conduit, &loader);

                // finalize nodeSet
                nodeSet.optimize ();

                // listen for cluster servers
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ask who's currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // wait for enabled servers to respond ...
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance that talks to a single server on
                localhost: the cache is reached on serverPort, and tasks
                on serverPort+1. The node is enabled immediately, without
                any discovery broadcast.

                As with the other constructors, the buffer & writer are
                shared for all bulletin serialization.

        ***********************************************************************/

        this (ILogger logger, uint serverPort)
        in {
           assert (logger);
           assert (serverPort > 1024);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress ("localhost", serverPort));
                node.setTasks (new InternetAddress ("localhost", serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when
                they get a RollCall request, and when they begin execution.
                Payloads that are not RollCall instances are ignored.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                // the cast yields null for anything other than a RollCall
                RollCall rollcall = cast(RollCall) payload;

                // ignore requests from clients (we're on a common channel)
                if (rollcall && ! rollcall.request)
                    nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplify and speed up execution.

        ***********************************************************************/

        IChannel createChannel (char[] channel)
        {
                return new Channel (channel);
        }

        /***********************************************************************

                Return the logger instance provided during construction.
                This may be null when constructed via this(ILogger = null).

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Broadcast a payload on the specified channel. This uses
                IP/Multicast to scatter the payload to all registered
                listeners (on the same multicast group). Note that the
                maximum payload size is limited to that of an Ethernet
                data frame, minus the IP/UDP header size (1472 bytes).
                Throws ClusterException when the serialized form is
                larger than that.

        ***********************************************************************/

        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // serialize content
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                // Ethernet data-frame size minus the 28 byte UDP/IP header:
                if (mBuffer.getPosition > 1472)
                    throw new ClusterException ("payload is too large to broadcast");

                // send it to the appropriate multicast group
                mSocket.write (mBuffer, getGroup (channel.getName));
        }

        /***********************************************************************

                Create a listener of the specified type. Listeners are
                run within their own thread, since they spend the vast
                majority of their time blocked on a Socket read. Would
                be good to support multiplexed reading instead, such
                that a thread pool could be applied instead.

                Throws ClusterException for a style other than Message
                or Bulletin.

        ***********************************************************************/

        IConsumer createConsumer (IChannel channel, IEvent.Style style, 
                                  IEventListener notify)
        {
                IEvent event = new ClusterEvent (this, channel, style, notify);

                if (logger)
                    logger.info ("creating " ~ event.getStyleName ~ 
                                 " consumer for channel '" ~ channel.getName ~ "'");

                switch (style)
                       {
                       case IEvent.Style.Message:
                            return new MessageConsumer (this, event);

                       case IEvent.Style.Bulletin:
                            return new BulletinConsumer (this, event);

                       default:
                            // report the problem explicitly, rather than
                            // falling off the end without a result
                            throw new ClusterException ("unsupported consumer style '" ~ 
                                                        event.getStyleName ~ 
                                                        "' for channel '" ~ 
                                                        channel.getName ~ "'");
                       }
        }

        /***********************************************************************

                Return an entry from the network cache, and optionally
                remove it. This is a synchronous operation as opposed
                to the asynchronous nature of an invalidate broadcast.

        ***********************************************************************/

        IPayload getCache (IChannel channel, char[] key, bool remove)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (remove ? ProtocolWriter.Command.Remove : 
                              ProtocolWriter.Command.Copy, c.getName, null, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Place an entry into the network cache, replacing the
                entry with the identical key. Note that this may cause
                the oldest entry in the cache to be displaced if the
                cache is already full.

        ***********************************************************************/

        IPayload putCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Add an entry to the specified network queue. May throw a
                QueueFullException if there's no room available. Returns
                the payload that was provided.

        ***********************************************************************/

        IPayload putQueue (IChannel channel, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
                nodeSet.request (c.writer, c.reader);
                return payload;
        }


        /***********************************************************************

                Query the cluster for queued entries on the corresponding
                channel. Returns, and removes, the first matching entry
                from the cluster. Note that this sweeps the cluster for
                matching entries, and is synchronous in nature. The more
                common approach is to setup a queue listener, which will
                grab and dispatch queue entries asynchronously.

        ***********************************************************************/

        IPayload getQueue (IChannel channel)
        {
                IPayload payload;

                Channel c = cast(Channel) channel;

                // callback for NodeSet.scan(); returning true indicates
                // that a payload was found
                bool scan (Node node)
                {
                        // ask this node ...
                        c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
                        node.request (node.cache, c.writer, c.reader, payload);
                        return payload !== null;
                }

                // make a pass over each Node, looking for channel entries
                nodeSet.scan (&scan);
                return payload;
        }

        /***********************************************************************

                Load a network cache entry remotely. This sends the given
                Payload over the network to the cache host, where it will
                be executed locally. The benefit of doing so is that the
                host may deny access to the cache entry for the duration
                of the load operation. This, in turn, provides an elegant
                mechanism for gating/synchronizing multiple network clients
                over a given cache entry; handy for those entries that are
                relatively expensive to construct or access.

        ***********************************************************************/

        void loadCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
                Node node = nodeSet.selectNode (key);
                node.request (node.tasks, c.writer, c.reader, payload);
        }

        /***********************************************************************

                Return an internet address representing the multicast
                group for the specified channel. We use three of the
                four address segments to represent the channel itself
                (via a hash on the channel name), and set the primary
                segment to be that of the broadcast prefix (above).
                Addresses are cached per instance after the first call.

        ***********************************************************************/

        synchronized InternetAddress getGroup (char[] channel)
        {
                InternetAddress group = cast(InternetAddress) groups.get (channel);

                if (group is null)
                   {
                   // construct a group address from the prefix & channel-hash,
                   // where the hash is folded down to 24 bits
                   uint hash = groups.jhash (channel);
                   hash = (hash >> 24) ^ (hash & 0x00ffffff);

                   char[] address = Int.format (groupPrefix) ~ "." ~
                                    Int.format ((hash >> 16) & 0xff) ~ "." ~
                                    Int.format ((hash >> 8) & 0xff) ~ "." ~
                                    Int.format (hash & 0xff);

                   // insert InternetAddress into hashmap
                   group = new InternetAddress (address, groupPort);
                   groups.put (channel, group);
                   }
                return group;
        }
}


/*******************************************************************************

        A listener for multicast channel traffic. These are currently used
        for cache coherency, queue publishing, and node discovery activity;
        though could be used for direct messaging also.

*******************************************************************************/

private class BulletinConsumer : SocketListener, IConsumer
{
        private IEvent                  event;
        private Buffer                  buffer;
        private ProtocolReader          reader;
        private Cluster                 cluster;
        private MulticastSocket         consumer;

        /***********************************************************************

                Construct a multicast consumer for the specified event. The
                event handler will be invoked whenever a message arrives for
                the associated channel. The listener is started before the
                constructor returns.

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                this.event = event;
                this.cluster = cluster;

                // buffer doesn't need to be larger than Ethernet data-frame
                buffer = new Buffer (1500);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                // configure a listener socket
                consumer = new MulticastSocket;
                consumer.join (cluster.getGroup (event.getChannel.getName));

                super (consumer, buffer);

                // fire up this listener
                start ();
        }

        /***********************************************************************

                Notification callback invoked when we receive a multicast
                packet. Note that we check the packet channel-name against
                the one we're consuming, to check for cases where the group
                address had a hash collision.

        ***********************************************************************/

        override void notify (IBuffer buffer)
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                IPayload payload = reader.getPayload (channel, element, cmd);

                // check it's really for us first (might be a hash collision)
                if (channel == event.getChannel.getName)
                    invoke (event, payload);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                cluster may have been constructed without a logger, in
                which case the message is dropped rather than faulting
                on a null reference.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;

                if (logger)
                    logger.error ("BulletinConsumer: "~msg);
        }

        /***********************************************************************

                Overridable means of notifying the client code.

        ***********************************************************************/
        
        protected void invoke (IEvent event, IPayload payload)
        {
                event.invoke (payload);
        }

        /***********************************************************************

                Return the cluster instance we're associated with.

        ***********************************************************************/
        
        Cluster getCluster ()
        {
                return cluster;
        }

        /***********************************************************************

                Temporarily halt listening. This can be used to ignore
                multicast messages while, for example, the consumer is
                busy doing other things.

        ***********************************************************************/

        void pauseGroup ()
        {
                consumer.pauseGroup ();
        }

        /***********************************************************************

                Resume listening, post-pause.

        ***********************************************************************/

        void resumeGroup ()
        {
                consumer.resumeGroup ();
        }

        /***********************************************************************

                Cancel this consumer. The listener is effectively disabled
                from this point forward. The listener thread does not halt
                at this point, but waits until the socket-read returns. 
                Note that the D Interface implementation requires us to
                "reimplement and dispatch" trivial things like this ~ it's
                a pain in the neck to maintain.

        ***********************************************************************/
        
        void cancel ()
        {
                super.cancel ();
        }
}


/*******************************************************************************

        A listener for queue events. These events are produced by the
        queue host on a periodic basis when it has available entries.
        We listen for them (rather than constantly scanning) and then
        begin a sweep to process as many as we can. Note that we will
        be in competition with other nodes to process these entries.

*******************************************************************************/

private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a multicast consumer for the specified event

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                super (cluster, event);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                cluster may have been constructed without a logger, in
                which case the message is dropped rather than faulting
                on a null reference.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;

                if (logger)
                    logger.error ("MessageConsumer: "~msg);
        }

        /***********************************************************************

                Override the default processing to sweep the cluster for
                queued entries. Each server node is queried until one is
                found that contains a payload. Note that it is possible
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we
                should make the strategy pluggable instead.                 

        ***********************************************************************/

        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    while ((payload = getCluster.getQueue (event.getChannel)) !== null)
                            event.invoke (payload);
                    } finally 
                      // resume listening
                      resumeGroup ();
        }
}


/*******************************************************************************

        A channel represents something akin to a publish/subscribe topic,
        or a radio station. These are used to segregate cluster operations
        into a set of groups, where each group is represented by a channel.
        Channel names are whatever you want them to be: use of dot notation
        has proved useful in the past. See Client.createChannel

*******************************************************************************/

private class Channel : IChannel
{
        char[]                  name;
        Buffer                  buffer;
        ProtocolReader          reader;
        ProtocolWriter          writer;

        /***********************************************************************

                Construct a channel with the specified (non-empty) name.
                We cache a number of session-related constructs here
                also, in order to eliminate runtime overhead.

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length);
           }
        body
        {
                this.name = name;

                // this buffer will grow as required to house larger payloads
                buffer = new GrowableBuffer (1024 * 2);
                writer = new ProtocolWriter (buffer);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);
        }

        /***********************************************************************

                Return the name of this channel. This is the name provided
                when the channel was constructed.

        ***********************************************************************/

        char[] getName ()
        {
                return name;
        }

        /***********************************************************************

                Output this channel via the provided IWriter

        ***********************************************************************/

        void write (IWriter writer)
        {
                writer.put (name);
        }

        /***********************************************************************

                Input this channel via the provided IReader

        ***********************************************************************/

        void read (IReader reader)
        {
                reader.get (name);
        }
}


/*******************************************************************************

        An abstraction of a socket connection. Used internally by the
        socket-based Cluster.
*******************************************************************************/

private class Connection
{
        // (re)establish the underlying socket; false if that failed
        abstract bool reset();

        // hand the connection back for reuse, stamped with the given time
        abstract void done (ulong time);

        // the socket currently backing this connection
        abstract SocketConduit getConduit ();
}


/*******************************************************************************

        A pool of socket connections for accessing cluster nodes. Note
        that the entries will timeout after a period of inactivity, and
        will subsequently cause a connected host to drop the supporting
        session.

*******************************************************************************/

private class ConnectionPool
{
        // NOTE(review): count is not referenced within this class
        private int                     count;
        private InternetAddress         address;
        private PoolConnection          freelist;

        // idle period (in the same units as the borrow/done timestamps,
        // presumably milliseconds) after which a pooled entry is reaped
        private const ulong             timeout = 60_000;

        /***********************************************************************

                Utility class to provide the basic connection facilities
                provided by the connection pool.

        ***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;
                PoolConnection  next;   
                ConnectionPool  parent;   
                SocketConduit   conduit;

                /***************************************************************

                        Construct a new connection and set its parent.
                        The result of the initial connect is ignored; a
                        failed connect leaves an unconnected conduit in
                        place.

                ***************************************************************/

                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }

                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/

                SocketConduit getConduit ()
                {
                        return conduit;
                }

                /***************************************************************

                        Create a new socket and connect it to the specified
                        server. This will cause a dedicated thread to start
                        on the server. Said thread will quit when an error
                        occurs. Any previous socket is closed first, so that
                        reconnecting does not leak it. Returns false if the
                        new connection could not be established.

                ***************************************************************/

                bool reset ()
                {
                        // release the previous socket, if any. This is best
                        // effort: we're typically here because that socket
                        // already failed, so errors while closing it are
                        // deliberately ignored
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object) {}
                           }

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                                    {
                                    return false;
                                    }
                }

                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated.

                ***************************************************************/

                void close ()
                {
                        // conduit is null only if the very first socket
                        // allocation in reset() failed
                        if (conduit)
                            conduit.close ();
                }

                /***************************************************************

                        Return this connection to the free-list. Note that
                        we have to synchronize on the parent-pool itself.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address)
        {      
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from the free-list rather than
                creating a new one. Entries idle for longer than the
                timeout are closed and discarded as we go; when no
                fresh entry remains, a new connection is created.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {
                while (freelist)
                      {
                      PoolConnection c = freelist;
                      freelist = c.next;

                      // an entry stamped at or after 'time' is fresh; this
                      // check also avoids unsigned underflow in the
                      // subtraction when another thread returned it with
                      // a later timestamp than ours
                      if (c.time >= time || time - c.time <= timeout)
                          return c;

                      // stale: the host has likely dropped the session
                      c.close ();
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all pooled connections. Entries
                currently borrowed are not on the free-list and are not
                affected.

        ***********************************************************************/

        synchronized void close ()
        {
                PoolConnection c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close ();
                      c = c.next;
                      }
        }
}


/*******************************************************************************

        Class to represent a cluster node. Each node supports both cache
        and queue functionality. Note that the set of available nodes is
Note that the set of available nodes is
        configured at startup, simplifying the discovery process in some
        significant ways, and causing less thrashing of cache-keys.

*******************************************************************************/

private class Node
{
        private char[]          name,
                                port;
        private ILogger         logger;
        private bool            enabled;

        private ConnectionPool  tasks,
                                cache;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device.

        ***********************************************************************/

        this (ILogger logger, char[] name)
        {
                this.logger = logger;
                this.name = name;
        }

        /***********************************************************************

                Add a cache/queue reference for the remote node. This also
                records the port number, as text, for trace output.

        ***********************************************************************/

        void setCache (InternetAddress address)
        {
                this.cache = new ConnectionPool (address);
                port = Int.format (address.port);
        }

        /***********************************************************************

                Add a cache-loader reference for the remote node

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {
                this.tasks = new ConnectionPool (address);
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/

        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover, and the idle connections of
                both pools are closed. A pool that has not been assigned
                yet (via setCache/setTasks) is skipped rather than
                dereferenced.

        ***********************************************************************/

        void fail ()
        {
                setEnabled (false);

                if (cache)
                    cache.close ();
                if (tasks)
                    tasks.close ();
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {
                volatile
                       return enabled;
        }

        /***********************************************************************

                Set the enabled state of this node, emitting a trace
                message when tracing is enabled on the logger.

        ***********************************************************************/

        void setEnabled (bool enabled)
        {
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                volatile
                       this.enabled = enabled;
        }

        /***********************************************************************

                Send the request held by the writer to this node, over a
                connection borrowed from the given pool, and read the
                reply into 'payload'.

                Up to three attempts are made to send the request,
                resetting the connection between attempts. After that
                the node is written off via fail(). Once the request has
                been flushed, a failure while reading the reply is not
                retried.

                Returns false if the pool is null or the node failed;
                true when the node replied OK.

                Throws: ClusterFullException when the node reports it is
                full; ClusterException for an exception reply or any
                other unexpected response. A PickleException raised while
                talking to the node is rethrown after the connection has
                been returned to the pool.

        ***********************************************************************/

        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary)
                for (int attempts=3; attempts--;)
                     try {
                         // attach connection to reader and writer
                         writer.getBuffer.setConduit (connect.getConduit);
                         reader.getBuffer.setConduit (connect.getConduit);

                         // send request
                         writer.flush ();

                         // load the returned object (don't retry!)
                         attempts = 0;
                         payload = reader.getPayload (channel, element, cmd);

                         // return borrowed connection
                         connect.done (time);

                         } catch (PickleException x)
                                 {
                                 connect.done (time);
                                 throw x;
                                 }
                           catch (IOException x)
                                  // attempt to reconnect?
                                  if (attempts == 0 || !connect.reset)
                                     {
                                     // that server is offline.
                                     // NOTE(review): 'connect' is neither returned
                                     // via done() nor closed on this path; confirm
                                     // whether Connection exposes a way to release it
                                     fail ();

                                     // state that we failed
                                     return false;
                                     }

                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}


/*******************************************************************************

        Models a set of remote cluster nodes.

*******************************************************************************/

private class NodeSet
{
        private HashMap map;
        private Node[]  nodes;
        private Node[]  random;
        private ILogger logger;

        /***********************************************************************

                Create an empty node set. Nodes are indexed by name in an
                internal HashMap as they are added.

        ***********************************************************************/

        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger supplied at construction.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers. The list is kept in
                insertion order and is deliberately not sorted. Clients
                are expected to share an identical configuration, so all
                of them see the same ordered set.

                Throws: ClusterException if a node with the same name has
                already been added.

        ***********************************************************************/

        synchronized void addNode (Node node)
        {
                char[] name = node.toString;

                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);
                nodes ~= node;
        }

        /***********************************************************************

                Build the randomized copy of the node list used by scan().
                Call this after the last addNode(). Until then, scan()
                falls back to the configured order.

        ***********************************************************************/

        void optimize ()
        {
                // nodes are already in the same order across the cluster
                // since the configuration file should be identical, so
                // there is no need to sort them here

                // copy the node list, then muddle up the duplicate with a
                // Fisher-Yates shuffle so that every permutation is equally
                // likely. This randomized list is used when scanning the
                // cluster for queued entries
                random = nodes.dup;
                for (int i = random.length; i > 1; --i)
                    {
                    // pick a slot from the not-yet-shuffled prefix [0, i).
                    // Assumes Random.get(n) returns a value in [0, n)
                    int j = Random.get (i);

                    Node tmp = random[i-1];
                    random[i-1] = random[j];
                    random[j] = tmp;
                    }
        }

        /***********************************************************************

                Enable the named node, attaching its cache pool on port1
                and its task pool on port2. A node that is already enabled
                is left as is.

                Throws: ClusterException if no node has that name.

        ***********************************************************************/

        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");

                if (! node.isEnabled)
                   {
                   node.setCache (new InternetAddress (name, port1));
                   node.setTasks (new InternetAddress (name, port2));
                   node.setEnabled (true);
                   }
        }

        /***********************************************************************

                Issue a request against the cache pool of the node chosen
                for 'key', or of the next node in round-robin order when no
                key is given.

                A node that fails is disabled by Node.request(), so the
                loop moves on to another node. It stops when a node
                answers, or when selectNode() throws
                ClusterEmptyException because none remain enabled.

                NOTE(review): a node whose cache pool is null returns false
                without being disabled, so this loop would spin on it.
                enable() always assigns the pool before enabling a node.

        ***********************************************************************/

        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node            node;
                IPayload        payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

                Throws: ClusterEmptyException if no node is enabled.

        ***********************************************************************/

        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   index %= i;

                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available");
        }

        /***********************************************************************

                Select a cluster server based on the specified key. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client. Without a key, a static counter is rolled
                to spread requests across the nodes.

        ***********************************************************************/

        final Node selectNode (char[] key = null)
        {
                static uint index;

                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Only enabled nodes are passed
                to the delegate.

                This scans nodes in the randomized order built by
                optimize(), which should tend to avoid 'bursty' activity
                by a set of clients upon any one cluster server. If that
                list is out of step with the node list, the configured
                order is used instead.

        ***********************************************************************/

        bool scan (bool delegate (Node) dg)
        {
                // use the randomized list only when it is in step with the
                // node list. Indexing 'random' by nodes.length would read
                // past its end if optimize() was never called, or if nodes
                // were added afterwards
                Node[] order = (random.length == nodes.length) ? random : nodes;

                int index = order.length;
                while (index--)
                      {
                      Node node = order [index];

                      // callback on each enabled node
                      if (node.isEnabled && dg (node))
                          return true;
                      }
                return false;
        }
}