added a queueable agent that will only execute one job at a time

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2483 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2008-03-29 17:58:04 +00:00
parent 998cbf59cd
commit 3f69f638d3
10 changed files with 181 additions and 36 deletions

View file

@ -106,7 +106,7 @@ the path to the configuration file.
>>> configFile.close() >>> configFile.close()
>>> master.config >>> master.config
controller.name = 'sample' controller.name = 'base.sample'
logger.name = 'default' logger.name = 'default'
logger.standard = 30 logger.standard = 30
scheduler.name = 'sample' scheduler.name = 'sample'
@ -155,12 +155,10 @@ is just one scheduler associated with the master agent.
>>> master.scheduler >>> master.scheduler
<cybertools.agent.base.schedule.Scheduler object ...> <cybertools.agent.base.schedule.Scheduler object ...>
We schedule a sample job by taking the role of the controller and simply We schedule a sample job by calling an internal method of the agent's
call the master agent's callback method for entering jobs. controller.
>>> from cybertools.agent.base.control import JobSpecification >>> master.controllers[0].enterJob('sample', 'sample01')
>>> jobSpec = JobSpecification('sample', '00001', agent='sample01')
>>> master.setupJobs([jobSpec])
Job 00001 on agent sample01 has been executed. Job 00001 on agent sample01 has been executed.
Logging Logging
@ -173,15 +171,14 @@ Logging
>>> master.config.logger.standard = 20 >>> master.config.logger.standard = 20
>>> master.logger.setup() >>> master.logger.setup()
>>> jobSpec = JobSpecification('sample', '00002', agent='sample01') >>> master.controllers[0].enterJob('sample', 'sample01')
>>> master.setupJobs([jobSpec])
Job 00002 on agent sample01 has been executed. Job 00002 on agent sample01 has been executed.
2... agent:sample01 job:00002 message:job execution 2... agent:sample01 job:00002 message:job execution result:OK
>>> for r in master.logger.records: >>> for r in master.logger.records:
... print r ... print r
2... agent:sample01 job:00001 message:job execution 2... agent:sample01 job:00001 message:job execution result:OK
2... agent:sample01 job:00002 message:job execution 2... agent:sample01 job:00002 message:job execution result:OK
Using the Twisted-based Scheduler Using the Twisted-based Scheduler
@ -190,27 +187,29 @@ Using the Twisted-based Scheduler
By specifying the core scheduler in the agent's configuration this will be By specifying the core scheduler in the agent's configuration this will be
used automatically for scheduling. used automatically for scheduling.
In addition, we use another sample controller, now also the twisted-based
from the core package. This will in turn set up a queueable agent from
the core package so that now everything is running under the control of
the twisted reactor.
>>> config = ''' >>> config = '''
... controller(name='sample') ... controller(name='core.sample')
... scheduler(name='core') ... scheduler(name='core')
... logger(name='default', standard=30) ... logger(name='default', standard=30)
... ''' ... '''
>>> master = Master(config) >>> master = Master(config)
>>> master.setup()
>>> master.scheduler >>> master.scheduler
<cybertools.agent.core.schedule.Scheduler object ...> <cybertools.agent.core.schedule.Scheduler object ...>
We trigger the controller's setup as above and enter the same We enter the same job specification as above.
job specification.
>>> master.setup() >>> master.controllers[0].enterJob('sample', 'sample01')
>>> jobSpec = JobSpecification('sample', '00001', agent='sample01')
>>> master.setupJobs([jobSpec])
Now the job is not executed immediately - we have to hand over control to Now the job is not executed immediately - we have to hand over control to
the twisted reactor first. The running of the reactor is simulated by the twisted reactor first. The running of the reactor is simulated by
a method provided for testing. the ``iterate()`` method provided for testing.
>>> from cybertools.agent.tests import tester >>> from cybertools.agent.tests import tester
>>> tester.iterate() >>> tester.iterate()

View file

@ -5,4 +5,4 @@ $Id$
# register default adapters # register default adapters
from cybertools.agent.base import agent, control, job, log, schedule from cybertools.agent.base import agent, control, job, log, schedule
from cybertools.agent.core import schedule from cybertools.agent.core import agent, control, schedule

View file

@ -47,6 +47,10 @@ class Agent(object):
def execute(self, job, params=None): def execute(self, job, params=None):
pass pass
def log(self, job, result='OK'):
self.logger.log(dict(message='job execution', job=job.identifier,
agent=self.name, result=result))
class Master(Agent): class Master(Agent):
@ -84,12 +88,11 @@ class Master(Agent):
class SampleAgent(Agent): class SampleAgent(Agent):
def execute(self, job, params=None): def send(self, job):
self.execute(job)
def execute(self, job):
print 'Job %s on agent %s has been executed.' % (job.identifier, self.name) print 'Job %s on agent %s has been executed.' % (job.identifier, self.name)
self.log(job) self.log(job)
def log(self, job): agents.register(SampleAgent, Master, name='base.sample')
self.logger.log(dict(message='job execution', job=job.identifier,
agent=self.name))
agents.register(SampleAgent, Master, name='sample')

