00001 /******************************************************************************* 00002 00003 @file Cluster.d 00004 00005 Copyright (c) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 4. Derivative works are permitted, but they must carry this notice 00027 in full and credit the original source. 
00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.qos.socket.Cluster; 00040 00041 private import mango.cache.HashMap; 00042 00043 private import mango.utils.Text, 00044 mango.utils.Random; 00045 00046 private import mango.base.System; 00047 00048 private import mango.io.Buffer, 00049 mango.io.Socket, 00050 mango.io.Exception, 00051 mango.io.Properties, 00052 mango.io.ArrayAllocator, 00053 mango.io.SocketConduit, 00054 mango.io.SocketListener, 00055 mango.io.MulticastSocket; 00056 00057 private import mango.io.model.IConduit; 00058 00059 private import mango.log.model.ILogger; 00060 00061 private import mango.cluster.Client; 00062 00063 public import mango.cluster.model.ICluster; 00064 00065 private import mango.cluster.qos.socket.RollCall, 00066 mango.cluster.qos.socket.ClusterEvent, 00067 mango.cluster.qos.socket.ProtocolReader, 00068 mango.cluster.qos.socket.ProtocolWriter; 00069 00070 00071 /******************************************************************************* 00072 00073 QOS implementation for sockets. All cluster-client activity is 00074 gated through here by the higher level classes; NetworkQueue & 00075 NetworkCache for example. You gain access to the cluster by 00076 creating an instance of the QOS (quality of service) you desire 00077 and mapping client classes onto it. For example: 00078 00079 @code 00080 import mango.cluster.NetworkCache; 00081 import mango.cluster.qos.socket.Cluster; 00082 00083 ICluster cluster = new Cluster (...); 00084 NetworkCache cache = new NetworkCache (cluster, ...); 00085 00086 cache.put (...); 00087 cache.get (...); 00088 cache.invalidate (...); 00089 @endcode 00090 00091 Please see the cluster clients for additional details. 
Currently these include CacheInvalidator, CacheInvalidatee,
        NetworkMessage, NetworkTask, NetworkQueue, NetworkCache,
        NetworkCombo, plus the Client base-class.

*******************************************************************************/

class Cluster : ICluster, IEventListener
{
        private static HashMap          groups;
        private ILogger                 logger;
        private NodeSet                 nodeSet;
        private Buffer                  mBuffer;
        private ProtocolWriter          mWriter;
        private MulticastSocket         mSocket;

        // multicast settings, optionally overridden by the configuration
        // conduit. NOTE(review): groupTTL is parsed from configuration but
        // is never read by this class -- confirm whether it should be
        // applied to the multicast socket(s)
        private ushort                  groupTTL = 1;
        private ushort                  groupPort = 3333;
        private ubyte                   groupPrefix = 225;

        /***********************************************************************

                Setup the static hashmap used to cache multicast group
                addresses, keyed by channel name. This map is shared by
                every Cluster instance.

        ***********************************************************************/

        static this ()
        {
                groups = new HashMap (128, 0.75, 2);
        }

        /***********************************************************************

                Setup a Cluster instance with no configured nodes. The
                logger is optional and may be null.

                Currently the buffer & writer are shared for all bulletin
                serialization; this should probably change at some point
                such that we can support multiple threads broadcasting
                concurrently to different output ports.

        ***********************************************************************/

        this (ILogger logger = null)
        {
                this.logger = logger;
                nodeSet = new NodeSet (logger);
                mBuffer = new Buffer (1024 * 4);
                mSocket = new MulticastSocket;
                mWriter = new ProtocolWriter (mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance from a configuration conduit,
                which is loaded as a set of name/value properties. The
                recognized attributes are:

                node              name of a cluster node; may be repeated
                multicast_port    port used for the multicast groups
                multicast_prefix  first octet of each multicast group address
                multicast_ttl     multicast time-to-live

                Any other attribute causes a ClusterException. Once the
                configuration is loaded, this instance starts listening
                on the "cluster.server.advertise" channel, broadcasts a
                RollCall asking which servers are running, and then
                waits 250ms for them to respond.

                Both the logger and the conduit are required.

        ***********************************************************************/

        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // callback for loading cluster configuration
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);
                        if (name == "node")
                            nodeSet.addNode (new Node (logger, value));
                        else
                        if (name == "multicast_port")
                            groupPort = Text.atoi (value);
                        else
                        if (name == "multicast_prefix")
                            groupPrefix = Text.atoi (value);
                        else
                        if (name == "multicast_ttl")
                            groupTTL = Text.atoi (value);
                        else
                           throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }

