keep state information with jobs; provide feedback to master and controller via 'inform()' methods

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2492 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2008-04-03 10:59:51 +00:00
parent be4aada1f8
commit 7cf1a59bae
10 changed files with 99 additions and 46 deletions

View file

@ -24,9 +24,10 @@ $Id$
from zope.interface import implements from zope.interface import implements
from cybertools.agent.interfaces import IAgent from cybertools.agent.common import states
from cybertools.agent.components import agents, controllers, jobs from cybertools.agent.components import agents, controllers, jobs
from cybertools.agent.components import loggers, schedulers from cybertools.agent.components import loggers, schedulers
from cybertools.agent.interfaces import IAgent
from cybertools.util.config import Configurator from cybertools.util.config import Configurator
@ -75,27 +76,31 @@ class Master(Agent):
for cont in self.controllers: for cont in self.controllers:
cont.setupAgent() cont.setupAgent()
def setupAgents(self, agentSpecs): def setupAgents(self, controller, agentSpecs):
for spec in agentSpecs: for spec in agentSpecs:
agent = agents(self, spec.type) agent = agents(self, spec.type)
if agent is None:
print spec.type
return
agent.name = spec.name agent.name = spec.name
self.children[spec.name] = agent self.children[spec.name] = agent
def setupJobs(self, jobSpecs): def setupJobs(self, controller, jobSpecs):
for spec in jobSpecs: for spec in jobSpecs:
job = jobs(self.scheduler, spec.type) job = jobs(self.scheduler, spec.type)
job.agent = self.children[spec.agent] job.agent = self.children[spec.agent]
job.identifier = spec.identifier job.identifier = spec.identifier
job.controller = controller
self.scheduler.schedule(job, spec.startTime) self.scheduler.schedule(job, spec.startTime)
def inform(self, job, result=None, message=''):
job.controller.inform(job.identifier, job.state, result, message)
class SampleAgent(Agent): class SampleAgent(Agent):
def execute(self, job): def execute(self, job):
job.state = states.running
print 'Job %s on agent %s has been executed.' % (job.identifier, self.name) print 'Job %s on agent %s has been executed.' % (job.identifier, self.name)
self.log(job) self.log(job)
job.state = states.completed
self.master.inform(job)
agents.register(SampleAgent, Master, name='base.sample') agents.register(SampleAgent, Master, name='base.sample')

View file

@ -37,8 +37,8 @@ class Controller(object):
self.agent = agent self.agent = agent
def setupAgent(self): def setupAgent(self):
self.agent.setupAgents(self._getAgents()) self.agent.setupAgents(self, self._getAgents())
self.agent.setupJobs(self._getCurrentJobs()) self.agent.setupJobs(self, self._getCurrentJobs())
def _getAgents(self): def _getAgents(self):
return [] return []
@ -46,6 +46,9 @@ class Controller(object):
def _getCurrentJobs(self): def _getCurrentJobs(self):
return [] return []
def inform(self, identifier, state, result=None, message=''):
pass
class SampleController(Controller): class SampleController(Controller):
@ -58,12 +61,12 @@ class SampleController(Controller):
def createAgent(self, agentType, name): def createAgent(self, agentType, name):
spec = AgentSpecification(name, agentType) spec = AgentSpecification(name, agentType)
self.agent.setupAgents([spec]) self.agent.setupAgents(self, [spec])
def enterJob(self, jobType, agent): def enterJob(self, jobType, agent):
self.jobNumber += 1 self.jobNumber += 1
spec = JobSpecification(jobType, '%05i' % self.jobNumber, agent=agent) spec = JobSpecification(jobType, '%05i' % self.jobNumber, agent=agent)
self.agent.setupJobs([spec]) self.agent.setupJobs(self, [spec])
controllers.register(SampleController, Master, name='base.sample') controllers.register(SampleController, Master, name='base.sample')

View file

