From 3f69f638d37270689a5fbe1995f8044137d483e5 Mon Sep 17 00:00:00 2001 From: helmutm Date: Sat, 29 Mar 2008 17:58:04 +0000 Subject: [PATCH] added a queueable agent that will only execute one job at a time git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2483 fd906abe-77d9-0310-91a1-e0d9ade77398 --- agent/README.txt | 37 ++++++++++--------- agent/__init__.py | 2 +- agent/base/agent.py | 15 ++++---- agent/base/control.py | 13 +++++-- agent/base/job.py | 2 +- agent/base/sample.cfg | 2 +- agent/core/agent.py | 80 ++++++++++++++++++++++++++++++++++++++++++ agent/core/control.py | 37 +++++++++++++++++++ agent/core/schedule.py | 2 +- agent/interfaces.py | 27 +++++++++++--- 10 files changed, 181 insertions(+), 36 deletions(-) create mode 100644 agent/core/agent.py create mode 100644 agent/core/control.py diff --git a/agent/README.txt b/agent/README.txt index d0577a1..d927a34 100644 --- a/agent/README.txt +++ b/agent/README.txt @@ -106,7 +106,7 @@ the path to the configuration file. >>> configFile.close() >>> master.config - controller.name = 'sample' + controller.name = 'base.sample' logger.name = 'default' logger.standard = 30 scheduler.name = 'sample' @@ -155,12 +155,10 @@ is just one scheduler associated with the master agent. >>> master.scheduler -We schedule a sample job by taking the role of the controller and simply -call the master agent's callback method for entering jobs. +We schedule a sample job by calling an internal method of the agent's +controller. - >>> from cybertools.agent.base.control import JobSpecification - >>> jobSpec = JobSpecification('sample', '00001', agent='sample01') - >>> master.setupJobs([jobSpec]) + >>> master.controllers[0].enterJob('sample', 'sample01') Job 00001 on agent sample01 has been executed. Logging @@ -173,15 +171,14 @@ Logging >>> master.config.logger.standard = 20 >>> master.logger.setup() - >>> jobSpec = JobSpecification('sample', '00002', agent='sample01') - >>> master.setupJobs([jobSpec]) + >>> master.controllers[0].enterJob('sample', 'sample01') Job 00002 on agent sample01 has been executed. - 2... agent:sample01 job:00002 message:job execution + 2... agent:sample01 job:00002 message:job execution result:OK >>> for r in master.logger.records: ... print r - 2... agent:sample01 job:00001 message:job execution - 2... agent:sample01 job:00002 message:job execution + 2... agent:sample01 job:00001 message:job execution result:OK + 2... agent:sample01 job:00002 message:job execution result:OK Using the Twisted-based Scheduler @@ -190,27 +187,29 @@ Using the Twisted-based Scheduler By specifying the core scheduler in the agent's configuration this will be used automatically for scheduling. +In addition, we use another sample controller, now also the twisted-based +from the core package. This will in turn set up a queueable agent from +the core package so that now everything is running under the control of +the twisted reactor. + >>> config = ''' - ... controller(name='sample') + ... controller(name='core.sample') ... scheduler(name='core') ... logger(name='default', standard=30) ... ''' >>> master = Master(config) + >>> master.setup() >>> master.scheduler -We trigger the controller's setup as above and enter the same -job specification. +We enter the same job specification as above. - >>> master.setup() - - >>> jobSpec = JobSpecification('sample', '00001', agent='sample01') - >>> master.setupJobs([jobSpec]) + >>> master.controllers[0].enterJob('sample', 'sample01') Now the job is not executed immediately - we have to hand over control to the twisted reactor first. The running of the reactor is simulated by -a method provided for testing. +the ``iterate()`` method provided for testing. >>> from cybertools.agent.tests import tester >>> tester.iterate() diff --git a/agent/__init__.py b/agent/__init__.py index 13a3d13..75ef1b6 100644 --- a/agent/__init__.py +++ b/agent/__init__.py @@ -5,4 +5,4 @@ $Id$ # register default adapters from cybertools.agent.base import agent, control, job, log, schedule -from cybertools.agent.core import schedule +from cybertools.agent.core import agent, control, schedule diff --git a/agent/base/agent.py b/agent/base/agent.py index 92b2723..23328cf 100644 --- a/agent/base/agent.py +++ b/agent/base/agent.py @@ -47,6 +47,10 @@ class Agent(object): def execute(self, job, params=None): pass + def log(self, job, result='OK'): + self.logger.log(dict(message='job execution', job=job.identifier, + agent=self.name, result=result)) + class Master(Agent): @@ -84,12 +88,11 @@ class Master(Agent): class SampleAgent(Agent): - def execute(self, job, params=None): + def send(self, job): + self.execute(job) + + def execute(self, job): print 'Job %s on agent %s has been executed.' % (job.identifier, self.name) self.log(job) - def log(self, job): - self.logger.log(dict(message='job execution', job=job.identifier, - agent=self.name)) - -agents.register(SampleAgent, Master, name='sample') +agents.register(SampleAgent, Master, name='base.sample') diff --git a/agent/base/control.py b/agent/base/control.py index 726d019..7889dd0 100644 --- a/agent/base/control.py +++ b/agent/base/control.py @@ -49,10 +49,17 @@ class Controller(object): class SampleController(Controller): - def _getAgents(self): - return [AgentSpecification('sample01', 'sample')] + jobNumber = 0 -controllers.register(SampleController, Master, name='sample') + def _getAgents(self): + return [AgentSpecification('sample01', 'base.sample')] + + def enterJob(self, jobType, agent): + self.jobNumber += 1 + spec = JobSpecification(jobType, '%05i' % self.jobNumber, agent=agent) + self.agent.setupJobs([spec]) + +controllers.register(SampleController, Master, name='base.sample') class AgentSpecification(object): diff --git a/agent/base/job.py b/agent/base/job.py index 3b1ce35..752acdc 100644 --- a/agent/base/job.py +++ b/agent/base/job.py @@ -46,7 +46,7 @@ class Job(object): def execute(self): if self.agent is not None: - self.agent.execute(self, self.params) + self.agent.send(self) def reschedule(self, startTime=None): self.scheduler.schedule(self.copy(), startTime) diff --git a/agent/base/sample.cfg b/agent/base/sample.cfg index fe68ee0..e74d3ef 100644 --- a/agent/base/sample.cfg +++ b/agent/base/sample.cfg @@ -4,6 +4,6 @@ # $Id$ # -controller(name='sample') +controller(name='base.sample') scheduler(name='sample') logger(name='default', standard=30) diff --git a/agent/core/agent.py b/agent/core/agent.py new file mode 100644 index 0000000..de216ef --- /dev/null +++ b/agent/core/agent.py @@ -0,0 +1,80 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Queueable agent base/sample classes. + +$Id$ +""" + +from twisted.internet.defer import succeed +from zope.interface import implements + +from cybertools.agent.base.agent import Agent, Master +from cybertools.agent.components import agents +from cybertools.agent.interfaces import IQueueableAgent + + +class QueueableAgent(Agent): + + implements(IQueueableAgent) + + currentJob = None + + def __init__(self, master): + super(QueueableAgent, self).__init__(master) + self.queue = [] + + def send(self, job): + if self.currentJob is None: + if self.queue: # this should not happen... + self.queue.insert(0, job) + job = self.queue.pop() + self.execute(job) + else: + self.queue.insert(0, job) + + def execute(self, job): + self.currentJob = job + d = self.process() + d.addCallbacks(self.completed, self.error) + + def process(self): + # do something with the current job, return a deferred + print ('Job %s on agent %s has been executed.' + % (self.currentJob.identifier, self.name)) + return succeed('Done') + + def completed(self, result): + self.log(self.currentJob) + # TODO: inform the master about the result of the job execution + self.finishJob() + + def error(self, result): + print '*** error', result + self.log(self.currentJob, result='Error') + # TODO: inform the master about the result of the job execution + self.finishJob() + + def finishJob(self): + self.currentJob = None + if self.queue: + job = self.queue.pop() + self.execute(job, job.params) + +agents.register(QueueableAgent, Master, name='core.sample') diff --git a/agent/core/control.py b/agent/core/control.py new file mode 100644 index 0000000..7078630 --- /dev/null +++ b/agent/core/control.py @@ -0,0 +1,37 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Base/sample controller implementation. + +$Id$ +""" + +from zope.interface import implements + +from cybertools.agent.base.agent import Master +from cybertools.agent.base.control import SampleController, AgentSpecification +from cybertools.agent.components import controllers + + +class SampleController(SampleController): + + def _getAgents(self): + return [AgentSpecification('sample01', 'core.sample')] + +controllers.register(SampleController, Master, name='core.sample') diff --git a/agent/core/schedule.py b/agent/core/schedule.py index 6c9b87a..1ccaaec 100644 --- a/agent/core/schedule.py +++ b/agent/core/schedule.py @@ -17,7 +17,7 @@ # """ -Basic (sample) job scheduler. +Basic job scheduler using twisted. $Id$ """ diff --git a/agent/interfaces.py b/agent/interfaces.py index debbbd9..c039909 100644 --- a/agent/interfaces.py +++ b/agent/interfaces.py @@ -36,11 +36,28 @@ class IAgent(Interface): config = Attribute('Configuration settings.') logger = Attribute('Logger instance to be used for recording ' 'job execution and execution results.') - children = Attribute('A collection of agents that are managed by this ' - 'master.') - def execute(job, params=None): - """ Execute a job. + def send(job): + """ If the agent supports queueing and the agent is busy put + job in queue, otherwise just execute the job. + """ + + def execute(job): + """ Execute a job using the parameters given. + """ + + +class IQueueableAgent(Interface): + """ An agent that keeps a queue of jobs. A queueable agent + executes not more than one job at a time; when a running + job is finished the next one will be taken from the queue. + """ + + queue = Attribute('A sequence of jobs to execute.') + + def process(): + """ Do the real work asynchronously, returning a deferred. + This method will be called by execute(). """ @@ -51,6 +68,8 @@ class IMaster(IAgent): config = Attribute('Central configuration settings.') controllers = Attribute('Collection of IController instances.') scheduler = Attribute('IScheduler instance.') + children = Attribute('A collection of agents that are managed by this ' + 'master.') def setup(): """ Set up the master agent by triggering all assigned controllers.