                // load up the cluster configuration
                Properties.load (conduit, &loader);

                // finalize nodeSet
                nodeSet.optimize ();

                // listen for cluster servers
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ask who's currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // wait for enabled servers to respond ...
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance for a single server node running
                on localhost. The cache/queue service is addressed on
                serverPort, and the task (cache-loader) service on
                serverPort+1. The node is enabled immediately. The logger
                is required, and serverPort must be above 1024.

        ***********************************************************************/

        this (ILogger logger, uint serverPort)
        in {
           assert (logger);
           assert (serverPort > 1024);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress ("localhost", serverPort));
                node.setTasks (new InternetAddress ("localhost", serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when
                they get a RollCall request, and when they begin execution.

                A payload that is not a RollCall (including a null one) is
                ignored rather than dereferenced.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                RollCall rollcall = cast(RollCall) payload;

                // ignore anything that isn't a RollCall, along with requests
                // from clients (we're on a common channel)
                if (rollcall && ! rollcall.request)
                    nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplifies and speeds up execution.

        ***********************************************************************/

        IChannel createChannel (char[] channel)
        {
                return new Channel (channel);
        }

        /***********************************************************************

                Return the logger instance provided during construction.
                This may be null when the default constructor was used.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Broadcast a payload on the specified channel. This uses
                IP/Multicast to scatter the payload to all registered
                listeners (on the same multicast group). Note that the
                maximum payload size is limited to that of an Ethernet
                data frame, minus the IP/UDP header size (1472 bytes);
                a ClusterException is thrown for anything larger.

                Synchronized because the serialization buffer & writer
                are shared by all broadcasts from this instance.

        ***********************************************************************/

        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // serialize content
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                // Ethernet data-frame size minus the 28 byte UDP/IP header:
                if (mBuffer.getPosition > 1472)
                    throw new ClusterException ("payload is too large to broadcast");

                // send it to the appropriate multicast group
                mSocket.write (mBuffer, getGroup (channel.getName));
        }

        /***********************************************************************

                Create a listener of the specified type. Listeners are
                run within their own thread, since they spend the vast
                majority of their time blocked on a Socket read. Would
                be good to support multiplexed reading instead, such
                that a thread pool could be applied instead.

                Throws a ClusterException for an unsupported style,
                rather than falling off the end of the switch.

        ***********************************************************************/

        IConsumer createConsumer (IChannel channel, IEvent.Style style,
                                  IEventListener notify)
        {
                IEvent event = new ClusterEvent (this, channel, style, notify);

                if (logger)
                    logger.info ("creating " ~ event.getStyleName ~
                                 " consumer for channel '" ~ channel.getName ~ "'");

                switch (style)
                       {
                       case IEvent.Style.Message:
                            return new MessageConsumer (this, event);

                       case IEvent.Style.Bulletin:
                            return new BulletinConsumer (this, event);

                       default:
                            throw new ClusterException ("unsupported consumer style '" ~
                                                        event.getStyleName ~ "'");
                       }
        }

        /***********************************************************************

                Return an entry from the network cache, and optionally
                remove it. This is a synchronous operation as opposed
                to the asynchronous nature of an invalidate broadcast.

        ***********************************************************************/

        IPayload getCache (IChannel channel, char[] key, bool remove)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (remove ? ProtocolWriter.Command.Remove :
                              ProtocolWriter.Command.Copy, c.getName, null, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Place an entry into the network cache, replacing the
                entry with the identical key. Note that this may cause
                the oldest entry in the cache to be displaced if the
                cache is already full.

        ***********************************************************************/

        IPayload putCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key);
                return nodeSet.request (c.writer, c.reader, key);
        }

