00001 /******************************************************************************* 00002 00003 @file TaskServer.d 00004 00005 Copyright (c) 2004 Kris Bell 00006 00007 This software is provided 'as-is', without any express or implied 00008 warranty. In no event will the authors be held liable for damages 00009 of any kind arising from the use of this software. 00010 00011 Permission is hereby granted to anyone to use this software for any 00012 purpose, including commercial applications, and to alter it and/or 00013 redistribute it freely, subject to the following restrictions: 00014 00015 1. The origin of this software must not be misrepresented; you must 00016 not claim that you wrote the original software. If you use this 00017 software in a product, an acknowledgment within documentation of 00018 said product would be appreciated but is not required. 00019 00020 2. Altered source versions must be plainly marked as such, and must 00021 not be misrepresented as being the original software. 00022 00023 3. This notice may not be removed or altered from any distribution 00024 of the source. 00025 00026 4. Derivative works are permitted, but they must carry this notice 00027 in full and credit the original source. 00028 00029 00030 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 00031 00032 00033 @version Initial version, July 2004 00034 @author Kris 00035 00036 00037 *******************************************************************************/ 00038 00039 module mango.cluster.TaskServer; 00040 00041 private import mango.log.Admin, 00042 mango.log.Logger; 00043 00044 public import mango.log.model.ILogger; 00045 00046 private import mango.io.Socket, 00047 mango.io.PickleRegistry; 00048 00049 private import mango.io.model.IConduit; 00050 00051 private import mango.servlet.Servlet, 00052 mango.servlet.ServletContext, 00053 mango.servlet.ServletProvider; 00054 00055 private import mango.http.server.HttpServer; 00056 00057 private import mango.cluster.qos.socket.Cluster; 00058 00059 00060 /****************************************************************************** 00061 00062 @code 00063 import MyTask1, 00064 MyTask2, 00065 MyTask3; 00066 00067 00068 class MyTaskServer : TaskServer 00069 { 00070 this (char[] filename) 00071 { 00072 auto FileConduit config = new FileConduit (filename); 00073 ILogger logger = Logger.getLogger ("my.task.server"); 00074 00075 super (new Cluster (logger, config)); 00076 } 00077 00078 00079 void enroll (ILogger logger) 00080 { 00081 addConsumer (new MyTask1); 00082 addConsumer (new MyTask2); 00083 addConsumer (new MyTask3); 00084 } 00085 } 00086 00087 00088 main () 00089 { 00090 MyTaskServer mts = new MyTaskServer ("cluster.properties"); 00091 00092 mts.start (); 00093 } 00094 @endcode 00095 00096 ******************************************************************************/ 00097 00098 class TaskServer 00099 { 00100 private ILogger logger; 00101 private ICluster cluster; 00102 private HttpServer adminServer; 00103 00104 /********************************************************************** 00105 00106 **********************************************************************/ 00107 00108 abstract void enroll (ILogger logger); 00109 00110 /********************************************************************** 00111 00112 **********************************************************************/ 00113 00114 this (ICluster cluster, ushort adminPort = 0) 00115 { 00116 this.cluster = cluster; 00117 this.logger = cluster.getLogger; 00118 00119 if (adminPort) 00120 { 00121 // construct a servlet-provider 00122 ServletProvider sp = new ServletProvider; 00123 00124 // create a context for admin servlets 00125 ServletContext admin = sp.addContext (new AdminContext (sp, "/admin")); 00126 00127 // create a (1 thread) server using the IProvider to service requests 00128 // and start listening for requests (but this thread does not listen) 00129 adminServer = new HttpServer (sp, new InternetAddress (adminPort), 1, logger); 00130 } 00131 00132 } 00133 00134 /********************************************************************** 00135 00136 **********************************************************************/ 00137 00138 ICluster getCluster () 00139 { 00140 return cluster; 00141 } 00142 00143 /********************************************************************** 00144 00145 **********************************************************************/ 00146 00147 void start () 00148 { 00149 enroll (logger); 00150 00151 if (adminServer) 00152 adminServer.start (); 00153 } 00154 00155 /********************************************************************** 00156 00157 **********************************************************************/ 00158 00159 IConsumer addConsumer (IPickleFactory task, bool enroll = false) 00160 { 00161 char[] name = task.getGuid; 00162 00163 cluster.getLogger.info ("adding consumer '" ~ name ~ "'"); 00164 00165 if (enroll) 00166 PickleRegistry.enroll (task); 00167 return new TaskConsumer (cluster, name); 00168 } 00169 00170 /********************************************************************** 00171 00172 00173 **********************************************************************/ 00174 00175 class TaskConsumer : IEventListener, IConsumer 00176 { 00177 private char[] channel; 00178 private ILogger logger; 00179 private IConsumer consumer; 00180 00181 /************************************************************** 00182 00183 **************************************************************/ 00184 00185 this (ICluster cluster, char[] channel) 00186 { 00187 this.channel = channel; 00188 this.logger = cluster.getLogger (); 00189 00190 IChannel ch = cluster.createChannel (channel); 00191 consumer = cluster.createConsumer (ch, IEvent.Style.Message, this); 00192 } 00193 00194 /************************************************************** 00195 00196 **************************************************************/ 00197 00198 void cancel () 00199 { 00200 consumer.cancel (); 00201 } 00202 00203 /************************************************************** 00204 00205 Declares the contract for listeners within the 00206 cluster package. When creating a listener, you 00207 provide a class that implements this interface. 00208 The notify() method is invoked (on a seperate 00209 thread) whenever a relevant event occurs. 00210 00211 **************************************************************/ 00212 00213 void notify (IEvent event, IPayload payload) 00214 { 00215 if (logger.isEnabled (logger.Level.Info)) 00216 logger.info ("executing task from channel '" ~ channel ~ "'"); 00217 00218 // instantiate the task 00219 ITask task = cast(ITask) payload; 00220 00221 // fire it up! 00222 if (task) 00223 { 00224 task.execute (); 00225 00226 if (logger.isEnabled (logger.Level.Trace)) 00227 logger.trace ("completed task from channel '" ~ channel ~ "'"); 00228 } 00229 else 00230 logger.error ("received an invalid task on channel '" ~ channel ~ "'"); 00231 } 00232 } 00233 } 00234