00001 /******************************************************************************* 00002 00003 @file NetworkQueue.d 00004 00005 Copyright (C) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 00027 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00028 00029 00030 @version Initial version, July 2004 00031 @author Kris 00032 00033 00034 *******************************************************************************/ 00035 00036 module mango.cluster.NetworkQueue; 00037 00038 private import mango.cluster.Client; 00039 00040 /******************************************************************************* 00041 00042 Exposes a gateway to the cluster queues, which collect IPayload 00043 objects until they are removed. Because there is a finite limit 00044 to the quantity of entries stored, the put() method may throw a 00045 ClusterFullException if it cannot add a new entry. 00046 00047 *******************************************************************************/ 00048 00049 class NetworkQueue : Client 00050 { 00051 /*********************************************************************** 00052 00053 Construct a NetworkQueue gateway on the provided QOS cluster 00054 for the specified channel. Each subsequent queue operation 00055 will take place over the given channel. 00056 00057 ***********************************************************************/ 00058 00059 this (ICluster cluster, char[] channel) 00060 { 00061 super (cluster, channel); 00062 } 00063 00064 /*********************************************************************** 00065 00066 Create a listener for this channel. Listeners are invoked 00067 when new content is placed into a corresponding queue. 00068 00069 ***********************************************************************/ 00070 00071 IConsumer createConsumer (IEventListener listener) 00072 { 00073 return getCluster.createConsumer (getChannel, IEvent.Style.Message, listener); 00074 } 00075 00076 /*********************************************************************** 00077 00078 Add an IPayload entry to the corresponding queue. This 00079 will throw a ClusterFullException if there is no space 00080 left in the clustered queue. 00081 00082 ***********************************************************************/ 00083 00084 void put (IPayload payload) 00085 { 00086 getCluster.putQueue (getChannel, payload); 00087 } 00088 00089 /*********************************************************************** 00090 00091 Query the cluster for queued entries on our corresponding 00092 channel. Returns, and removes, a matching entry from the 00093 cluster. This is the synchronous (polling) approach; you 00094 should use createConsumer() instead for asynchronous style 00095 notification instead. 00096 00097 ***********************************************************************/ 00098 00099 IPayload get () 00100 { 00101 return getCluster.getQueue (getChannel); 00102 } 00103 } 00104 00105 00106 /******************************************************************************* 00107 00108 *******************************************************************************/ 00109 00110 class NetworkMessage : NetworkQueue, IConsumer 00111 { 00112 private IChannel reply; 00113 private IConsumer consumer; 00114 00115 /*********************************************************************** 00116 00117 Construct a NetworkMessage gateway on the provided QOS cluster 00118 for the specified channel. Each subsequent queue operation 00119 will take place over the given channel. 00120 00121 You can listen for cluster replies by providing an optional 00122 IEventListener. Outgoing messages will be tagged appropriately 00123 such that a consumer can respond using IEvent.reply(). 00124 00125 ***********************************************************************/ 00126 00127 this (ICluster cluster, char[] channel, IEventListener listener = null) 00128 { 00129 super (cluster, channel); 00130 00131 if (listener) 00132 { 00133 reply = cluster.createChannel (channel ~ ".reply"); 00134 consumer = cluster.createConsumer (reply, IEvent.Style.Message, listener); 00135 } 00136 } 00137 00138 /*********************************************************************** 00139 00140 Cancel the listener. No more events will be dispatched to 00141 the reply IEventListener. 00142 00143 ***********************************************************************/ 00144 00145 void cancel() 00146 { 00147 if (consumer) 00148 consumer.cancel (); 00149 } 00150 00151 /*********************************************************************** 00152 00153 Add an IMessage entry to the corresponding queue. This 00154 will throw a ClusterFullException if there is no space 00155 left in the clustered queue. 00156 00157 ***********************************************************************/ 00158 00159 void put (IMessage message) 00160 { 00161 if (reply) 00162 message.setReply (reply.getName); 00163 00164 super.put (message); 00165 } 00166 } 00167 00168 00169 /******************************************************************************* 00170 00171 *******************************************************************************/ 00172 00173 class NetworkTask : NetworkMessage 00174 { 00175 /*********************************************************************** 00176 00177 Construct a NetworkTask gateway on the provided QOS cluster 00178 for the specified channel. Each subsequent queue operation 00179 will take place over the given channel. 00180 00181 You can listen for cluster replies by providing an optional 00182 IEventListener. Outgoing tasks will be tagged appropriately 00183 such that a consumer can respond using IEvent.reply(). 00184 00185 ***********************************************************************/ 00186 00187 this (ICluster cluster, char[] channel, IEventListener listener = null) 00188 { 00189 super (cluster, channel, listener); 00190 } 00191 00192 /*********************************************************************** 00193 00194 Add an ITask entry to the corresponding queue. This 00195 will throw a ClusterFullException if there is no space 00196 left in the clustered queue. 00197 00198 ***********************************************************************/ 00199 00200 void put (ITask task) 00201 { 00202 super.put (task); 00203 } 00204 }