@ -25,6 +25,7 @@ $Id$
from zope.interface import implements from zope.interface import implements
from cybertools.agent.base.schedule import Scheduler from cybertools.agent.base.schedule import Scheduler
from cybertools.agent.common import states
from cybertools.agent.components import jobs from cybertools.agent.components import jobs
from cybertools.agent.interfaces import IScheduledJob from cybertools.agent.interfaces import IScheduledJob
@ -37,7 +38,7 @@ class Job(object):
agent = None agent = None
startTime = None startTime = None
repeat = 0 repeat = 0
whenStarted = whenFinished = None state = states.initialized
def __init__(self, scheduler): def __init__(self, scheduler):
self.scheduler = scheduler self.scheduler = scheduler
@ -47,15 +48,18 @@ class Job(object):
def execute(self): def execute(self):
if self.agent is not None: if self.agent is not None:
self.agent.send(self) self.agent.send(self)
self.state = states.submitted
def reschedule(self, startTime=None): def reschedule(self, startTime=None):
self.scheduler.schedule(self.copy(), startTime) self.scheduler.schedule(self.copy(), startTime)
def copy(self): def copy(self):
newJob = Job(self.scheduler) newJob = self.__class__(self.scheduler)
newJob.agent = self.agent newJob.agent = self.agent
newJob.params = self.params newJob.params = self.params
newJob.repeat = self.repeat newJob.repeat = self.repeat
newJob.successors = [s.copy() for s in self.successors] newJob.successors = [s.copy() for s in self.successors]
jobs.register(Job, Scheduler, name='sample') jobs.register(Job, Scheduler, name='sample')

View file

@ -26,6 +26,7 @@ from time import time
from zope.interface import implements from zope.interface import implements
from cybertools.agent.base.agent import Master from cybertools.agent.base.agent import Master
from cybertools.agent.common import states
from cybertools.agent.components import schedulers from cybertools.agent.components import schedulers
from cybertools.agent.interfaces import IScheduler from cybertools.agent.interfaces import IScheduler
@ -36,15 +37,10 @@ class Scheduler(object):
def __init__(self, agent): def __init__(self, agent):
self.agent = agent self.agent = agent
self.queue = []
def schedule(self, job, startTime=None): def schedule(self, job, startTime=None):
job.startTime = startTime or int(time()) job.startTime = startTime or int(time())
self.queue.append(job) job.state = states.scheduled
job.execute() # the sample scheduler does not care about startTime job.execute() # the sample scheduler does not care about startTime
def getJobsToExecute(startTime=0):
return [j for j in self.queue.values() if startTime <= j.startTime]
schedulers.register(Scheduler, Master, name='sample') schedulers.register(Scheduler, Master, name='sample')

50
agent/common.py Normal file
View file

