00001 /******************************************************************************* 00002 00003 @file Cluster.d 00004 00005 Copyright (C) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 
00025 00026 00027 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00028 00029 00030 @version Initial version, July 2004 00031 @author Kris 00032 00033 00034 *******************************************************************************/ 00035 00036 module mango.cluster.qos.socket.Cluster; 00037 00038 private import mango.cache.HashMap; 00039 00040 private import mango.utils.Text, 00041 mango.utils.Random; 00042 00043 private import mango.base.System; 00044 00045 private import mango.io.Buffer, 00046 mango.io.Socket, 00047 mango.io.Exception, 00048 mango.io.Properties, 00049 mango.io.ArrayAllocator, 00050 mango.io.SocketConduit, 00051 mango.io.SocketListener, 00052 mango.io.MulticastSocket; 00053 00054 private import mango.io.model.IConduit; 00055 00056 private import mango.log.model.ILogger; 00057 00058 private import mango.cluster.Client; 00059 00060 public import mango.cluster.model.ICluster; 00061 00062 private import mango.cluster.qos.socket.RollCall, 00063 mango.cluster.qos.socket.ClusterEvent, 00064 mango.cluster.qos.socket.ProtocolReader, 00065 mango.cluster.qos.socket.ProtocolWriter; 00066 00067 00068 /******************************************************************************* 00069 00070 QOS implementation for sockets. All cluster-client activity is 00071 gated through here by the higher level classes; NetworkQueue & 00072 NetworkCache for example. You gain access to the cluster by 00073 creating an instance of the QOS (quality of service) you desire 00074 and mapping client classes onto it. For example: 00075 00076 @code 00077 import mango.cluster.NetworkCache; 00078 import mango.cluster.qos.socket.Cluster; 00079 00080 ICluster cluster = new Cluster (...); 00081 NetworkCache cache = new NetworkCache (cluster, ...); 00082 00083 cache.put (...); 00084 cache.get (...); 00085 cache.invalidate (...); 00086 @endcode 00087 00088 Please see the cluster clients for additional details. 
Currently these include CacheInvalidator, CacheInvalidatee,
        NetworkMessage, NetworkTask, NetworkQueue, NetworkCache,
        NetworkCombo, plus the Client base-class.

*******************************************************************************/

class Cluster : ICluster, IEventListener
{
        // multicast group addresses keyed by channel name; static, and
        // therefore shared by every Cluster instance
        private static HashMap  groups;

        // optional logger (may be null) and the set of known nodes
        private ILogger         logger;
        private NodeSet         nodeSet;

        // buffer, writer and socket shared by all bulletin broadcasts
        private Buffer          mBuffer;
        private ProtocolWriter  mWriter;
        private MulticastSocket mSocket;

        // multicast defaults; overridden by the 'multicast_ttl',
        // 'multicast_port' and 'multicast_prefix' configuration keys
        private ushort          groupTTL = 1;
        private ushort          groupPort = 3333;
        private ubyte           groupPrefix = 225;

        /***********************************************************************

                Module constructor: allocate the shared hashmap used
                to hold multicast group addresses.

        ***********************************************************************/

        static this ()
        {
                groups = new HashMap (128, 0.75, 2);
        }

        /***********************************************************************

                Construct a bare Cluster with an optional logger. One
                buffer/writer pair serves every bulletin serialization,
                which is why broadcast() is synchronized. Supporting
                concurrent broadcasts to different output ports would
                require per-caller buffers instead.

        ***********************************************************************/

        this (ILogger logger = null)
        {
                this.logger  = logger;
                this.nodeSet = new NodeSet (logger);
                this.mBuffer = new Buffer (4 * 1024);
                this.mSocket = new MulticastSocket;
                this.mWriter = new ProtocolWriter (this.mBuffer);
        }

        /***********************************************************************

                Setup a Cluster instance.
Currently the buffer & writer
                are shared for all bulletin serialization; this should
                probably change at some point such that we can support
                multiple threads broadcasting concurrently to different
                output ports.

                This variant reads the cluster configuration from the
                given conduit, subscribes to server advertisements, and
                then issues a roll-call to discover running servers.

                Recognized configuration keys are 'node',
                'multicast_port', 'multicast_prefix' and 'multicast_ttl';
                any other key raises a ClusterException.

        ***********************************************************************/

