00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 module mango.cluster.qos.socket.ClusterThread;
00040
00041 private import std.thread;
00042
00043 private import mango.io.Exception,
00044 mango.io.GrowBuffer,
00045 mango.io.ServerSocket,
00046 mango.io.ArrayAllocator;
00047
00048 private import mango.io.model.IConduit;
00049
00050 private import mango.utils.AbstractServer;
00051
00052 private import mango.cluster.qos.socket.Cluster,
00053 mango.cluster.qos.socket.ClusterCache,
00054 mango.cluster.qos.socket.ProtocolReader,
00055 mango.cluster.qos.socket.ProtocolWriter;
00056
00057
00058
00059
00060
00061
00062
/*******************************************************************************

        Service thread bound to a single client conduit. It repeatedly
        reads a request packet from the conduit, applies the requested
        command to the cluster cache or the cluster queue, and writes a
        reply back over the same buffer. The thread ends when an
        exception escapes the request loop, at which point the conduit
        is closed.

*******************************************************************************/

class ClusterThread : Thread
{
        private ClusterCache    cache;          // target of Add/Copy/Remove
        private ClusterQueue    queue;          // target of AddQueue/RemoveQueue
        private GrowBuffer      buffer;         // shared by reader and writer
        private ProtocolReader  reader;
        private ProtocolWriter  writer;
        private ILogger         logger;
        private char[]          client;         // remote address, prefixes log text
        private Cluster         cluster;

        /***********************************************************************

                Bind this thread to the given conduit.

                Params:
                    server  = supplies the logger and the remote address
                              of the conduit
                    conduit = the client connection to service
                    cluster = retained in a field; not otherwise used
                              within this class
                    cache   = cache instance the cache commands act upon
                    queue   = queue instance the queue commands act upon

        ***********************************************************************/

        this (AbstractServer server, IConduit conduit, Cluster cluster,
              ClusterCache cache, ClusterQueue queue)
        {
                // a single buffer, attached to the conduit, serves both
                // the protocol reader and the protocol writer
                buffer = new GrowBuffer (1024 * 8);
                buffer.setConduit (conduit);

                // remote address is captured once and reused for logging
                client = server.getRemoteAddress (conduit);

                writer = new ProtocolWriter (buffer);

                reader = new ProtocolReader (buffer);
                reader.setAllocator (new BufferAllocator);

                logger = server.getLogger;
                this.cluster = cluster;
                this.cache = cache;
                this.queue = queue;
        }

        /***********************************************************************

                Compose a log message of the form

                    <client>: <action><target>'

                The action text is expected to end with an opening quote,
                which this method closes after the target.

        ***********************************************************************/

        private final char[] msg (char[] action, char[] target)
        {
                return client ~ ": " ~ action ~ target ~ "'";
        }

        /***********************************************************************

                As msg(), with " on channel '<channel>'" appended.

        ***********************************************************************/

        private final char[] msg1 (char[] action, char[] target, char[] channel)
        {
                return msg (action, target) ~ " on channel '" ~ channel ~ "'";
        }

        /***********************************************************************

                The return type of run() differs between the Ares and
                non-Ares builds, hence this alias.

        ***********************************************************************/

        version (Ares)
                 alias void ThreadReturn;
              else
                 alias int ThreadReturn;

        /***********************************************************************

                Thread body: service requests until an exception escapes
                the request loop.

                An exception raised while executing a single command is
                logged and reported back through the writer, and the loop
                continues. An IOException escaping the loop is traced
                (unless Socket.isHalting is set); any other escaping
                object is logged as fatal. In every case the conduit is
                closed before this method exits, including when one of
                the logging calls itself throws.

        ***********************************************************************/

        override ThreadReturn run ()
        {
                ProtocolWriter.Command  cmd;
                char[]                  channel;
                char[]                  element;

                // the finally clause guarantees the conduit is released
                // even when a logger call or a catch handler throws
                try {
                    logger.info (client ~ ": starting service handler");

                    try {
                        while (true)
                              {
                              // start each request with an emptied buffer
                              buffer.clear ();

                              // channel, element and cmd are passed as
                              // arguments and consumed below; presumably
                              // getPacket populates them - confirm against
                              // ProtocolReader
                              ClusterContent content = reader.getPacket (channel, element, cmd);

                              // avoid composing trace text unless tracing is on
                              bool log = logger.isEnabled (logger.Level.Trace);

                              try {
                                  switch (cmd)
                                         {
                                         case ProtocolWriter.Command.Add:
                                              if (log)
                                                  logger.trace (msg1 ("add cache entry '", element, channel));

                                              cache.put (channel, element, content);
                                              writer.success ("success");
                                              break;

                                         case ProtocolWriter.Command.Copy:
                                              if (log)
                                                  logger.trace (msg1 ("copy cache entry '", element, channel));

                                              writer.reply (cache.get (channel, element));
                                              break;

                                         case ProtocolWriter.Command.Remove:
                                              if (log)
                                                  logger.trace (msg1 ("remove cache entry '", element, channel));

                                              writer.reply (cache.extract (channel, element));
                                              break;

                                         case ProtocolWriter.Command.AddQueue:
                                              if (log)
                                                  logger.trace (msg ("add queue entry '", channel));

                                              if (queue.put (channel, content))
                                                  writer.success ();
                                              else
                                                 writer.full ("cluster queue is full");
                                              break;

                                         case ProtocolWriter.Command.RemoveQueue:
                                              if (log)
                                                  logger.trace (msg ("remove queue entry '", channel));

                                              writer.reply (queue.get (channel));
                                              break;

                                         default:
                                              throw new Exception ("invalid command");
                                         }
                                  } catch (Object x)
                                          {
                                          // a failed command is reported to the
                                          // client; the connection stays open
                                          logger.error (msg ("cluster request exception '", x.toString));
                                          writer.exception ("cluster request exception: "~x.toString);
                                          }

                              // send the reply (or the exception report)
                              buffer.flush ();
                              }

                        } catch (IOException x)
                                {
                                // not logged when Socket.isHalting is set
                                if (! Socket.isHalting)
                                      logger.trace (msg ("cluster socket exception '", x.toString));
                                }
                          catch (Object x)
                                {
                                logger.fatal (msg ("cluster runtime exception '", x.toString));
                                }

                    logger.info (client ~ ": halting service handler");

                    } finally
                            {
                            buffer.getConduit.close ();
                            }

                return 0;
        }
}
00225