        /***********************************************************************

                Add an entry to the specified network queue. May throw a
                QueueFullException if there's no room available. Returns
                the payload that was queued.

        ***********************************************************************/

        IPayload putQueue (IChannel channel, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload);
                nodeSet.request (c.writer, c.reader);
                return payload;
        }


        /***********************************************************************

                Query the cluster for queued entries on the corresponding
                channel. Returns, and removes, the first matching entry
                from the cluster, or null when no node supplied one. Note
                that this sweeps the cluster for matching entries, and is
                synchronous in nature. The more common approach is to set
                up a queue listener, which will grab and dispatch queue
                entries asynchronously.

        ***********************************************************************/

        IPayload getQueue (IChannel channel)
        {
                IPayload payload;

                Channel c = cast(Channel) channel;

                // callback for NodeSet.scan(); returns true once a node
                // has supplied a payload (presumably halting the scan)
                bool scan (Node node)
                {
                        // ask this node ...
                        c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName);
                        node.request (node.cache, c.writer, c.reader, payload);
                        return payload !== null;
                }

                // make a pass over each Node, looking for channel entries
                nodeSet.scan (&scan);
                return payload;
        }

        /***********************************************************************

                Load a network cache entry remotely. This sends the given
                Payload over the network to the cache host, where it will
                be executed locally. The benefit of doing so is that the
                host may deny access to the cache entry for the duration
                of the load operation. This, in turn, provides an elegant
                mechanism for gating/synchronizing multiple network clients
                over a given cache entry; handy for those entries that are
                relatively expensive to construct or access.

        ***********************************************************************/

        void loadCache (IChannel channel, char[] key, IPayload payload)
        {
                Channel c = cast(Channel) channel;

                c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key);
                Node node = nodeSet.selectNode (key);
                node.request (node.tasks, c.writer, c.reader, payload);
        }

        /***********************************************************************

                Return an internet address representing the multicast
                group for the specified channel. We use three of the
                four address segments to represent the channel itself
                (via a hash on the channel name, folded to 24 bits), and
                set the primary segment to be the multicast prefix
                (groupPrefix). Addresses are cached per channel name.

                NOTE(review): the cache is static and keyed only by the
                channel name, so Cluster instances configured with a
                different prefix or port will share whichever address
                was created first. Also, the synchronization here is per
                instance rather than per class -- confirm HashMap is
                itself safe for concurrent use.

        ***********************************************************************/

        synchronized InternetAddress getGroup (char[] channel)
        {
                InternetAddress group = cast(InternetAddress) groups.get (channel);

                if (group is null)
                   {
                   char[] address;

                   // construct a group address from the prefix & channel-hash,
                   // where the hash is folded down to 24 bits
                   uint hash = groups.jhash (channel);
                   hash = (hash >> 24) ^ (hash & 0x00ffffff);

                   address = Text.toString (groupPrefix) ~ "." ~
                             Text.toString ((hash >> 16) & 0xff) ~ "." ~
                             Text.toString ((hash >> 8) & 0xff) ~ "." ~
                             Text.toString (hash & 0xff);

                   // insert InternetAddress into hashmap
                   group = new InternetAddress (address, groupPort);
                   groups.put (channel, group);
                   }
                return group;
        }
}


/*******************************************************************************

        A listener for multicast channel traffic. These are currently used
        for cache coherency, queue publishing, and node discovery activity;
        though could be used for direct messaging also.

*******************************************************************************/

private class BulletinConsumer : SocketListener, IConsumer
{
        private IEvent                  event;
        private Buffer                  buffer;
        private ProtocolReader          reader;
        private Cluster                 cluster;
        private MulticastSocket         consumer;

        /***********************************************************************

                Construct a multicast consumer for the specified event. The
                event handler will be invoked whenever a message arrives for
                the associated channel. The listener is started before the
                constructor returns.

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                this.event = event;
                this.cluster = cluster;

                // buffer doesn't need to be larger than Ethernet data-frame
                buffer = new Buffer (1500);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                // configure a listener socket
                consumer = new MulticastSocket;
                consumer.join (cluster.getGroup (event.getChannel.getName));

                super (consumer, buffer);

                // fire up this listener
                start ();
        }

        /***********************************************************************

                Notification callback invoked when we receive a multicast
                packet. Note that we check the packet channel-name against
                the one we're consuming, to check for cases where the group
                address had a hash collision.

        ***********************************************************************/