View file

@ -49,10 +49,17 @@ class Controller(object):
class SampleController(Controller): class SampleController(Controller):
def _getAgents(self): jobNumber = 0
return [AgentSpecification('sample01', 'sample')]
controllers.register(SampleController, Master, name='sample') def _getAgents(self):
return [AgentSpecification('sample01', 'base.sample')]
def enterJob(self, jobType, agent):
self.jobNumber += 1
spec = JobSpecification(jobType, '%05i' % self.jobNumber, agent=agent)
self.agent.setupJobs([spec])
controllers.register(SampleController, Master, name='base.sample')
class AgentSpecification(object): class AgentSpecification(object):

View file

@ -46,7 +46,7 @@ class Job(object):
def execute(self): def execute(self):
if self.agent is not None: if self.agent is not None:
self.agent.execute(self, self.params) self.agent.send(self)
def reschedule(self, startTime=None): def reschedule(self, startTime=None):
self.scheduler.schedule(self.copy(), startTime) self.scheduler.schedule(self.copy(), startTime)

View file

@ -4,6 +4,6 @@
# $Id$ # $Id$
# #
controller(name='sample') controller(name='base.sample')
scheduler(name='sample') scheduler(name='sample')
logger(name='default', standard=30) logger(name='default', standard=30)

80
agent/core/agent.py Normal file
View file

@ -0,0 +1,80 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Queueable agent base/sample classes.
$Id$
"""
from twisted.internet.defer import succeed
from zope.interface import implements
from cybertools.agent.base.agent import Agent, Master
from cybertools.agent.components import agents
from cybertools.agent.interfaces import IQueueableAgent
class QueueableAgent(Agent):
implements(IQueueableAgent)
currentJob = None
def __init__(self, master):
super(QueueableAgent, self).__init__(master)
self.queue = []
def send(self, job):
if self.currentJob is None:
if self.queue: # this should not happen...
self.queue.insert(0, job)
job = self.queue.pop()
self.execute(job)
else:
self.queue.insert(0, job)
def execute(self, job):
self.currentJob = job
d = self.process()
d.addCallbacks(self.completed, self.error)
def process(self):
# do something with the current job, return a deferred
print ('Job %s on agent %s has been executed.'
% (self.currentJob.identifier, self.name))
return succeed('Done')
def completed(self, result):
self.log(self.currentJob)
# TODO: inform the master about the result of the job execution
self.finishJob()
def error(self, result):
print '*** error', result
self.log(self.currentJob, result='Error')
# TODO: inform the master about the result of the job execution
self.finishJob()
def finishJob(self):
self.currentJob = None
if self.queue:
job = self.queue.pop()
self.execute(job, job.params)
agents.register(QueueableAgent, Master, name='core.sample')

37
agent/core/control.py Normal file
View file

@ -0,0 +1,37 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Base/sample controller implementation.
$Id$
"""
from zope.interface import implements
from cybertools.agent.base.agent import Master
from cybertools.agent.base.control import SampleController, AgentSpecification
from cybertools.agent.components import controllers
class SampleController(SampleController):
def _getAgents(self):
return [AgentSpecification('sample01', 'core.sample')]
controllers.register(SampleController, Master, name='core.sample')

View file

@ -17,7 +17,7 @@
# #
""" """
Basic (sample) job scheduler. Basic job scheduler using twisted.
$Id$ $Id$
""" """

View file

@ -36,11 +36,28 @@ class IAgent(Interface):
config = Attribute('Configuration settings.') config = Attribute('Configuration settings.')
logger = Attribute('Logger instance to be used for recording ' logger = Attribute('Logger instance to be used for recording '
'job execution and execution results.') 'job execution and execution results.')
children = Attribute('A collection of agents that are managed by this '
'master.')
def execute(job, params=None): def send(job):
""" Execute a job. """ If the agent supports queueing and the agent is busy put
job in queue, otherwise just execute the job.
"""
def execute(job):
""" Execute a job using the parameters given.
"""
class IQueueableAgent(Interface):
""" An agent that keeps a queue of jobs. A queueable agent
executes not more than one job at a time; when a running
job is finished the next one will be taken from the queue.
"""
queue = Attribute('A sequence of jobs to execute.')
def process():
""" Do the real work asynchronously, returning a deferred.
This method will be called by execute().
""" """
@ -51,6 +68,8 @@ class IMaster(IAgent):
config = Attribute('Central configuration settings.') config = Attribute('Central configuration settings.')
controllers = Attribute('Collection of IController instances.') controllers = Attribute('Collection of IController instances.')
scheduler = Attribute('IScheduler instance.') scheduler = Attribute('IScheduler instance.')
children = Attribute('A collection of agents that are managed by this '
'master.')
def setup(): def setup():
""" Set up the master agent by triggering all assigned controllers. """ Set up the master agent by triggering all assigned controllers.