        this (ILogger logger, IConduit conduit)
        in {
           assert (logger);
           assert (conduit);
           }
        body
        {
                this (logger);

                // invoked by Properties.load() once per name/value pair
                void loader (char[] name, char[] value)
                {
                        logger.info ("cluster config: "~name~" = "~value);

                        if (name == "node")
                           {
                           nodeSet.addNode (new Node (logger, value));
                           return;
                           }

                        if (name == "multicast_port")
                           {
                           groupPort = Text.atoi (value);
                           return;
                           }

                        if (name == "multicast_prefix")
                           {
                           groupPrefix = Text.atoi (value);
                           return;
                           }

                        if (name == "multicast_ttl")
                           {
                           groupTTL = Text.atoi (value);
                           return;
                           }

                        throw new ClusterException ("Unrecognized attribute '"~name~"' in socket.Cluster configuration");
                }

                // pull in the configuration, then finalize the node set
                Properties.load (conduit, &loader);
                nodeSet.optimize ();

                // subscribe to server advertisements ...
                IChannel channel = createChannel ("cluster.server.advertise");
                createConsumer (channel, IEvent.Style.Bulletin, this);

                // ... and ask which servers are currently running
                logger.trace ("discovering active cluster servers ...");
                broadcast (channel, new RollCall (Socket.hostName(), 0, 0, true));

                // give enabled servers a moment to respond
                System.sleep (System.Interval.Millisec * 250);
        }

        /***********************************************************************

                Setup a Cluster instance.
Currently the buffer & writer
                are shared for all bulletin serialization; this should
                probably change at some point such that we can support
                multiple threads broadcasting concurrently to different
                output ports.

                This variant skips configuration loading and discovery:
                it registers a single, already-enabled node named
                "local", with its cache service on serverPort and its
                task service on serverPort+1.

                The host name used to be hard-coded; it is now the
                optional 'host' argument, which defaults to the
                original value so existing callers are unaffected.

        ***********************************************************************/

        this (ILogger logger, uint serverPort, char[] host = "lucy")
        in {
           assert (logger);
           assert (serverPort > 1024);
           assert (host.length);
           }
        body
        {
                this (logger);

                Node node = new Node (logger, "local");
                node.setCache (new InternetAddress (host, serverPort));
                node.setTasks (new InternetAddress (host, serverPort+1));
                node.setEnabled (true);
                nodeSet.addNode (node);
                nodeSet.optimize ();
        }

        /***********************************************************************

                IEventListener interface method for listening to RollCall
                responses. These are sent out by cluster servers both when
                they get a RollCall request, and when they begin execution.

                Payloads that are not RollCall instances are reported
                (when a logger is available) and otherwise ignored.

        ***********************************************************************/

        void notify (IEvent event, IPayload payload)
        {
                RollCall rollcall = cast(RollCall) payload;

                // a failed downcast yields null; don't dereference it
                if (rollcall is null)
                   {
                   if (logger)
                       logger.error ("ignoring non-RollCall payload on the cluster advertisement channel");
                   return;
                   }

                // ignore requests from clients (we're on a common channel)
                if (! rollcall.request)
                      nodeSet.enable (rollcall.name, rollcall.port1, rollcall.port2);
        }

        /***********************************************************************

                Create a channel instance. Our channel implementation
                includes a number of cached IO helpers (ProtocolWriter
                and so on) which simplifies and speeds up execution.
***********************************************************************/

        IChannel createChannel (char[] channel)
        {
                IChannel created = new Channel (channel);
                return created;
        }

        /***********************************************************************

                Accessor for the logger handed to the constructor; this
                is null when the Cluster was built without one.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return this.logger;
        }

        /***********************************************************************

                Scatter a payload, via IP/Multicast, to every listener
                registered on the multicast group of the given channel.
                The serialized message must fit within an Ethernet data
                frame less the 28 byte IP/UDP header (1472 bytes), else
                a ClusterException is thrown and nothing is sent.

                Synchronized because the buffer, writer and socket are
                shared across all callers of this instance.

        ***********************************************************************/

        synchronized void broadcast (IChannel channel, IPayload payload = null)
        {
                // Ethernet data-frame size minus the 28 byte UDP/IP header
                const int MaxDatagram = 1472;

                // serialize the message into the shared buffer
                mBuffer.clear ();
                mWriter.put (ProtocolWriter.Command.OK, channel.getName, payload);

                if (mBuffer.getPosition <= MaxDatagram)
                    // hand it to the multicast group for this channel
                    mSocket.write (mBuffer, getGroup (channel.getName));
                else
                   throw new ClusterException ("payload is too large to broadcast");
        }

