00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 module mango.cluster.qos.socket.ClusterThread;
00040
00041 private import std.thread;
00042
00043 private import mango.io.Buffer,
00044 mango.io.Exception,
00045 mango.io.ServerSocket,
00046 mango.io.ArrayAllocator;
00047
00048 private import mango.io.model.IBuffer,
00049 mango.io.model.IConduit;
00050
00051 private import mango.utils.AbstractServer;
00052
00053 private import mango.cluster.qos.socket.Cluster,
00054 mango.cluster.qos.socket.ClusterCache,
00055 mango.cluster.qos.socket.ProtocolReader,
00056 mango.cluster.qos.socket.ProtocolWriter;
00057
00058
00059
00060
00061
00062
00063
/*******************************************************************************

        Thread that services one cluster client connection. Each instance
        owns a buffer bound to the client's conduit, reads request packets
        through a ProtocolReader, applies them to the supplied ClusterCache
        or ClusterQueue, and answers through a ProtocolWriter that shares
        the same buffer.

*******************************************************************************/

class ClusterThread : Thread
{
        private ClusterCache    cache;
        private ClusterQueue    queue;
        private IBuffer         buffer;
        private ProtocolReader  reader;
        private ProtocolWriter  writer;
        private ILogger         logger;
        private char[]          client;

        // retained from the constructor; not referenced elsewhere in this class
        private Cluster         cluster;

        /***********************************************************************

                Bind a new service thread to a client conduit.

                Params:
                server  = provides the client's remote address (used as the
                          prefix of every log message) and the logger
                conduit = the client connection to read from and write to
                cluster = stored for later use
                cache   = target of the Add, Copy and Remove commands
                queue   = target of the AddQueue and RemoveQueue commands

        ***********************************************************************/

        this (AbstractServer server, IConduit conduit, Cluster cluster,
              ClusterCache cache, ClusterQueue queue)
        {
                // 8KB initial buffer, attached to the client conduit
                buffer = new GrowableBuffer (1024 * 8);
                buffer.setConduit (conduit);

                // remote address identifies this client in the log
                client = server.getRemoteAddress (conduit);

                // reader and writer both operate on the one shared buffer
                writer = new ProtocolWriter (buffer);

                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                logger = server.getLogger;
                this.cluster = cluster;
                this.cache = cache;
                this.queue = queue;
        }

        /***********************************************************************

                Build a log message of the form "<client>: <action><target>'".

                Only the closing quote is appended here, so 'action' is
                expected to end with the matching opening quote.

        ***********************************************************************/

        private final char[] msg (char[] action, char[] target)
        {
                return client ~ ": " ~ action ~ target ~ "'";
        }

        /***********************************************************************

                As msg(), with " on channel '<channel>'" appended.

        ***********************************************************************/

        private final char[] msg1 (char[] action, char[] target, char[] channel)
        {
                return msg (action, target) ~ " on channel '" ~ channel ~ "'";
        }

        /***********************************************************************

                Return type of run() differs between thread implementations:
                void when built with version=Ares, int otherwise.

        ***********************************************************************/

        version (Ares)
                 alias void ThreadReturn;
              else
                 alias int ThreadReturn;

        /***********************************************************************

                Thread entry point: serve requests from the client until
                reading or writing fails, then close the conduit.

                An exception raised while handling a single request is
                logged and reported back to the client, and the loop
                carries on with the next request. An exception that escapes
                the loop ends the thread. The conduit is closed on every
                exit path, including when a logging call itself throws.

        ***********************************************************************/

        override ThreadReturn run ()
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                try {
                    logger.info (client ~ ": starting service handler");

                    try {
                        while (true)
                              {
                              // start each request with an empty buffer
                              buffer.clear ();

                              // fills in channel, element and cmd for this request
                              ClusterContent content = reader.getPacket (channel, element, cmd);

                              // checked once per request so the trace strings are
                              // only built when tracing is enabled
                              bool log = logger.isEnabled (logger.Level.Trace);

                              try {
                                  switch (cmd)
                                         {
                                         case ProtocolWriter.Command.Add:
                                              if (log)
                                                  logger.trace (msg1 ("add cache entry '", element, channel));

                                              cache.put (channel, element, content);
                                              writer.success ("success");
                                              break;

                                         case ProtocolWriter.Command.Copy:
                                              if (log)
                                                  logger.trace (msg1 ("copy cache entry '", element, channel));

                                              writer.reply (cache.get (channel, element));
                                              break;

                                         case ProtocolWriter.Command.Remove:
                                              if (log)
                                                  logger.trace (msg1 ("remove cache entry '", element, channel));

                                              writer.reply (cache.extract (channel, element));
                                              break;

                                         case ProtocolWriter.Command.AddQueue:
                                              if (log)
                                                  logger.trace (msg ("add queue entry '", channel));

                                              if (queue.put (channel, content))
                                                  writer.success ();
                                              else
                                                 writer.full ("cluster queue is full");
                                              break;

                                         case ProtocolWriter.Command.RemoveQueue:
                                              if (log)
                                                  logger.trace (msg ("remove queue entry '", channel));

                                              writer.reply (queue.get (channel));
                                              break;

                                         default:
                                              throw new Exception ("invalid command");
                                         }
                                  } catch (Object x)
                                          {
                                          // per-request failure: log it, tell the
                                          // client, and keep serving
                                          logger.error (msg ("cluster request exception '", x.toString));
                                          writer.exception ("cluster request exception: "~x.toString);
                                          }

                              // send the reply (or the exception notice) to the client
                              buffer.flush ();
                              }

                        } catch (IOException x)
                                {
                                // NOTE(review): presumably a cancelled socket means a
                                // deliberate shutdown, hence no message -- confirm
                                if (! Socket.isCancelled)
                                      logger.trace (msg ("cluster socket exception '", x.toString));
                                }
                          catch (Object x)
                                {
                                logger.fatal (msg ("cluster runtime exception '", x.toString));
                                }

                    logger.info (client ~ ": halting service handler");

                    } finally
                            {
                            // always release the client connection, even if one
                            // of the logging calls above threw
                            buffer.getConduit.close ();
                            }

                // NOTE(review): under version=Ares ThreadReturn is void; this
                // relies on the compiler accepting 'return 0' there -- confirm
                return 0;
        }
}
00226