        override void notify (IBuffer buffer)
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                IPayload payload = reader.getPayload (channel, element, cmd);

                // check it's really for us first (might be a hash collision)
                if (channel == event.getChannel.getName)
                    invoke (event, payload);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                cluster may have been constructed without a logger, so
                only log when one is present.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;

                if (logger)
                    logger.error ("BulletinConsumer: "~msg);
        }

        /***********************************************************************

                Overridable means of notifying the client code.

        ***********************************************************************/

        protected void invoke (IEvent event, IPayload payload)
        {
                event.invoke (payload);
        }

        /***********************************************************************

                Return the cluster instance we're associated with.

        ***********************************************************************/

        Cluster getCluster ()
        {
                return cluster;
        }

        /***********************************************************************

                Temporarily halt listening. This can be used to ignore
                multicast messages while, for example, the consumer is
                busy doing other things.

        ***********************************************************************/

        void pauseGroup ()
        {
                consumer.pauseGroup ();
        }

        /***********************************************************************

                Resume listening, post-pause.

        ***********************************************************************/

        void resumeGroup ()
        {
                consumer.resumeGroup ();
        }

        /***********************************************************************

                Cancel this consumer. The listener is effectively disabled
                from this point forward. The listener thread does not halt
                at this point, but waits until the socket-read returns.
                Note that the D Interface implementation requires us to
                "reimplement and dispatch" trivial things like this ~ it's
                a pain in the neck to maintain.

        ***********************************************************************/

        void cancel ()
        {
                super.cancel ();
        }
}


/*******************************************************************************

        A listener for queue events. These events are produced by the
        queue host on a periodic basis when it has available entries.
        We listen for them (rather than constantly scanning) and then
        begin a sweep to process as many as we can. Note that we will
        be in competition with other nodes to process these entries.

*******************************************************************************/

private class MessageConsumer : BulletinConsumer
{
        /***********************************************************************

                Construct a multicast consumer for the specified event

        ***********************************************************************/

        this (Cluster cluster, IEvent event)
        {
                super (cluster, event);
        }

        /***********************************************************************

                Handle error conditions from the listener thread. The
                cluster may have been constructed without a logger, so
                only log when one is present.

        ***********************************************************************/

        override void exception (char [] msg)
        {
                ILogger logger = cluster.getLogger;

                if (logger)
                    logger.error ("MessageConsumer: "~msg);
        }

        /***********************************************************************

                Override the default processing to sweep the cluster for
                queued entries. Each server node is queried until one is
                found that contains a payload. Note that it is possible
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we
                should make the strategy pluggable instead.

        ***********************************************************************/

        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    while ((payload = getCluster.getQueue (event.getChannel)) !== null)
                            event.invoke (payload);
                    } finally
                      {
                      // resume listening, even if a handler threw
                      resumeGroup ();
                      }
        }
}


/*******************************************************************************

        A channel represents something akin to a publish/subscribe topic,
        or a radio station. These are used to segregate cluster operations
        into a set of groups, where each group is represented by a channel.
        Channel names are whatever you want them to be: use of dot notation
        has proved useful in the past. See Client.createChannel

*******************************************************************************/

private class Channel : IChannel
{
        char[]                  name;
        Buffer                  buffer;
        ProtocolReader          reader;
        ProtocolWriter          writer;

        /***********************************************************************

                Construct a channel with the specified (non-empty) name.
                We cache a number of session-related constructs here also,
                in order to eliminate runtime overhead

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length);
           }
        body
        {
                this.name = name;

                // this buffer will grow as required to house larger payloads
                buffer = new GrowableBuffer (1024 * 2);
                writer = new ProtocolWriter (buffer);

                // make the reader slice directly from the buffer content
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);
        }

        /***********************************************************************

                Return the name of this channel. This is the name provided
                when the channel was constructed.

        ***********************************************************************/

        char[] getName ()
        {
                return name;
        }

