Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

ClusterThread.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterThread.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032         
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.ClusterThread;
00040 
00041 private import  std.thread;
00042 
00043 private import  mango.io.Buffer,
00044                 mango.io.Exception,
00045                 mango.io.ServerSocket,
00046                 mango.io.ArrayAllocator;
00047 
00048 private import  mango.io.model.IBuffer,
00049                 mango.io.model.IConduit;
00050 
00051 private import  mango.utils.AbstractServer;
00052 
00053 private import  mango.cluster.qos.socket.Cluster,
00054                 mango.cluster.qos.socket.ClusterCache,
00055                 mango.cluster.qos.socket.ProtocolReader,
00056                 mango.cluster.qos.socket.ProtocolWriter;
00057 
00058 /******************************************************************************
00059 
00060         The socket QOS thread for handling client requests.
00061 
00062 ******************************************************************************/
00063 
00064 class ClusterThread : Thread
00065 {
00066         private ClusterCache    cache;
00067         private ClusterQueue    queue;
00068         private IBuffer         buffer;
00069         private ProtocolReader  reader;
00070         private ProtocolWriter  writer;
00071         private ILogger         logger;
00072         private char[]          client;
00073         private Cluster         cluster;
00074 
00075         /**********************************************************************
00076 
00077                 Note that the conduit stays open until the client kills it.
00078                 Also note that we use a GrowableBuffer here, which expands
00079                 as necessary to contain larger payloads.
00080 
00081         **********************************************************************/
00082 
00083         this (AbstractServer server, IConduit conduit, Cluster cluster, 
00084               ClusterCache cache, ClusterQueue queue)
00085         {
00086                 buffer = new GrowableBuffer (1024 * 8);
00087                 buffer.setConduit (conduit);
00088 
00089                 // get client infomation
00090                 client = server.getRemoteAddress (conduit);
00091 
00092                 // setup cluster protocol transcoders
00093                 writer = new ProtocolWriter (buffer);
00094 
00095                 // make the reader slice directly from the buffer content
00096                 reader = new ProtocolReader (buffer);
00097                 reader.setAllocator (new BufferAllocator);
00098 
00099                 // save state
00100                 logger = server.getLogger;
00101                 this.cluster = cluster;
00102                 this.cache = cache;
00103                 this.queue = queue;
00104         }
00105 
00106         /**********************************************************************
00107 
00108         **********************************************************************/
00109 
00110         private final char[] msg (char[] action, char[] target)
00111         {
00112                 return client ~ ": " ~ action ~ target ~ "'"; 
00113         }
00114 
00115         /**********************************************************************
00116 
00117         **********************************************************************/
00118 
00119         private final char[] msg1 (char[] action, char[] target, char[] channel)
00120         {
00121                 return msg (action, target) ~ " on channel '" ~ channel ~ "'"; 
00122         }
00123 
00124         /**********************************************************************
00125 
00126         **********************************************************************/
00127 
00128         version (Ares) 
00129                  alias void ThreadReturn;
00130               else
00131                  alias int ThreadReturn;
00132 
00133         override ThreadReturn run ()
00134         {
00135                 ProtocolWriter.Command  cmd;
00136                 char[]                  channel;
00137                 char[]                  element;
00138 
00139                 logger.info (client ~ ": starting service handler");
00140                 //cache.trace ();
00141                 //queue.trace ();
00142                 
00143                 try {
00144                     while (true)
00145                           {
00146                           // start with a clear buffer
00147                           buffer.clear ();
00148 
00149                           // wait for a request to arrive
00150                           ClusterContent content = reader.getPacket (channel, element, cmd);
00151 
00152                           // are we enabled for logging?
00153                           bool log = logger.isEnabled (logger.Level.Trace);
00154 
00155                           try {
00156                               switch (cmd)
00157                                      {
00158                                      case ProtocolWriter.Command.Add:
00159                                           if (log)
00160                                               logger.trace (msg1 ("add cache entry '", element, channel)); 
00161 
00162                                           cache.put (channel, element, content);
00163                                           writer.success ("success"); 
00164                                           break;
00165  
00166                                      case ProtocolWriter.Command.Copy:
00167                                           if (log)
00168                                               logger.trace (msg1 ("copy cache entry '", element, channel)); 
00169 
00170                                           writer.reply (cache.get (channel, element)); 
00171                                           break;
00172   
00173                                      case ProtocolWriter.Command.Remove:
00174                                           if (log)
00175                                               logger.trace (msg1 ("remove cache entry '", element, channel)); 
00176 
00177                                           writer.reply (cache.extract (channel, element));
00178                                           break;
00179 
00180                                      case ProtocolWriter.Command.AddQueue:
00181                                           if (log)
00182                                               logger.trace (msg ("add queue entry '", channel)); 
00183 
00184                                           if (queue.put (channel, content))
00185                                               writer.success ();
00186                                           else
00187                                              writer.full ("cluster queue is full");
00188                                           break;
00189 
00190                                      case ProtocolWriter.Command.RemoveQueue:
00191                                           if (log)
00192                                               logger.trace (msg ("remove queue entry '", channel)); 
00193 
00194                                           writer.reply (queue.get (channel));
00195                                           break;
00196 
00197                                      default:
00198                                           throw new Exception ("invalid command");
00199                                      }
00200                                 } catch (Object x)
00201                                         {
00202                                         logger.error (msg ("cluster request exception '", x.toString));
00203                                         writer.exception ("cluster request exception: "~x.toString);
00204                                         }
00205 
00206                           // send response back to client
00207                           buffer.flush ();
00208                           }
00209 
00210                     } catch (IOException x)
00211                              if (! Socket.isCancelled)
00212                                    logger.trace (msg ("cluster socket exception '", x.toString));
00213 
00214                       catch (Object x)
00215                              logger.fatal (msg ("cluster runtime exception '", x.toString));
00216 
00217                 // log our halt status
00218                 logger.info (client ~ ": halting service handler");
00219 
00220                 // make sure we close the conduit
00221                 buffer.getConduit.close ();
00222 
00223                 return 0;
00224         }
00225 }
00226 

Generated on Mon Nov 14 10:59:37 2005 for Mango by  doxygen 1.4.0