00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036 module mango.cluster.qos.socket.ClusterThread;
00037
00038 private import std.thread;
00039
00040 private import mango.io.Buffer,
00041 mango.io.Exception,
00042 mango.io.ServerSocket,
00043 mango.io.ArrayAllocator;
00044
00045 private import mango.io.model.IBuffer,
00046 mango.io.model.IConduit;
00047
00048 private import mango.utils.AbstractServer;
00049
00050 private import mango.cluster.qos.socket.Cluster,
00051 mango.cluster.qos.socket.ClusterCache,
00052 mango.cluster.qos.socket.ProtocolReader,
00053 mango.cluster.qos.socket.ProtocolWriter;
00054
00055
00056
00057
00058
00059
00060
/*******************************************************************************

        Service handler for one cluster client connection. Each instance
        owns a buffer bound to the client conduit, reads protocol packets
        from it in a loop, applies the requested command to the shared
        cache or queue, and writes the response back through the same
        buffer.

*******************************************************************************/

class ClusterThread : Thread
{
    private ClusterCache   cache;      // target of Add / Copy / Remove
    private ClusterQueue   queue;      // target of AddQueue / RemoveQueue
    private IBuffer        buffer;     // shared by reader and writer
    private ProtocolReader reader;
    private ProtocolWriter writer;
    private ILogger        logger;
    private char[]         client;     // remote address, used as log prefix
    private Cluster        cluster;

    /***********************************************************************

            Bind a new handler to the given client conduit.

            Params:
              server  = supplies the remote address of the conduit
                        and the logger used by this handler
              conduit = the client connection to service
              cluster = retained by the handler (not otherwise used
                        within this class)
              cache   = cache instance that cache commands act upon
              queue   = queue instance that queue commands act upon

    ***********************************************************************/

    this (AbstractServer server, IConduit conduit, Cluster cluster,
          ClusterCache cache, ClusterQueue queue)
    {
        // a single growable buffer carries both requests and replies
        buffer = new GrowableBuffer (1024 * 8);
        buffer.setConduit (conduit);

        // remember who we are talking to, for log messages
        client = server.getRemoteAddress (conduit);

        // writer and reader are both layered over the same buffer
        writer = new ProtocolWriter (buffer);

        reader = new ProtocolReader (buffer);
        reader.setAllocator (new BufferAllocator (reader));

        logger = server.getLogger;
        this.cluster = cluster;
        this.cache = cache;
        this.queue = queue;
    }

    /***********************************************************************

            Compose a log message of the form

                <client>: <action><target>'

            Only the closing quote is appended here, so the caller is
            expected to end 'action' with the matching opening quote.

    ***********************************************************************/

    private final char[] msg (char[] action, char[] target)
    {
        return client ~ ": " ~ action ~ target ~ "'";
    }

    /***********************************************************************

            As msg(), with the channel name appended:

                <client>: <action><target>' on channel '<channel>'

    ***********************************************************************/

    private final char[] msg1 (char[] action, char[] target, char[] channel)
    {
        return msg (action, target) ~ " on channel '" ~ channel ~ "'";
    }

    /***********************************************************************

            Thread entry point: service requests until the connection
            fails or an unexpected error occurs.

            Each iteration clears the buffer, reads one packet, executes
            its command and flushes the response. A failure while
            executing a command is logged and reported to the client via
            writer.exception(), after which the loop continues. An
            IOException raised elsewhere in the loop ends the handler
            (logged at trace level unless Socket.isCancelled is set);
            any other exception ends it with a fatal log entry.

            The conduit is always closed before returning.

            Returns:
              0 in all cases

    ***********************************************************************/

    override int run ()
    {
        ProtocolWriter.Command cmd;
        char[]                 channel;
        char[]                 element;

        logger.info (client ~ ": starting service handler");

        try {
            try {
                while (true)
                {
                    // start each request with an empty buffer
                    buffer.clear ();

                    // read the next request; fills channel, element & cmd
                    ClusterContent content = reader.getPacket (channel, element, cmd);

                    // avoid composing trace messages when tracing is off
                    bool log = logger.isEnabled (logger.Level.Trace);

                    try {
                        switch (cmd)
                        {
                            case ProtocolWriter.Command.Add:
                                 if (log)
                                     logger.trace (msg1 ("add cache entry '", element, channel));

                                 cache.put (channel, element, content);
                                 writer.success ("success");
                                 break;

                            case ProtocolWriter.Command.Copy:
                                 if (log)
                                     logger.trace (msg1 ("copy cache entry '", element, channel));

                                 writer.reply (cache.get (channel, element));
                                 break;

                            case ProtocolWriter.Command.Remove:
                                 if (log)
                                     logger.trace (msg1 ("remove cache entry '", element, channel));

                                 writer.reply (cache.extract (channel, element));
                                 break;

                            case ProtocolWriter.Command.AddQueue:
                                 if (log)
                                     logger.trace (msg ("add queue entry '", channel));

                                 if (queue.put (channel, content))
                                     writer.success ();
                                 else
                                     writer.full ("cluster queue is full");
                                 break;

                            case ProtocolWriter.Command.RemoveQueue:
                                 if (log)
                                     logger.trace (msg ("remove queue entry '", channel));

                                 writer.reply (queue.get (channel));
                                 break;

                            default:
                                 throw new Exception ("invalid command");
                        }
                    } catch (Object x)
                    {
                        // a failed request is reported to the client and
                        // does not terminate the handler
                        logger.error (msg ("cluster request exception '", x.toString));
                        writer.exception ("cluster request exception: "~x.toString);
                    }

                    // send the response (or the exception report)
                    buffer.flush ();
                }

            } catch (IOException x)
            {
                if (! Socket.isCancelled)
                    logger.trace (msg ("cluster socket exception '", x.toString));
            }
            catch (Object x)
            {
                logger.fatal (msg ("cluster runtime exception '", x.toString));
            }

            logger.info (client ~ ": halting service handler");
        }
        finally
        {
            // closed in a finally clause so the conduit is released even
            // when one of the logging calls above throws
            buffer.getConduit.close ();
        }

        return 0;
    }
}
00218