        /***********************************************************************

                Output this channel via the provided IWriter

        ***********************************************************************/

        void write (IWriter writer)
        {
                writer.put (name);
        }

        /***********************************************************************

                Input this channel via the provided IReader

        ***********************************************************************/

        void read (IReader reader)
        {
                reader.get (name);
        }
}


/*******************************************************************************

        An abstraction of a socket connection. Used internally by the
        socket-based Cluster.
*******************************************************************************/

private abstract class Connection
{
        abstract bool reset();

        abstract void done (ulong time);

        abstract SocketConduit getConduit ();
}


/*******************************************************************************

        A pool of socket connections for accessing cluster nodes. Note
        that the entries will timeout after a period of inactivity, and
        will subsequently cause a connected host to drop the supporting
        session. Idle entries older than `timeout` milliseconds are
        closed and discarded when the pool is next borrowed from.

*******************************************************************************/

private class ConnectionPool
{
        // NOTE(review): count is not referenced by the code visible here
        private int                     count;
        private InternetAddress         address;
        private PoolConnection          freelist;

        // idle limit, in the same units as the time passed to borrow()
        // and done() (milliseconds, per the callers)
        private const ulong             timeout = 60_000;

        /***********************************************************************

                Utility class to provide the basic connection facilities
                provided by the connection pool.

        ***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;
                PoolConnection  next;
                ConnectionPool  parent;
                SocketConduit   conduit;

                /***************************************************************

                        Construct a new connection and set its parent.
                        A failed initial connect is not reported here;
                        the caller discovers it on first use.

                ***************************************************************/

                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }

                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/

                SocketConduit getConduit ()
                {
                        return conduit;
                }

                /***************************************************************

                        Create a new socket and connect it to the specified
                        server. This will cause a dedicated thread to start
                        on the server. Said thread will quit when an error
                        occurs.

                        Any socket from a previous connection is closed
                        first, so repeated reconnects do not leak sockets.
                        Returns true on success, false if the connect (or
                        socket setup) failed.

                ***************************************************************/

                bool reset ()
                {
                        // release the previous socket before replacing it.
                        // This is best-effort: failing to close a socket
                        // that is already broken must not stop us from
                        // reconnecting
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object) {}
                           }

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                              {
                              return false;
                              }
                }

                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated.

                ***************************************************************/

                void close ()
                {
                        conduit.close ();
                }

                /***************************************************************

                        Return this connection to the free-list, stamped
                        with the given time. Note that we have to
                        synchronize on the parent-pool itself, since
                        borrow() is synchronized on the pool too.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address)
        {
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from the free-list rather than
                creating a new one. Entries that have been idle for longer
                than `timeout` are closed and discarded as we go; when no
                usable entry remains, a new connection is created.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {
                while (freelist)
                      {
                      PoolConnection c = freelist;

                      freelist = c.next;

                      // every stale entry is reaped, including the last
                      // one on the list
                      if (time - c.time > timeout)
                          c.close ();
                      else
                         return c;
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all existing connections that are
                currently on the free-list.

        ***********************************************************************/

        synchronized void close ()
        {
                PoolConnection c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close ();
                      c = c.next;
                      }
        }
}


/*******************************************************************************

        Class to represent a cluster node. Each node supports both cache
        and queue functionality. Note that the set of available nodes is
Note that the set of available nodes is configured at startup, which
        simplifies the discovery process in some significant ways and causes
        less thrashing of cache-keys.

*******************************************************************************/

private class Node
{
        private char[]          name,
                                port;
        private ILogger         logger;
        private bool            enabled;

        private ConnectionPool  tasks,
                                cache;

        /***********************************************************************

                Construct a node with the provided name. This name should
                be the network name of the hosting device.

        ***********************************************************************/

        this (ILogger logger, char[] name)
        {
                this.logger = logger;
                this.name = name;
        }

        /***********************************************************************

                Add a cache/queue reference for the remote node, and
                remember its port (used for trace output)

        ***********************************************************************/

        void setCache (InternetAddress address)
        {
                this.cache = new ConnectionPool (address);
                port = Text.toString (address.port);
        }

