Main Page | Class Hierarchy | Alphabetical List | Class List | File List | Class Members | File Members | Related Pages

ClusterThread.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterThread.d
00004         
00005         Copyright (C) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026 
00027                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00028 
00029         
00030         @version        Initial version, July 2004      
00031         @author         Kris
00032 
00033 
00034 *******************************************************************************/
00035 
00036 module mango.cluster.qos.socket.ClusterThread;
00037 
00038 private import  std.thread;
00039 
00040 private import  mango.io.Buffer,
00041                 mango.io.Exception,
00042                 mango.io.ServerSocket,
00043                 mango.io.ArrayAllocator;
00044 
00045 private import  mango.io.model.IBuffer,
00046                 mango.io.model.IConduit;
00047 
00048 private import  mango.utils.AbstractServer;
00049 
00050 private import  mango.cluster.qos.socket.Cluster,
00051                 mango.cluster.qos.socket.ClusterCache,
00052                 mango.cluster.qos.socket.ProtocolReader,
00053                 mango.cluster.qos.socket.ProtocolWriter;
00054 
00055 /******************************************************************************
00056 
00057         The socket QOS thread for handling client requests.
00058 
00059 ******************************************************************************/
00060 
00061 class ClusterThread : Thread
00062 {
00063         private ClusterCache    cache;
00064         private ClusterQueue    queue;
00065         private IBuffer         buffer;
00066         private ProtocolReader  reader;
00067         private ProtocolWriter  writer;
00068         private ILogger         logger;
00069         private char[]          client;
00070         private Cluster         cluster;
00071 
00072         /**********************************************************************
00073 
00074                 Note that the conduit stays open until the client kills it.
00075                 Also note that we use a GrowableBuffer here, which expands
00076                 as necessary to contain larger payloads.
00077 
00078         **********************************************************************/
00079 
00080         this (AbstractServer server, IConduit conduit, Cluster cluster, 
00081               ClusterCache cache, ClusterQueue queue)
00082         {
00083                 buffer = new GrowableBuffer (1024 * 8);
00084                 buffer.setConduit (conduit);
00085 
00086                 // get client infomation
00087                 client = server.getRemoteAddress (conduit);
00088 
00089                 // setup cluster protocol transcoders
00090                 writer = new ProtocolWriter (buffer);
00091 
00092                 // make the reader slice directly from the buffer content
00093                 reader = new ProtocolReader (buffer);
00094                 reader.setAllocator (new BufferAllocator (reader));
00095 
00096                 // save state
00097                 logger = server.getLogger;
00098                 this.cluster = cluster;
00099                 this.cache = cache;
00100                 this.queue = queue;
00101         }
00102 
00103         /**********************************************************************
00104 
00105         **********************************************************************/
00106 
00107         private final char[] msg (char[] action, char[] target)
00108         {
00109                 return client ~ ": " ~ action ~ target ~ "'"; 
00110         }
00111 
00112         /**********************************************************************
00113 
00114         **********************************************************************/
00115 
00116         private final char[] msg1 (char[] action, char[] target, char[] channel)
00117         {
00118                 return msg (action, target) ~ " on channel '" ~ channel ~ "'"; 
00119         }
00120 
00121         /**********************************************************************
00122 
00123         **********************************************************************/
00124 
00125         override int run ()
00126         {
00127                 ProtocolWriter.Command  cmd;
00128                 char[]                  channel;
00129                 char[]                  element;
00130 
00131                 logger.info (client ~ ": starting service handler");
00132                 //cache.trace ();
00133                 //queue.trace ();
00134                 
00135                 try {
00136                     while (true)
00137                           {
00138                           // start with a clear buffer
00139                           buffer.clear ();
00140 
00141                           // wait for a request to arrive
00142                           ClusterContent content = reader.getPacket (channel, element, cmd);
00143 
00144                           // are we enabled for logging?
00145                           bool log = logger.isEnabled (logger.Level.Trace);
00146 
00147                           try {
00148                               switch (cmd)
00149                                      {
00150                                      case ProtocolWriter.Command.Add:
00151                                           if (log)
00152                                               logger.trace (msg1 ("add cache entry '", element, channel)); 
00153 
00154                                           cache.put (channel, element, content);
00155                                           writer.success ("success"); 
00156                                           break;
00157  
00158                                      case ProtocolWriter.Command.Copy:
00159                                           if (log)
00160                                               logger.trace (msg1 ("copy cache entry '", element, channel)); 
00161 
00162                                           writer.reply (cache.get (channel, element)); 
00163                                           break;
00164   
00165                                      case ProtocolWriter.Command.Remove:
00166                                           if (log)
00167                                               logger.trace (msg1 ("remove cache entry '", element, channel)); 
00168 
00169                                           writer.reply (cache.extract (channel, element));
00170                                           break;
00171 
00172                                      case ProtocolWriter.Command.AddQueue:
00173                                           if (log)
00174                                               logger.trace (msg ("add queue entry '", channel)); 
00175 
00176                                           if (queue.put (channel, content))
00177                                               writer.success ();
00178                                           else
00179                                              writer.full ("cluster queue is full");
00180                                           break;
00181 
00182                                      case ProtocolWriter.Command.RemoveQueue:
00183                                           if (log)
00184                                               logger.trace (msg ("remove queue entry '", channel)); 
00185 
00186                                           writer.reply (queue.get (channel));
00187                                           break;
00188 
00189                                      default:
00190                                           throw new Exception ("invalid command");
00191                                      }
00192                                 } catch (Object x)
00193                                         {
00194                                         logger.error (msg ("cluster request exception '", x.toString));
00195                                         writer.exception ("cluster request exception: "~x.toString);
00196                                         }
00197 
00198                           // send response back to client
00199                           buffer.flush ();
00200                           }
00201 
00202                     } catch (IOException x)
00203                              if (! Socket.isCancelled)
00204                                    logger.trace (msg ("cluster socket exception '", x.toString));
00205 
00206                       catch (Object x)
00207                              logger.fatal (msg ("cluster runtime exception '", x.toString));
00208 
00209                 // log our halt status
00210                 logger.info (client ~ ": halting service handler");
00211 
00212                 // make sure we close the conduit
00213                 buffer.getConduit.close ();
00214 
00215                 return 0;
00216         }
00217 }
00218 

Generated on Sun Nov 7 19:06:50 2004 for Mango by doxygen 1.3.6