00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 module mango.cluster.qos.socket.ClusterThread;
00040
00041 private import std.thread;
00042
00043 private import mango.io.Buffer,
00044 mango.io.Exception,
00045 mango.io.ServerSocket,
00046 mango.io.ArrayAllocator;
00047
00048 private import mango.io.model.IBuffer,
00049 mango.io.model.IConduit;
00050
00051 private import mango.utils.AbstractServer;
00052
00053 private import mango.cluster.qos.socket.Cluster,
00054 mango.cluster.qos.socket.ClusterCache,
00055 mango.cluster.qos.socket.ProtocolReader,
00056 mango.cluster.qos.socket.ProtocolWriter;
00057
00058
00059
00060
00061
00062
00063
/******************************************************************************

        Service thread bound to a single client conduit. Each instance
        repeatedly reads one request packet from the conduit, applies it
        to either the cluster cache or the cluster queue, and writes the
        response back over the same buffer.

******************************************************************************/

class ClusterThread : Thread
{
        private ClusterCache    cache;
        private ClusterQueue    queue;
        private IBuffer         buffer;
        private ProtocolReader  reader;
        private ProtocolWriter  writer;
        private ILogger         logger;
        private char[]          client;
        private Cluster         cluster;

        /**********************************************************************

                Wire this thread up to the given conduit.

                A growable 8KB buffer is attached to the conduit and is
                shared by both the protocol reader and the protocol
                writer. The remote address reported by the server is
                retained as a prefix for log messages.

                Params:
                server  = supplies the remote address of the conduit
                          and the logger
                conduit = the client connection to be serviced
                cluster = retained in a field; not otherwise used by
                          the code visible in this class
                cache   = target of the Add, Copy & Remove commands
                queue   = target of the AddQueue & RemoveQueue commands

        **********************************************************************/

        this (AbstractServer server, IConduit conduit, Cluster cluster,
              ClusterCache cache, ClusterQueue queue)
        {
                // plain reference stores first; none of these can fail
                this.cluster = cluster;
                this.cache   = cache;
                this.queue   = queue;

                // one buffer serves both directions of the conversation
                buffer = new GrowableBuffer (1024 * 8);
                buffer.setConduit (conduit);

                // identifies the peer within log output
                client = server.getRemoteAddress (conduit);

                // protocol output goes through the shared buffer
                writer = new ProtocolWriter (buffer);

                // protocol input comes from the shared buffer, with a
                // BufferAllocator installed on the reader
                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                logger = server.getLogger;
        }

        /**********************************************************************

                Build a log message of the form

                        client: <action><target>'

                The action text is expected to end with the opening
                quote; this method supplies only the closing one.

        **********************************************************************/

        private final char[] msg (char[] action, char[] target)
        {
                char[] prefix = client ~ ": " ~ action;

                return prefix ~ target ~ "'";
        }

        /**********************************************************************

                As msg(), with the channel name appended:

                        client: <action><target>' on channel '<channel>'

        **********************************************************************/

        private final char[] msg1 (char[] action, char[] target, char[] channel)
        {
                char[] prefix = msg (action, target);

                return prefix ~ " on channel '" ~ channel ~ "'";
        }

        /**********************************************************************

                Apply one decoded request to the cache or the queue and
                write the corresponding response via the protocol
                writer. An unrecognized command results in an Exception
                carrying the text "invalid command".

                Params:
                command = the request type
                channel = the channel the request applies to
                element = the cache key (cache commands only)
                content = the request payload (Add & AddQueue only)
                tracing = whether to emit a trace message describing
                          the request

        **********************************************************************/

        private final void serviceRequest (ProtocolWriter.Command command,
                                           char[] channel, char[] element,
                                           ClusterContent content, bool tracing)
        {
                switch (command)
                       {
                       case ProtocolWriter.Command.Add:
                            if (tracing)
                                logger.trace (msg1 ("add cache entry '", element, channel));

                            cache.put (channel, element, content);
                            writer.success ("success");
                            break;

                       case ProtocolWriter.Command.Copy:
                            if (tracing)
                                logger.trace (msg1 ("copy cache entry '", element, channel));

                            writer.reply (cache.get (channel, element));
                            break;

                       case ProtocolWriter.Command.Remove:
                            if (tracing)
                                logger.trace (msg1 ("remove cache entry '", element, channel));

                            writer.reply (cache.extract (channel, element));
                            break;

                       case ProtocolWriter.Command.AddQueue:
                            if (tracing)
                                logger.trace (msg ("add queue entry '", channel));

                            // the queue reports whether it accepted the content
                            if (queue.put (channel, content))
                                writer.success ();
                            else
                               writer.full ("cluster queue is full");
                            break;

                       case ProtocolWriter.Command.RemoveQueue:
                            if (tracing)
                                logger.trace (msg ("remove queue entry '", channel));

                            writer.reply (queue.get (channel));
                            break;

                       default:
                            throw new Exception ("invalid command");
                       }
        }

        /**********************************************************************

                Thread body: service requests until the read/dispatch
                loop is terminated by an exception, then close the
                conduit.

                A failure while handling an individual request is logged
                as an error, reported through writer.exception(), and
                the loop continues. An IOException escaping the loop is
                traced unless Socket.isCancelled is set; anything else
                escaping the loop is logged as fatal.

                Returns:
                always zero

        **********************************************************************/

        override int run ()
        {
                ProtocolWriter.Command  command;
                char[]                  channel,
                                        element;

                logger.info (client ~ ": starting service handler");

                try {
                    for (;;)
                        {
                        // begin each request with an empty buffer
                        buffer.clear ();

                        // NOTE(review): channel, element & command are
                        // presumably populated by getPacket() - confirm
                        // against ProtocolReader
                        ClusterContent content = reader.getPacket (channel, element, command);

                        // sampled once per request
                        bool tracing = logger.isEnabled (logger.Level.Trace);

                        try {
                            serviceRequest (command, channel, element, content, tracing);
                            }
                        catch (Object problem)
                              {
                              // tell both the log and the client about it
                              logger.error (msg ("cluster request exception '", problem.toString ()));
                              writer.exception ("cluster request exception: " ~ problem.toString ());
                              }

                        // push the response out
                        buffer.flush ();
                        }
                    }
                catch (IOException ioFault)
                      {
                      // stay quiet when the socket layer was cancelled
                      if (! Socket.isCancelled)
                            logger.trace (msg ("cluster socket exception '", ioFault.toString ()));
                      }
                catch (Object fault)
                      {
                      logger.fatal (msg ("cluster runtime exception '", fault.toString ()));
                      }

                logger.info (client ~ ": halting service handler");

                // this handler is done with the connection
                buffer.getConduit.close ();

                return 0;
        }
}
00221