@ -0,0 +1,50 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Common stuff.
$Id$
"""
from cybertools.util.jeep import Jeep
class JobState(object):
def __init__(self, name, value):
self.name = name
self.value = value
def hasError(self):
return self.value < 0
def hasFinished(self):
return (self.value <= states.aborted.value or
self.value >= states.completed.value)
def __str__(self):
return self.name
def __repr__(self):
return '<JobState %s>' % self.name
states = Jeep([JobState(n, v) for n, v in
(('initialized', 0), ('scheduled', 1), ('submitted', 2),
('running', 3), ('completed', 4), ('aborted', -1))])

View file

@ -26,6 +26,7 @@ from twisted.internet.defer import succeed
from zope.interface import implements from zope.interface import implements
from cybertools.agent.base.agent import Agent, Master from cybertools.agent.base.agent import Agent, Master
from cybertools.agent.common import states
from cybertools.agent.components import agents from cybertools.agent.components import agents
from cybertools.agent.interfaces import IQueueableAgent from cybertools.agent.interfaces import IQueueableAgent
@ -50,6 +51,7 @@ class QueueableAgent(Agent):
self.queue.insert(0, job) self.queue.insert(0, job)
def execute(self, job): def execute(self, job):
job.state = states.running
self.currentJob = job self.currentJob = job
d = self.process() d = self.process()
d.addCallbacks(self.completed, self.error) d.addCallbacks(self.completed, self.error)
@ -61,14 +63,17 @@ class QueueableAgent(Agent):
return succeed('Done') return succeed('Done')
def completed(self, result): def completed(self, result):
self.log(self.currentJob) job = self.currentJob
# TODO: inform the master about the result of the job execution job.state = states.completed
self.log(job)
self.master.inform(job, result)
self.finishJob() self.finishJob()
def error(self, result): def error(self, result):
print '*** error', result print '*** error', result
job.state = states.aborted
self.log(self.currentJob, result='Error') self.log(self.currentJob, result='Error')
# TODO: inform the master about the result of the job execution self.master.inform(job, result)
self.finishJob() self.finishJob()
def finishJob(self): def finishJob(self):

View file

@ -27,6 +27,7 @@ from zope.interface import implements
from cybertools.agent.base.agent import Master from cybertools.agent.base.agent import Master
from cybertools.agent.base.schedule import Scheduler as BaseScheduler from cybertools.agent.base.schedule import Scheduler as BaseScheduler
from cybertools.agent.common import states
from cybertools.agent.components import schedulers from cybertools.agent.components import schedulers
from cybertools.agent.interfaces import IScheduler from cybertools.agent.interfaces import IScheduler
@ -40,20 +41,14 @@ class Scheduler(BaseScheduler):
def __init__(self, agent): def __init__(self, agent):
self.agent = agent self.agent = agent
self.queue = {}
def schedule(self, job, startTime=None): def schedule(self, job, startTime=None):
job.startTime = startTime or int(time()) job.startTime = startTime or int(time())
#self.queue.append(job)
if startTime is None: if startTime is None:
startTime = int(time()) startTime = int(time())
job.startTime = startTime job.startTime = startTime
#job.scheduler = self # obsolete, set already in job's __init__() job.state = states.scheduled
while startTime in self.queue: # TODO: use another key for the queue;
startTime += 1 # is the queue necessary anyway?
self.queue[startTime] = job
reactor.callLater(startTime-int(time()), job.execute) reactor.callLater(startTime-int(time()), job.execute)
#job.execute()
return startTime return startTime
def getJobsToExecute(startTime=0): def getJobsToExecute(startTime=0):

View file

@ -19,7 +19,7 @@
""" """
Crawl base and sample classes. Crawl base and sample classes.
$Id: base.py $Id$
""" """
from zope.interface import implements from zope.interface import implements

View file

@ -19,7 +19,7 @@
""" """
Crawl base and sample classes. Crawl base and sample classes.
$Id: base.py $Id$
""" """
from zope.interface import implements from zope.interface import implements

View file

@ -24,6 +24,8 @@ $Id$
from zope.interface import Interface, Attribute from zope.interface import Interface, Attribute
from cybertools.util.jeep import Jeep
# agents # agents
@ -77,7 +79,7 @@ class IMaster(IAgent):
methods ``setupAgents()`` and ``setupJobs()``. methods ``setupAgents()`` and ``setupJobs()``.
""" """
def setupAgents(agentSpecs): def setupAgents(controller, agentSpecs):
""" Callback for loading agent specifications from the controller """ Callback for loading agent specifications from the controller
and setting up the corresponding agents. and setting up the corresponding agents.
@ -85,7 +87,7 @@ class IMaster(IAgent):
wants to provide new agent information. wants to provide new agent information.
""" """
def setupJobs(jobSpecs): def setupJobs(controller, jobSpecs):
""" Callback for loading the specifications of active jobs from """ Callback for loading the specifications of active jobs from
the controller and scheduling the corresponding jobs. the controller and scheduling the corresponding jobs.
@ -93,6 +95,11 @@ class IMaster(IAgent):
wants to provide new job information. wants to provide new job information.
""" """
def inform(job, result=None, message=''):
""" Callback for informing the master about the state of a job.
The result is an IResource object (if not None).
"""
class ICrawler(IAgent): class ICrawler(IAgent):
""" Collects resources. """ Collects resources.
@ -157,15 +164,6 @@ class IScheduler(Interface):
supplied. supplied.
""" """
def getJobsToExecute(startTime=None, agents=None):
""" Return a collection of jobs that are scheduled for execution at
or before the date/time given.
If ``startTime`` is None the current date/time is used.
If ``agents`` is not None return only jobs for the agents
given.
"""
# jobs # jobs
@ -178,18 +176,15 @@ class IScheduledJob(Interface):
'controller.') 'controller.')
scheduler = Attribute('Scheduler that controls this job.') scheduler = Attribute('Scheduler that controls this job.')
agent = Attribute('Agent responsible for executing the job.') agent = Attribute('Agent responsible for executing the job.')
controller = Attribute('Controller that issued the job.')
startTime = Attribute('Date/time at which the job should be executed.') startTime = Attribute('Date/time at which the job should be executed.')
params = Attribute('Mapping with key/value pairs to be used by ' params = Attribute('Mapping with key/value pairs to be used by '
'the ``execute()`` method.') 'the ``execute()`` method.')
state = Attribute('An object representing the current state of the job.')
repeat = Attribute('Number of seconds after which the job should be ' repeat = Attribute('Number of seconds after which the job should be '
'rescheduled. Do not repeat if 0.') 'rescheduled. Do not repeat if 0.')
successors = Attribute('Jobs to execute immediately after this ' successors = Attribute('Jobs to execute immediately after this '
'one has been finished.') 'one has been finished.')
whenStarted = Attribute('A callable with one argument (the job) that will '
'be called when the job has started.')
whenfinished = Attribute('A callable with two arguments, the job and the '
'result of running the job, that will be called when '
'the job has finished.')
def execute(): def execute():
""" Execute the job, typically by calling the ``execute()`` method """ Execute the job, typically by calling the ``execute()`` method