/*******************************************************************************

        @file TaskServer.d

        Copyright (C) 2004 Kris Bell

        This software is provided 'as-is', without any express or implied
        warranty. In no event will the authors be held liable for damages
        of any kind arising from the use of this software.

        Permission is hereby granted to anyone to use this software for any
        purpose, including commercial applications, and to alter it and/or
        redistribute it freely, subject to the following restrictions:

        1. The origin of this software must not be misrepresented; you must
           not claim that you wrote the original software. If you use this
           software in a product, an acknowledgment within documentation of
           said product would be appreciated but is not required.

        2. Altered source versions must be plainly marked as such, and must
           not be misrepresented as being the original software.

        3. This notice may not be removed or altered from any distribution
           of the source.


                        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~


        @version        Initial version, July 2004
        @author         Kris


*******************************************************************************/

/*******************************************************************************

        NOTE: this is an altered version of the original source. It adds
        documentation and makes minor cleanups; the notice above is
        reproduced unchanged.

*******************************************************************************/

module mango.cluster.TaskServer;

private import  mango.log.Admin,
                mango.log.Logger;

public  import  mango.log.model.ILogger;

private import  mango.io.Socket,
                mango.io.PickleRegistry;

private import  mango.io.model.IConduit;

private import  mango.servlet.Servlet,
                mango.servlet.ServletContext,
                mango.servlet.ServletProvider;

private import  mango.http.server.HttpServer;

private import  mango.cluster.qos.socket.Cluster;


/******************************************************************************

        Base class for a server that executes tasks delivered over
        cluster channels. A subclass implements enroll() to register
        its task consumers (for example via addConsumer()); start()
        then invokes enroll() and, when an admin port was supplied at
        construction, starts the administration HTTP server.

        @code
        import MyTask1,
               MyTask2,
               MyTask3;


        class MyTaskServer : TaskServer
        {
                this (char[] filename)
                {
                        auto FileConduit config = new FileConduit (filename);
                        ILogger logger = Logger.getLogger ("my.task.server");

                        super (new Cluster (logger, config));
                }


                void enroll (ILogger logger)
                {
                        addConsumer (new MyTask1);
                        addConsumer (new MyTask2);
                        addConsumer (new MyTask3);
                }
        }


        void main ()
        {
                MyTaskServer mts = new MyTaskServer ("cluster.properties");

                mts.start ();
        }
        @endcode

******************************************************************************/

class TaskServer
{
        private ILogger         logger;         // logger obtained from the cluster
        private ICluster        cluster;        // cluster this server consumes from
        private HttpServer      adminServer;    // null unless an admin port was given

        /**********************************************************************

                Hook for subclasses: register the task consumers this
                server should host. Called by start() with the logger
                obtained from the cluster.

        **********************************************************************/

        abstract void enroll (ILogger logger);

        /**********************************************************************

                Construct a task server bound to the given cluster.

                @param cluster   the cluster to consume tasks from; its
                                 logger is adopted as this server's logger
                @param adminPort when non-zero, an admin HTTP server is
                                 configured on this port with an "/admin"
                                 context. A value of zero (the default)
                                 disables the admin server.

        **********************************************************************/

        this (ICluster cluster, int adminPort = 0)
        {
                this.cluster = cluster;
                this.logger = cluster.getLogger;

                if (adminPort)
                   {
                   // construct a servlet-provider
                   ServletProvider sp = new ServletProvider;

                   // create a context for admin servlets; the returned
                   // context is not needed here, only its registration
                   sp.addContext (new AdminContext (sp, "/admin"));

                   // create a (1 thread) server using the provider to
                   // service requests. It is only constructed here; it
                   // is started from start()
                   adminServer = new HttpServer (sp, new InternetAddress (adminPort), 1, logger);
                   }
        }

        /**********************************************************************

                Return the cluster this server was constructed with.

        **********************************************************************/

        ICluster getCluster ()
        {
                return cluster;
        }

        /**********************************************************************

                Start the server: invoke enroll() so the subclass can
                register its consumers, then start the admin HTTP server
                if one was configured.

        **********************************************************************/

        void start ()
        {
                enroll (logger);

                if (adminServer)
                    adminServer.start ();
        }

        /**********************************************************************

                Create a consumer for the given task type. The channel
                name is the task's guid.

                @param task   factory whose guid names the channel to
                              listen on
                @param enroll when true, the factory is also enrolled
                              with the PickleRegistry before the consumer
                              is created
                @return       the newly created consumer; call cancel()
                              on it to cancel the underlying consumer

        **********************************************************************/

        IConsumer addConsumer (IPickleFactory task, bool enroll = false)
        {
                char[] name = task.getGuid;

                // guard the concatenation, as notify() does for its messages
                if (logger.isEnabled (logger.Level.Info))
                    logger.info ("adding consumer '" ~ name ~ "'");

                if (enroll)
                    PickleRegistry.enroll (task);

                return new TaskConsumer (cluster, name);
        }

        /**********************************************************************

                Listens on a single cluster channel and executes each
                task that arrives there.

        **********************************************************************/

        class TaskConsumer : IEventListener, IConsumer
        {
                private char[]          channel;        // channel name (task guid)
                private ILogger         logger;         // logger obtained from the cluster
                private IConsumer       consumer;       // underlying cluster consumer

                /**************************************************************

                        Create the named channel on the cluster and
                        register this instance as the listener for
                        message-style events on it.

                **************************************************************/

                this (ICluster cluster, char[] channel)
                {
                        this.channel = channel;
                        this.logger = cluster.getLogger ();

                        IChannel ch = cluster.createChannel (channel);
                        consumer = cluster.createConsumer (ch, IEvent.Style.Message, this);
                }

                /**************************************************************

                        Cancel the underlying cluster consumer.

                **************************************************************/

                void cancel ()
                {
                        consumer.cancel ();
                }

                /**************************************************************

                        Listener callback. Per the listener contract of
                        the cluster package, notify() is invoked (on a
                        separate thread) whenever a relevant event
                        occurs.

                        The payload is expected to be an ITask; when it
                        is, the task is executed. Any other payload is
                        reported as an error and otherwise ignored.

                        NOTE(review): nothing here catches exceptions
                        thrown by ITask.execute(), so they propagate to
                        the caller of notify(). Confirm the dispatching
                        thread copes with that.

                **************************************************************/

                void notify (IEvent event, IPayload payload)
                {
                        if (logger.isEnabled (logger.Level.Info))
                            logger.info ("executing task from channel '" ~ channel ~ "'");

                        // the cast yields null when the payload is not an ITask
                        ITask task = cast(ITask) payload;

                        // fire it up!
                        if (task)
                           {
                           task.execute ();

                           if (logger.isEnabled (logger.Level.Trace))
                               logger.trace ("completed task from channel '" ~ channel ~ "'");
                           }
                        else
                           logger.error ("received an invalid task on channel '" ~ channel ~ "'");
                }
        }
}