        /***********************************************************************

                Add a cache-loader reference for the remote node

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {
                this.tasks = new ConnectionPool (address);
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/

        override char[] toString ()
        {
                return name;
        }

        /***********************************************************************

                Remove this Node from the cluster. The node is disabled
                until it is seen to recover, and the idle connections held
                by its pools are closed. Either pool is still null if
                setCache() / setTasks() were never invoked, so each one is
                closed only when present.

        ***********************************************************************/

        void fail ()
        {
                setEnabled (false);

                if (cache)
                    cache.close ();

                if (tasks)
                    tasks.close ();
        }

        /***********************************************************************

                Get the current state of this node

        ***********************************************************************/

        bool isEnabled ()
        {
                volatile
                   return enabled;
        }

        /***********************************************************************

                Set the enabled state of this node, tracing the change
                when trace logging is active

        ***********************************************************************/

        void setEnabled (bool enabled)
        {
                if (logger && logger.isEnabled (logger.Level.Trace))
                   {
                   if (enabled)
                       logger.trace ("enabling node '"~name~"' on port "~port);
                   else
                      logger.trace ("disabling node '"~name~"'");
                   }
                volatile
                   this.enabled = enabled;
        }

        /***********************************************************************

                Flush 'writer' (presumably holding a request prepared by
                the caller) over a connection borrowed from 'pool', then
                read the reply via 'reader' into 'payload'.

                Returns false when 'pool' is null, or when the server
                cannot be reached. In the latter case this node is failed
                (disabled). Up to three attempts are made to send the
                request, resetting the connection in between, before the
                node is written off. Once the reply is being read there is
                no retry.

                Throws ClusterFullException if the node reports that it is
                full. Throws ClusterException if the node reports an error
                or returns an unrecognised command. Any PickleException
                raised while reading the reply is rethrown after the
                connection is returned to the pool.

        ***********************************************************************/

        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {
                ProtocolWriter.Command  cmd;
                ulong                   time;
                char[]                  channel;
                char[]                  element;
                Connection              connect;

                // it's possible that the pool may have failed between
                // the point of selecting it, and the invocation itself
                if (pool is null)
                    return false;

                // get a connection to the server
                connect = pool.borrow (time = System.getMillisecs);

                // talk to the server (try a few times if necessary). The
                // post-decrement in the loop condition allows three passes
                for (int attempts=3; attempts--;)
                    {
                    try {
                        // attach connection to reader and writer
                        writer.getBuffer.setConduit (connect.getConduit);
                        reader.getBuffer.setConduit (connect.getConduit);

                        // send request
                        writer.flush ();

                        // load the returned object (don't retry!). Zeroing
                        // the counter also terminates the loop on success
                        attempts = 0;
                        payload = reader.getPayload (channel, element, cmd);

                        // return borrowed connection
                        connect.done (time);
                        } catch (PickleException x)
                        {
                        // presumably a decode error rather than a transport
                        // failure, so the connection goes back to the pool
                        connect.done (time);
                        throw x;
                        }
                      catch (IOException x)
                        {
                        // attempt to reconnect, unless we are out of
                        // attempts or were already reading the reply
                        if (attempts == 0 || !connect.reset)
                           {
                           // that server is offline
                           fail ();

                           // state that we failed
                           return false;
                           }
                        }
                    }

                // is payload an exception?
                if (cmd != ProtocolWriter.Command.OK)
                   {
                   // is node full?
                   if (cmd == ProtocolWriter.Command.Full)
                       throw new ClusterFullException (channel);

                   // did node barf?
                   if (cmd == ProtocolWriter.Command.Exception)
                       throw new ClusterException (channel);

                   // bogus response
                   throw new ClusterException ("invalid response from cluster server");
                   }

                return true;
        }
}


/*******************************************************************************

        Models a set of remote cluster nodes.

*******************************************************************************/

private class NodeSet
{
        private HashMap         map;
        private Node[]          nodes;
        private Node[]          random;
        private ILogger         logger;

        /***********************************************************************

                Create an empty node-set which reports via 'logger'

        ***********************************************************************/

        this (ILogger logger)
        {
                this.logger = logger;
                map = new HashMap (128, 0.75, 4);
        }

