Main Page | Class Hierarchy | Alphabetical List | Class List | Directories | File List | Class Members | File Members | Related Pages

ClusterThread.d

Go to the documentation of this file.
00001 /*******************************************************************************
00002 
00003         @file ClusterThread.d
00004         
00005         Copyright (c) 2004 Kris Bell
00006         
00007         This software is provided 'as-is', without any express or implied
00008         warranty. In no event will the authors be held liable for damages
00009         of any kind arising from the use of this software.
00010         
00011         Permission is hereby granted to anyone to use this software for any 
00012         purpose, including commercial applications, and to alter it and/or 
00013         redistribute it freely, subject to the following restrictions:
00014         
00015         1. The origin of this software must not be misrepresented; you must 
00016            not claim that you wrote the original software. If you use this 
00017            software in a product, an acknowledgment within documentation of 
00018            said product would be appreciated but is not required.
00019 
00020         2. Altered source versions must be plainly marked as such, and must 
00021            not be misrepresented as being the original software.
00022 
00023         3. This notice may not be removed or altered from any distribution
00024            of the source.
00025 
00026         4. Derivative works are permitted, but they must carry this notice
00027            in full and credit the original source.
00028 
00029 
00030                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
00031 
00032         
00033         @version        Initial version, July 2004      
00034         @author         Kris
00035 
00036 
00037 *******************************************************************************/
00038 
00039 module mango.cluster.qos.socket.ClusterThread;
00040 
00041 private import  std.thread;
00042 
00043 private import  mango.io.Exception,
00044                 mango.io.GrowBuffer,
00045                 mango.io.ServerSocket,
00046                 mango.io.ArrayAllocator;
00047 
00048 private import  mango.io.model.IConduit;
00049 
00050 private import  mango.utils.AbstractServer;
00051 
00052 private import  mango.cluster.qos.socket.Cluster,
00053                 mango.cluster.qos.socket.ClusterCache,
00054                 mango.cluster.qos.socket.ProtocolReader,
00055                 mango.cluster.qos.socket.ProtocolWriter;
00056 
00057 /******************************************************************************
00058 
00059         The socket QOS thread for handling client requests.
00060         Each instance serves the single client conduit given to its constructor.
00061 ******************************************************************************/
00062 
00063 class ClusterThread : Thread
00064 {
00065         private ClusterCache    cache;
00066         private ClusterQueue    queue;
00067         private GrowBuffer      buffer;
00068         private ProtocolReader  reader;
00069         private ProtocolWriter  writer;
00070         private ILogger         logger;
00071         private char[]          client;
00072         private Cluster         cluster;
00073 
00074         /**********************************************************************
00075 
00076                 Note that the conduit stays open until the client kills it.
00077                 Also note that we use a GrowBuffer here, which expands
00078                 as necessary to contain larger payloads.
00079 
00080         **********************************************************************/
00081 
00082         this (AbstractServer server, IConduit conduit, Cluster cluster, 
00083               ClusterCache cache, ClusterQueue queue)
00084         {
00085                 buffer = new GrowBuffer (1024 * 8);
00086                 buffer.setConduit (conduit);
00087 
00088                 // get client information
00089                 client = server.getRemoteAddress (conduit);
00090 
00091                 // setup cluster protocol transcoders
00092                 writer = new ProtocolWriter (buffer);
00093 
00094                 // make the reader slice directly from the buffer content
00095                 reader = new ProtocolReader (buffer);
00096                 reader.setAllocator (new BufferAllocator);
00097 
00098                 // save state
00099                 logger = server.getLogger;
00100                 this.cluster = cluster;
00101                 this.cache = cache;
00102                 this.queue = queue;
00103         }
00104 
00105         /**********************************************************************
00106                 Compose "<client>: <action><target>'"; callers here end action with "'".
00107         **********************************************************************/
00108 
00109         private final char[] msg (char[] action, char[] target)
00110         {
00111                 return client ~ ": " ~ action ~ target ~ "'"; 
00112         }
00113 
00114         /**********************************************************************
00115                 As msg(), then appends " on channel '<channel>'" to the result.
00116         **********************************************************************/
00117 
00118         private final char[] msg1 (char[] action, char[] target, char[] channel)
00119         {
00120                 return msg (action, target) ~ " on channel '" ~ channel ~ "'"; 
00121         }
00122 
00123         /**********************************************************************
00124                 run(): serve requests until an exception ends the loop, then close the conduit.
00125         **********************************************************************/
00126 
00127         version (Ares) 
00128                  alias void ThreadReturn;
00129               else
00130                  alias int ThreadReturn;
00131 
00132         override ThreadReturn run ()
00133         {
00134                 ProtocolWriter.Command  cmd;
00135                 char[]                  channel;
00136                 char[]                  element;
00137 
00138                 logger.info (client ~ ": starting service handler");
00139                 //cache.trace ();
00140                 //queue.trace ();
00141                 
00142                 try {
00143                     while (true)
00144                           {
00145                           // start with a clear buffer
00146                           buffer.clear ();
00147 
00148                           // wait for a request to arrive
00149                           ClusterContent content = reader.getPacket (channel, element, cmd);
00150 
00151                           // are we enabled for logging?
00152                           bool log = logger.isEnabled (logger.Level.Trace);
00153 
00154                           try {
00155                               switch (cmd)
00156                                      {
00157                                      case ProtocolWriter.Command.Add:
00158                                           if (log)
00159                                               logger.trace (msg1 ("add cache entry '", element, channel)); 
00160 
00161                                           cache.put (channel, element, content);
00162                                           writer.success ("success"); 
00163                                           break;
00164  
00165                                      case ProtocolWriter.Command.Copy:
00166                                           if (log)
00167                                               logger.trace (msg1 ("copy cache entry '", element, channel)); 
00168 
00169                                           writer.reply (cache.get (channel, element)); 
00170                                           break;
00171   
00172                                      case ProtocolWriter.Command.Remove:
00173                                           if (log)
00174                                               logger.trace (msg1 ("remove cache entry '", element, channel)); 
00175 
00176                                           writer.reply (cache.extract (channel, element));
00177                                           break;
00178 
00179                                      case ProtocolWriter.Command.AddQueue:
00180                                           if (log)
00181                                               logger.trace (msg ("add queue entry '", channel)); 
00182 
00183                                           if (queue.put (channel, content))
00184                                               writer.success ();
00185                                           else
00186                                              writer.full ("cluster queue is full");
00187                                           break;
00188 
00189                                      case ProtocolWriter.Command.RemoveQueue:
00190                                           if (log)
00191                                               logger.trace (msg ("remove queue entry '", channel)); 
00192 
00193                                           writer.reply (queue.get (channel));
00194                                           break;
00195 
00196                                      default:
00197                                           throw new Exception ("invalid command");
00198                                      }
00199                                 } catch (Object x) // log it, report it to the client, keep serving
00200                                         {
00201                                         logger.error (msg ("cluster request exception '", x.toString));
00202                                         writer.exception ("cluster request exception: "~x.toString);
00203                                         }
00204 
00205                           // send response back to client
00206                           buffer.flush ();
00207                           }
00208 
00209                     } catch (IOException x) // exits the request loop; logged unless halting
00210                              if (! Socket.isHalting)
00211                                    logger.trace (msg ("cluster socket exception '", x.toString));
00212 
00213                       catch (Object x)
00214                              logger.fatal (msg ("cluster runtime exception '", x.toString));
00215 
00216                 // log our halt status
00217                 logger.info (client ~ ": halting service handler");
00218 
00219                 // make sure we close the conduit (NOTE(review): not in a finally block)
00220                 buffer.getConduit.close ();
00221 
00222                 return 0;
00223         }
00224 }
00225 

Generated on Sat Dec 24 17:28:32 2005 for Mango by  doxygen 1.4.0