        /***********************************************************************

                Create a listener of the specified type. Listeners are
                run within their own thread, since they spend the vast
                majority of their time blocked on a Socket read. Would
                be good to support multiplexed reading instead, such
                that a thread pool could be applied instead.
00294 00295 ***********************************************************************/ 00296 00297 IConsumer createConsumer (IChannel channel, IEvent.Style style, 00298 IEventListener notify) 00299 { 00300 IEvent event = new ClusterEvent (this, channel, style, notify); 00301 00302 if (logger) 00303 logger.info ("creating " ~ event.getStyleName ~ 00304 " consumer for channel '" ~ channel.getName ~ "'"); 00305 00306 switch (style) 00307 { 00308 case IEvent.Style.Message: 00309 return new MessageConsumer (this, event); 00310 00311 case IEvent.Style.Bulletin: 00312 return new BulletinConsumer (this, event); 00313 } 00314 } 00315 00316 /*********************************************************************** 00317 00318 Return a entry from the network cache, and optionally 00319 remove it. This is a synchronous operation as opposed 00320 to the asynchronous nature of an invalidate broadcast. 00321 00322 ***********************************************************************/ 00323 00324 IPayload getCache (IChannel channel, char[] key, bool remove) 00325 { 00326 Channel c = cast(Channel) channel; 00327 00328 c.writer.put (remove ? ProtocolWriter.Command.Remove : 00329 ProtocolWriter.Command.Copy, c.getName, null, key); 00330 return nodeSet.request (c.writer, c.reader, key); 00331 } 00332 00333 /*********************************************************************** 00334 00335 Place an entry into the network cache, replacing the 00336 entry with the identical key. Note that this may cause 00337 the oldest entry in the cache to be displaced if the 00338 cache is already full. 
00339 00340 ***********************************************************************/ 00341 00342 IPayload putCache (IChannel channel, char[] key, IPayload payload) 00343 { 00344 Channel c = cast(Channel) channel; 00345 00346 c.writer.put (ProtocolWriter.Command.Add, c.getName, payload, key); 00347 return nodeSet.request (c.writer, c.reader, key); 00348 } 00349 00350 /*********************************************************************** 00351 00352 Add an entry to the specified network queue. May throw a 00353 QueueFullException if there's no room available. 00354 00355 ***********************************************************************/ 00356 00357 IPayload putQueue (IChannel channel, IPayload payload) 00358 { 00359 Channel c = cast(Channel) channel; 00360 00361 c.writer.put (ProtocolWriter.Command.AddQueue, c.getName, payload); 00362 nodeSet.request (c.writer, c.reader); 00363 return payload; 00364 } 00365 00366 00367 /*********************************************************************** 00368 00369 Query the cluster for queued entries on the corresponding 00370 channel. Returns, and removes, the first matching entry 00371 from the cluster. Note that this sweeps the cluster for 00372 matching entries, and is synchronous in nature. The more 00373 common approach is to setup a queue listener, which will 00374 grab and dispatch queue entries asynchronously. 00375 00376 ***********************************************************************/ 00377 00378 IPayload getQueue (IChannel channel) 00379 { 00380 IPayload payload; 00381 00382 Channel c = cast(Channel) channel; 00383 00384 // callback for NodeSet.scan() 00385 bool scan (Node node) 00386 { 00387 // ask this node ... 
00388 c.writer.put (ProtocolWriter.Command.RemoveQueue, c.getName); 00389 node.request (node.cache, c.writer, c.reader, payload); 00390 return payload !== null; 00391 } 00392 00393 // make a pass over each Node, looking for channel entries 00394 nodeSet.scan (&scan); 00395 return payload; 00396 } 00397 00398 /*********************************************************************** 00399 00400 Load a network cache entry remotely. This sends the given 00401 Payload over the network to the cache host, where it will 00402 be executed locally. The benefit of doing so it that the 00403 host may deny access to the cache entry for the duration 00404 of the load operation. This, in turn, provides an elegant 00405 mechanism for gating/synchronizing multiple network clients 00406 over a given cache entry; handy for those entries that are 00407 relatively expensive to construct or access. 00408 00409 ***********************************************************************/ 00410 00411 void loadCache (IChannel channel, char[] key, IPayload payload) 00412 { 00413 Channel c = cast(Channel) channel; 00414 00415 c.writer.put (ProtocolWriter.Command.OK, c.getName, payload, key); 00416 Node node = nodeSet.selectNode (key); 00417 node.request (node.tasks, c.writer, c.reader, payload); 00418 } 00419 00420 /*********************************************************************** 00421 00422 Return an internet address representing the multicast 00423 group for the specified channel. We use three of the 00424 four address segments to represent the channel itself 00425 (via a hash on the channel name), and set the primary 00426 segment to be that of the broadcast prefix (above). 
00427 00428 ***********************************************************************/ 00429 00430 synchronized InternetAddress getGroup (char[] channel) 00431 { 00432 InternetAddress group = cast(InternetAddress) groups.get (channel); 00433 00434 if (group is null) 00435 { 00436 char[] address; 00437 00438 // construct a group address from the prefix & channel-hash, 00439 // where the hash is folded down to 24 bits 00440 uint hash = groups.jhash (channel); 00441 hash = (hash >> 24) ^ (hash & 0x00ffffff); 00442 00443 address = Text.toString (groupPrefix) ~ "." ~ 00444 Text.toString ((hash >> 16) & 0xff) ~ "." ~ 00445 Text.toString ((hash >> 8) & 0xff) ~ "." ~ 00446 Text.toString (hash & 0xff); 00447 00448 //printf ("channel '%.*s' == '%.*s'\n", channel, address); 00449 00450 // insert InternetAddress into hashmap 00451 group = new InternetAddress (address, groupPort); 00452 groups.put (channel, group); 00453 } 00454 return group; 00455 } 00456 } 00457 00458 00459 /******************************************************************************* 00460 00461 A listener for multicast channel traffic. These are currently used 00462 for cache coherency, queue publishing, and node discovery activity; 00463 though could be used for direct messaging also. 00464 00465 *******************************************************************************/ 00466 00467 private class BulletinConsumer : SocketListener, IConsumer 00468 { 00469 private IEvent event; 00470 private Buffer buffer; 00471 private ProtocolReader reader; 00472 private Cluster cluster; 00473 private MulticastSocket consumer; 00474 00475 /*********************************************************************** 00476 00477 Construct a multicast consumer for the specified event. The 00478 event handler will be invoked whenever a message arrives for 00479 the associated channel. 
00480 00481 ***********************************************************************/ 00482 00483 this (Cluster cluster, IEvent event) 00484 { 00485 this.event = event; 00486 this.cluster = cluster; 00487 00488 // buffer doesn't need to be larger than Ethernet data-frame 00489 buffer = new Buffer (1500); 00490 00491 // make the reader slice directly from the buffer content 00492 reader = new ProtocolReader (buffer); 00493 reader.setAllocator (new BufferAllocator (reader)); 00494 00495 // configure a listener socket 00496 consumer = new MulticastSocket; 00497 consumer.join (cluster.getGroup (event.getChannel.getName)); 00498 00499 super (consumer, buffer); 00500 00501 // fire up this listener 00502 start (); 00503 } 00504 00505 /*********************************************************************** 00506 00507 Notification callback invoked when we receive a multicast 00508 packet. Note that we check the packet channel-name against 00509 the one we're consuming, to check for cases where the group 00510 address had a hash collision. 00511 00512 ***********************************************************************/ 00513 00514 override void notify (IBuffer buffer) 00515 { 00516 ProtocolWriter.Command cmd; 00517 char[] channel; 00518 char[] element; 00519 00520 IPayload payload = reader.getPayload (channel, element, cmd); 00521 // printf ("notify '%.*s::%.*s' #'%.*s'\n", channel, element, event.getChannel.getName); 00522 00523 // check it's really for us first (might be a hash collision) 00524 if (channel == event.getChannel.getName) 00525 invoke (event, payload); 00526 } 00527 00528 /*********************************************************************** 00529 00530 Handle error conditions from the listener thread. 
00531 00532 ***********************************************************************/ 00533 00534 override void exception (char [] msg) 00535 { 00536 cluster.getLogger.error ("BulletinConsumer: "~msg); 00537 } 00538 00539 /*********************************************************************** 00540 00541 Overridable mean of notifying the client code. 00542 00543 ***********************************************************************/ 00544 00545 protected void invoke (IEvent event, IPayload payload) 00546 { 00547 event.invoke (payload); 00548 } 00549 00550 /*********************************************************************** 00551 00552 Return the cluster instance we're associated with. 00553 00554 ***********************************************************************/ 00555 00556 Cluster getCluster () 00557 { 00558 return cluster; 00559 } 00560 00561 /*********************************************************************** 00562 00563 Temporarily halt listening. This can be used to ignore 00564 multicast messages while, for example, the consumer is 00565 busy doing other things. 00566 00567 ***********************************************************************/ 00568 00569 void pauseGroup () 00570 { 00571 consumer.pauseGroup (); 00572 } 00573 00574 /*********************************************************************** 00575 00576 Resume listening, post-pause. 00577 00578 ***********************************************************************/ 00579 00580 void resumeGroup () 00581 { 00582 consumer.resumeGroup (); 00583 } 00584 00585 /*********************************************************************** 00586 00587 Cancel this consumer. The listener is effectively disabled 00588 from this point forward. The listener thread does not halt 00589 at this point, but waits until the socket-read returns. 
00590 Note that the D Interface implementation requires us to 00591 "reimplement and dispatch" trivial things like this ~ it's 00592 a pain in the neck to maintain. 00593 00594 ***********************************************************************/ 00595 00596 void cancel () 00597 { 00598 super.cancel (); 00599 } 00600 } 00601 00602 00603 /******************************************************************************* 00604 00605 A listener for queue events. These events are produced by the 00606 queue host on a periodic bases when it has available entries. 00607 We listen for them (rather than constantly scanning) and then 00608 begin a sweep to process as many as we can. Note that we will 00609 be in competition with other nodes to process these entries. 00610 00611 *******************************************************************************/ 00612 00613 private class MessageConsumer : BulletinConsumer 00614 { 00615 /*********************************************************************** 00616 00617 Construct a multicast consumer for the specified event 00618 00619 ***********************************************************************/ 00620 00621 this (Cluster cluster, IEvent event) 00622 { 00623 super (cluster, event); 00624 } 00625 00626 /*********************************************************************** 00627 00628 Handle error conditions from the listener thread. 00629 00630 ***********************************************************************/ 00631 00632 override void exception (char [] msg) 00633 { 00634 cluster.getLogger.error ("MessageConsumer: "~msg); 00635 } 00636 00637 /*********************************************************************** 00638 00639 override the default processing to sweep the cluster for 00640 queued entries. Each server node is queried until one is 00641 found that contains a payload. 
Note that it is possible
                to set things up where we are told exactly which node to
                go to; however given that we won't be listening whilst
                scanning, and that there's likely to be a group of new
                entries in the cluster, it's just as effective to scan.
                This will be far from ideal for all environments, so we
                should make the strategy pluggable instead.

                The incoming payload is not dispatched itself: it only
                triggers the sweep. Listening is paused for the duration
                and resumed afterwards, even if a handler throws.

        ***********************************************************************/

        override protected void invoke (IEvent event, IPayload payload)
        {
                // temporarily pause listening
                pauseGroup ();

                try {
                    // drain the channel queue, one entry at a time
                    for (;;)
                        {
                        payload = getCluster.getQueue (event.getChannel);
                        if (payload is null)
                            break;

                        event.invoke (payload);
                        }
                    } finally
                           // resume listening
                           resumeGroup ();
        }
}


/*******************************************************************************

        A channel represents something akin to a publish/subscribe topic,
        or a radio station. These are used to segregate cluster operations
        into a set of groups, where each group is represented by a channel.
        Channel names are whatever you want them to be: use of dot notation
        has proved useful in the past. See Client.createChannel

*******************************************************************************/

private class Channel : IChannel
{
        char[]          name;           // channel name
        Buffer          buffer;         // shared by reader & writer
        ProtocolReader  reader;         // decodes replies
        ProtocolWriter  writer;         // encodes requests

        /***********************************************************************

                Construct a channel with the specified name.
We cache
                a number of session-related constructs here also, in
                order to eliminate runtime overhead. The name must not
                be empty.

        ***********************************************************************/

        this (char[] name)
        in {
           assert (name.length);
           }
        body
        {
                this.name = name;

                // one buffer serves both directions; it grows as
                // required to house larger payloads
                this.buffer = new GrowableBuffer (2 * 1024);
                this.writer = new ProtocolWriter (this.buffer);

                // the reader slices its content directly from the buffer
                this.reader = new ProtocolReader (this.buffer);
                this.reader.setAllocator (new BufferAllocator (this.reader));
        }

        /***********************************************************************

                Return the name this channel was constructed with (or
                the one most recently loaded via read).

        ***********************************************************************/

        char[] getName ()
        {
                return this.name;
        }

        /***********************************************************************

                Serialize this channel to the provided IWriter: the
                length of the name, followed by the name itself.

        ***********************************************************************/

        void write (IWriter writer)
        {
                writer.put (this.name.length).put (this.name);
        }

        /***********************************************************************

                Populate this channel from the provided IReader: the
                name is resized to the length reported by the reader
                and then filled from it.

        ***********************************************************************/

        void read (IReader reader)
        {
                this.name.length = reader.length ();
                reader.get (this.name);
        }
}


/*******************************************************************************

        An abstraction of a socket connection. Used internally by the
        socket-based Cluster.
*******************************************************************************/

private class Connection
{
        // re-establish the underlying connection; true on success
        abstract bool reset();

        // hand the connection back, stamped with the given time
        abstract void done (ulong time);

        // the conduit to attach to readers & writers
        abstract SocketConduit getConduit ();
}


/*******************************************************************************

        A pool of socket connections for accessing cluster nodes. Note
        that the entries will timeout after a period of inactivity, and
        will subsequently cause a connected host to drop the supporting
        session.

*******************************************************************************/

private class ConnectionPool
{
        private int             count;
        private InternetAddress address;        // host served by this pool
        private PoolConnection  freelist;       // idle connections
        private const ulong     timeout = 60_000;       // idle limit, in the units passed to borrow()

        /***********************************************************************

                Utility class to provide the basic connection facilities
                provided by the connection pool.
***********************************************************************/

        class PoolConnection : Connection
        {
                ulong           time;           // when last returned to the pool
                PoolConnection  next;           // free-list link
                ConnectionPool  parent;         // owning pool
                SocketConduit   conduit;        // current socket (may be null)

                /***************************************************************

                        Construct a new connection, set its parent, and
                        attempt an initial connect. A failed connect is
                        not reported here; it surfaces when the conduit
                        is first used.

                ***************************************************************/

                this (ConnectionPool pool)
                {
                        parent = pool;
                        reset ();
                }

                /***************************************************************

                        Return the socket belonging to this connection

                ***************************************************************/

                SocketConduit getConduit ()
                {
                        return conduit;
                }

                /***************************************************************

                        Create a new socket and connect it to the specified
                        server. This will cause a dedicated thread to start
                        on the server. Said thread will quit when an error
                        occurs.

                        Any previous socket is closed first, so repeated
                        resets do not accumulate open sockets. Returns
                        true when the new socket is connected, and false
                        when any step of the connect failed.

                ***************************************************************/

                bool reset ()
                {
                        // release the socket we are about to replace
                        if (conduit)
                           {
                           try {
                               conduit.close ();
                               } catch (Object)
                                        {
                                        // best effort only: we get here
                                        // because the old socket already
                                        // failed
                                        }
                           }

                        try {
                            conduit = new SocketConduit;
                            conduit.connect (parent.address);

                            // set a 500ms timeout for read operations
                            conduit.setTimeout (System.Interval.Millisec * 500);
                            return true;

                            } catch (Object)
                                    {
                                    return false;
                                    }
                }

                /***************************************************************

                        Close the socket. This will cause any host session
                        to be terminated. Does nothing when no socket
                        was ever created.

                ***************************************************************/

                void close ()
                {
                        if (conduit)
                            conduit.close ();
                }

                /***************************************************************

                        Return this connection to the free-list.
Note that
                        we have to synchronize on the parent-pool itself.
                        The given time is recorded so that borrow() can
                        later reap this entry once it has been idle for
                        too long.

                ***************************************************************/

                void done (ulong time)
                {
                        synchronized (parent)
                                     {
                                     next = parent.freelist;
                                     parent.freelist = this;
                                     this.time = time;
                                     }
                }
        }


        /***********************************************************************

                Create a connection-pool for the specified address.

        ***********************************************************************/

        this (InternetAddress address)
        {
                this.address = address;
        }

        /***********************************************************************

                Allocate a Connection from a list rather than creating a
                new one. Reap old entries as we go.

                Every entry idle for longer than the timeout is closed
                and discarded, including the last one on the list. If
                no usable entry remains, a new connection is created.

        ***********************************************************************/

        synchronized Connection borrow (ulong time)
        {
                while (freelist)
                      {
                      PoolConnection c = freelist;
                      freelist = c.next;

                      // still fresh? Hand it out
                      if (time - c.time <= timeout)
                          return c;

                      // idle too long: the host has likely dropped the
                      // session, so never hand this one out
                      c.close ();
                      }

                return new PoolConnection (this);
        }

        /***********************************************************************

                Close this pool and drop all existing connections.
                Connections currently on loan are not affected.

        ***********************************************************************/

        synchronized void close ()
        {
                PoolConnection c = freelist;
                freelist = null;
                while (c)
                      {
                      c.close ();
                      c = c.next;
                      }
        }
}


/*******************************************************************************

        Class to represent a cluster node. Each node supports both cache
        and queue functionality.
Note that the set of available nodes is
        configured at startup, simplifying the discovery process in some
        significant ways, and causing less thrashing of cache-keys.

*******************************************************************************/

private class Node
{
        private char[]          name,           // network name of the host
                                port;           // cache port, as text (for logging)
        private ILogger         logger;         // optional; may be null
        private bool            enabled;        // is this node in service?

        private ConnectionPool  tasks,          // cache-loader connections
                                cache;          // cache/queue connections

        /***********************************************************************

                Construct a node with the provided name, which should
                be the network name of the hosting device. The node has
                no connection pools until setCache() and setTasks() are
                called.

        ***********************************************************************/

        this (ILogger logger, char[] name)
        {
                this.name = name;
                this.logger = logger;
        }

        /***********************************************************************

                Attach a cache/queue connection-pool for the given
                address, and remember its port (as text) for use in
                log messages.

        ***********************************************************************/

        void setCache (InternetAddress address)
        {
                this.port = Text.toString (address.port);
                this.cache = new ConnectionPool (address);
        }

        /***********************************************************************

                Attach a cache-loader connection-pool for the given
                address.

        ***********************************************************************/

        void setTasks (InternetAddress address)
        {
                ConnectionPool pool = new ConnectionPool (address);
                this.tasks = pool;
        }

        /***********************************************************************

                Return the name of this node (the network name of the device)

        ***********************************************************************/

        override char[] toString ()
        {
                return this.name;
        }

/***********************************************************************

                Take this Node out of service: it is flagged as being
                disabled, and both of its connection pools are closed.
                The node remains disabled until it is enabled again.

        ***********************************************************************/

        void fail ()
        {
                // flag the node first, then drop the pooled connections
                setEnabled (false);
                cache.close ();
                tasks.close ();
        }

        /***********************************************************************

                Is this node currently enabled?

        ***********************************************************************/

        bool isEnabled ()
        {
                bool state;

                volatile
                state = enabled;

                return state;
        }

        /***********************************************************************

                Adjust the enabled state of this node. When a logger is
                present and has trace-level enabled, the transition is
                traced before the state is updated.

        ***********************************************************************/

        void setEnabled (bool enabled)
        {
                if (logger)
                    if (logger.isEnabled (logger.Level.Trace))
                        logger.trace (enabled ? ("enabling node '"~name~"' on port "~port)
                                              : ("disabling node '"~name~"'"));

                volatile
                this.enabled = enabled;
        }

        /***********************************************************************

                request data; fail this Node if we can't connect. Note
                that we make several attempts to connect before writing
                the node off as a failure.
***********************************************************************/

        // Returns true once a reply has been read from the server, with
        // the reply itself handed back via 'payload'. Returns false when
        // the pool is null, or when the exchange failed and this node was
        // consequently failed. A PickleException raised during the exchange
        // is passed on to the caller (after the connection is returned),
        // and a reply carrying any status other than OK is converted into
        // a ClusterFullException or a ClusterException.

        bool request (ConnectionPool pool, ProtocolWriter writer, ProtocolReader reader, out IPayload payload)
        {
                // the pool may have failed between the point of selecting
                // it and this invocation
                if (pool is null)
                    return false;

                // reply attributes; these are handed to reader.getPayload
                ProtocolWriter.Command  cmd;
                char[]                  channel,
                                        element;

                // note the time, and obtain a connection to the server
                ulong      time    = System.getMillisecs;
                Connection connect = pool.borrow (time);

                // converse with the server, permitting a few attempts
                int attempts = 3;
                while (attempts--)
                {
                        try {
                            // bind both protocol buffers to the connection
                            writer.getBuffer.setConduit (connect.getConduit);
                            reader.getBuffer.setConduit (connect.getConduit);

                            // push the request out
                            writer.flush ();

                            // the request has gone: no retry from here on
                            attempts = 0;
                            payload = reader.getPayload (channel, element, cmd);

                            // hand the borrowed connection back
                            connect.done (time);
                            }
                        catch (PickleException x)
                               {
                               // the link itself is fine: return it, and
                               // let the caller deal with the exception
                               connect.done (time);
                               throw x;
                               }
                        catch (IOException x)
                               {
                               // out of attempts, or unable to reconnect?
                               // NOTE(review): on this path the borrowed
                               // connection is not handed back via done();
                               // confirm that it gets released elsewhere
                               if (attempts == 0 || !connect.reset)
                                  {
                                  // that server is offline
                                  fail ();

                                  // state that we failed
                                  return false;
                                  }
                               }
                }

                // a plain OK means the payload is good
                if (cmd == ProtocolWriter.Command.OK)
                    return true;

                // otherwise the reply describes a problem: is the node full?
                if (cmd == ProtocolWriter.Command.Full)
                    throw new ClusterFullException (channel);

                // did the node raise an exception?
                if (cmd == ProtocolWriter.Command.Exception)
                    throw new ClusterException (channel);

                // bogus response
                throw new ClusterException ("invalid response from cluster server");
        }
}


/*******************************************************************************

        Represents the set of remote cluster nodes. Nodes are held in
        a lookup table keyed upon their name, in an array reflecting
        the order of addition, and in a shuffled copy of that array.

*******************************************************************************/

private class NodeSet
{
        private ILogger logger;
        private HashMap map;            // node name -> Node
        private Node[]  nodes;          // in order of addition
        private Node[]  random;         // shuffled copy; see optimize()

        /***********************************************************************

                Construct an empty set of nodes, retaining the logger.

        ***********************************************************************/

        this (ILogger logger)
        {
                // presumably: initial capacity, load factor, and
                // concurrency level -- confirm against HashMap
                map = new HashMap (128, 0.75, 4);
                this.logger = logger;
        }

        /***********************************************************************

                The logger provided at construction time.

        ***********************************************************************/

        ILogger getLogger ()
        {
                return logger;
        }

        /***********************************************************************

                Append a node to the list of servers. Node names must
                be unique: a ClusterException is thrown where the name
                is already present. No sorting is applied here, so the
                nodes remain in their order of addition.

        ***********************************************************************/

        synchronized void addNode (Node node)
        {
                char[] name = node.toString;

                // refuse duplicate names
                if (map.get (name))
                    throw new ClusterException ("Attempt to add cluster node '"~name~"' more than once");

                // index by name, and append to the ordered list (which
                // is deliberately left unsorted)
                map.put (name, node);
                nodes ~= node;
        }

        /***********************************************************************

                Build the shuffled copy of the node list, as used by
                scan(). This should be invoked once the nodes have all
                been added.

        ***********************************************************************/

        void optimize ()
        {
                // no sorting is performed: the nodes are expected to be
                // in the same order across the cluster already, since
                // the configuration file should be identical throughout

                // take a copy of the node list
                random = nodes.dup;

                // muddle up the copy, by swapping each slot in turn with
                // a randomly chosen one. The muddled list is used when
                // scanning the cluster for queued entries
                for (int i = 0; i < random.length; ++i)
                    {
                    int j = Random.get (random.length);

                    Node held = random[i];
                    random[i] = random[j];
                    random[j] = held;
                    }
        }

        /***********************************************************************

                Enable the named node, where it is not enabled already,
                with port1 for its cache/queue pool and port2 for its
                cache-loader pool. Throws a ClusterException where the
                name is unknown.

        ***********************************************************************/

        synchronized void enable (char[] name, ushort port1, ushort port2)
        {
                Node node = cast(Node) map.get (name);

                if (node is null)
                    throw new ClusterException ("Attempt to enable unknown cluster node '"~name~"'");

                // nothing to do for a node that is enabled already
                if (node.isEnabled)
                    return;

                // attach fresh pools before flagging the node as enabled
                node.setCache (new InternetAddress (name, port1));
                node.setTasks (new InternetAddress (name, port2));
                node.setEnabled (true);
        }

        /***********************************************************************

                Issue a request via the cache pool of a selected node,
                and return the resultant payload. Where the request
                upon one node fails, another selection is made and the
                request is tried again; selectNode() throws when there
                is no enabled node remaining.

        ***********************************************************************/

        IPayload request (ProtocolWriter writer, ProtocolReader reader, char[] key = null)
        {
                IPayload payload;
                Node     node = selectNode (key);

                // keep selecting until some node services the request
                while (! node.request (node.cache, writer, reader, payload))
                         node = selectNode (key);

                return payload;
        }

        /***********************************************************************

                Select a cluster server based on a starting index. If the
                selected server is not currently enabled, we just try the
                next one. This behaviour should be consistent across each
                cluster client.
***********************************************************************/

        private final Node selectNode (uint index)
        {
                uint count = nodes.length;

                if (count)
                   {
                   // fold the starting point into the valid range
                   uint slot = index % count;

                   // visit each node once at most, from the starting slot
                   for (uint visited = 0; visited < count; ++visited)
                       {
                       Node candidate = nodes[slot];
                       if (candidate.isEnabled)
                           return candidate;

                       // step along, wrapping at the end of the list
                       ++slot;
                       if (slot >= nodes.length)
                           slot = 0;
                       }
                   }

                // either there are no nodes at all, or none is enabled
                throw new ClusterEmptyException ("No cluster servers are available");
        }

        /***********************************************************************

                Select a cluster server for the provided key: the hash
                of the key nominates the starting node. Where no key is
                provided, a counter is advanced instead, such that the
                starting node rotates from one call to the next. In
                both cases a node that is not enabled is passed over in
                favour of the next one.

        ***********************************************************************/

        final Node selectNode (char[] key = null)
        {
                // the counter is static, and is advanced without locking
                static uint counter;

                // no key provided, so just roll the counter
                if (key.length == 0)
                    return selectNode (++counter);

                return selectNode (HashMap.jhash (key));
        }

        /***********************************************************************

                Sweep the cluster servers. Returns true if the delegate
                returns true, false otherwise. The sweep is aborted when
                the delegate returns true. Note that this scans nodes in
                a randomized pattern, which should tend to avoid 'bursty'
                activity by a set of clients upon any one cluster server.
***********************************************************************/

        // The delegate is invoked for enabled nodes only, and the list
        // is walked from its last entry towards its first.
        //
        // The randomized list is built by optimize(), so it matches the
        // node list only where optimize() has run since the last node
        // was added. Where the two differ in length, the plain node list
        // is walked instead; indexing the randomized list with the length
        // of the node list would otherwise read beyond its bounds.

        bool scan (bool delegate (Node) dg)
        {
                // capture both lists once, so that the length test and
                // the walk itself operate upon the same arrays
                Node[] all   = nodes;
                Node[] order = random;

                // fall back where the randomized copy is absent or stale
                if (order.length != all.length)
                    order = all;

                int index = order.length;

                while (index--)
                      {
                      Node node = order [index];

                      // callback on each enabled node; stop at the
                      // first one for which the delegate returns true
                      if (node.isEnabled)
                          if (dg (node))
                              return true;
                      }
                return false;
        }
}