        /***********************************************************************

                Return the logger supplied at construction

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Add a node to the list of servers. Nodes are kept in the
                order they are added, and are not sorted. Every client is
                expected to be configured from an identical file, so each
                ends up with the same ordered set.

                Throws ClusterException if a node of the same name has
                already been added.

        ***********************************************************************/

        synchronized void addNode (Node node)
        {
                char[] name = node.toString;

                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                map.put (name, node);
                nodes ~= node;
        }

        /***********************************************************************

                Build the randomized scan order used by scan(). Call this
                once all nodes have been added. Nodes added afterwards are
                not visited by scan() until optimize() is invoked again.

                The primary node list is left in configuration order,
                which should be identical across the cluster.

        ***********************************************************************/

        void optimize ()
        {
                // copy the node list
                Node[] order = nodes.dup;

                // muddle up the duplicate list with a Fisher-Yates shuffle.
                // Each slot i-1 swaps with one drawn from [0, i), so every
                // permutation is equally likely. Random.get(n) is assumed
                // to yield a value in [0, n) - TODO confirm against Random
                for (int i = order.length; i > 1; --i)
                    {
                    int j = Random.get (i);
                    Node tmp = order[i-1];
                    order[i-1] = order[j];
                    order[j] = tmp;
                    }

                random = order;
        }

        /***********************************************************************

                Enable the named node, attaching its cache pool on 'port1'
                and its task pool on 'port2'. A node that is already
                enabled is left untouched.

                Throws ClusterException for an unknown node name.

        ***********************************************************************/

        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);
                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");
                else
                   if (! node.isEnabled)
                      {
                      node.setCache (new InternetAddress (name, port1));
                      node.setTasks (new InternetAddress (name, port2));
                      node.setEnabled (true);
                      }
        }

        /***********************************************************************

                Issue a cache request against the node selected for 'key'
                (or the next round-robin node when no key is given). When a
                node fails, Node.request() disables it, so the next
                selection moves on. Once no node is enabled, selectNode()
                throws ClusterEmptyException and the loop ends.

                NOTE(review): Node.request() also returns false, without
                disabling the node, when the node's pool is null. Should
                that ever happen for an enabled node, this loop would not
                terminate.

        ***********************************************************************/

        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                Node node;
                IPayload payload;

                do {
                   node = selectNode (key);
                   } while (! node.request (node.cache, writer, reader, payload));
                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index, reduced
                modulo the node count. If the selected server is not
                currently enabled, we just try the next one, wrapping
                around. This behaviour should be consistent across each
                cluster client.

                Throws ClusterEmptyException when no node is enabled.

        ***********************************************************************/

        private final Node selectNode (uint index)
        {
                uint i = nodes.length;

                if (i)
                   {
                   index %= i;

                   while (i--)
                         {
                         Node node = nodes[index];
                         if (node.isEnabled)
                             return node;

                         if (++index >= nodes.length)
                             index = 0;
                         }
                   }
                throw new ClusterEmptyException ("No cluster servers are available");
        }

        /***********************************************************************

                Select a cluster server based on the specified key. The
                key is hashed via HashMap.jhash. Without a key, a
                function-level static counter is advanced instead. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.

        ***********************************************************************/

        final Node selectNode (char[] key = null)
        {
                static uint index;

                if (key.length)
                    return selectNode (HashMap.jhash (key));

                // no key provided, so just roll the counter
                return selectNode (++index);
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Note that this scans nodes in
                the randomized order built by optimize(), which should tend
                to avoid 'bursty' activity by a set of clients upon any one
                cluster server.

                Only nodes present at the last optimize() are visited.

        ***********************************************************************/

        bool scan (bool delegate (Node) dg)
        {
                // bound the walk by the randomized list itself: it may be
                // shorter than 'nodes' when nodes were added after the last
                // optimize(), and indexing past its end would be an error
                Node[] order = random;
                int index = order.length;

                while (index--)
                      {
                      // lookup the randomized set of server nodes
                      Node node = order [index];

                      // callback on each enabled node
                      if (node.isEnabled)
                          if (dg (node))
                              return true;
                      }
                return false;
        }
}