diff --git a/agent/base/agent.py b/agent/base/agent.py index 3687518..961ebaa 100644 --- a/agent/base/agent.py +++ b/agent/base/agent.py @@ -24,9 +24,10 @@ $Id$ from zope.interface import implements -from cybertools.agent.interfaces import IAgent +from cybertools.agent.common import states from cybertools.agent.components import agents, controllers, jobs from cybertools.agent.components import loggers, schedulers +from cybertools.agent.interfaces import IAgent from cybertools.util.config import Configurator @@ -75,27 +76,31 @@ class Master(Agent): for cont in self.controllers: cont.setupAgent() - def setupAgents(self, agentSpecs): + def setupAgents(self, controller, agentSpecs): for spec in agentSpecs: agent = agents(self, spec.type) - if agent is None: - print spec.type - return agent.name = spec.name self.children[spec.name] = agent - def setupJobs(self, jobSpecs): + def setupJobs(self, controller, jobSpecs): for spec in jobSpecs: job = jobs(self.scheduler, spec.type) job.agent = self.children[spec.agent] job.identifier = spec.identifier + job.controller = controller self.scheduler.schedule(job, spec.startTime) + def inform(self, job, result=None, message=''): + job.controller.inform(job.identifier, job.state, result, message) + class SampleAgent(Agent): def execute(self, job): + job.state = states.running print 'Job %s on agent %s has been executed.' % (job.identifier, self.name) self.log(job) + job.state = states.completed + self.master.inform(job) agents.register(SampleAgent, Master, name='base.sample') diff --git a/agent/base/control.py b/agent/base/control.py index cb67e2a..6c6ee0e 100644 --- a/agent/base/control.py +++ b/agent/base/control.py @@ -37,8 +37,8 @@ class Controller(object): self.agent = agent def setupAgent(self): - self.agent.setupAgents(self._getAgents()) - self.agent.setupJobs(self._getCurrentJobs()) + self.agent.setupAgents(self, self._getAgents()) + self.agent.setupJobs(self, self._getCurrentJobs()) def _getAgents(self): return [] @@ -46,6 +46,9 @@ class Controller(object): def _getCurrentJobs(self): return [] + def inform(self, identifier, state, result=None, message=''): + pass + class SampleController(Controller): @@ -58,12 +61,12 @@ class SampleController(Controller): def createAgent(self, agentType, name): spec = AgentSpecification(name, agentType) - self.agent.setupAgents([spec]) + self.agent.setupAgents(self, [spec]) def enterJob(self, jobType, agent): self.jobNumber += 1 spec = JobSpecification(jobType, '%05i' % self.jobNumber, agent=agent) - self.agent.setupJobs([spec]) + self.agent.setupJobs(self, [spec]) controllers.register(SampleController, Master, name='base.sample') diff --git a/agent/base/job.py b/agent/base/job.py index 752acdc..b3d9981 100644 --- a/agent/base/job.py +++ b/agent/base/job.py @@ -25,6 +25,7 @@ $Id$ from zope.interface import implements from cybertools.agent.base.schedule import Scheduler +from cybertools.agent.common import states from cybertools.agent.components import jobs from cybertools.agent.interfaces import IScheduledJob @@ -37,7 +38,7 @@ class Job(object): agent = None startTime = None repeat = 0 - whenStarted = whenFinished = None + state = states.initialized def __init__(self, scheduler): self.scheduler = scheduler @@ -47,15 +48,18 @@ class Job(object): def execute(self): if self.agent is not None: self.agent.send(self) + self.state = states.submitted def reschedule(self, startTime=None): self.scheduler.schedule(self.copy(), startTime) def copy(self): - newJob = Job(self.scheduler) + newJob = self.__class__(self.scheduler) newJob.agent = self.agent newJob.params = self.params newJob.repeat = self.repeat newJob.successors = [s.copy() for s in self.successors] jobs.register(Job, Scheduler, name='sample') + + diff --git a/agent/base/schedule.py b/agent/base/schedule.py index 12411e8..ebd84b1 100644 --- a/agent/base/schedule.py +++ b/agent/base/schedule.py @@ -26,6 +26,7 @@ from time import time from zope.interface import implements from cybertools.agent.base.agent import Master +from cybertools.agent.common import states from cybertools.agent.components import schedulers from cybertools.agent.interfaces import IScheduler @@ -36,15 +37,10 @@ class Scheduler(object): def __init__(self, agent): self.agent = agent - self.queue = [] def schedule(self, job, startTime=None): job.startTime = startTime or int(time()) - self.queue.append(job) + job.state = states.scheduled job.execute() # the sample scheduler does not care about startTime - def getJobsToExecute(startTime=0): - return [j for j in self.queue.values() if startTime <= j.startTime] - - schedulers.register(Scheduler, Master, name='sample') diff --git a/agent/common.py b/agent/common.py new file mode 100644 index 0000000..174b9b5 --- /dev/null +++ b/agent/common.py @@ -0,0 +1,50 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Common stuff. + +$Id$ +""" + +from cybertools.util.jeep import Jeep + + +class JobState(object): + + def __init__(self, name, value): + self.name = name + self.value = value + + def hasError(self): + return self.value < 0 + + def hasFinished(self): + return (self.value <= states.aborted.value or + self.value >= states.completed.value) + + def __str__(self): + return self.name + + def __repr__(self): + return '' % self.name + + +states = Jeep([JobState(n, v) for n, v in + (('initialized', 0), ('scheduled', 1), ('submitted', 2), + ('running', 3), ('completed', 4), ('aborted', -1))]) diff --git a/agent/core/agent.py b/agent/core/agent.py index de216ef..6e2bb31 100644 --- a/agent/core/agent.py +++ b/agent/core/agent.py @@ -26,6 +26,7 @@ from twisted.internet.defer import succeed from zope.interface import implements from cybertools.agent.base.agent import Agent, Master +from cybertools.agent.common import states from cybertools.agent.components import agents from cybertools.agent.interfaces import IQueueableAgent @@ -50,6 +51,7 @@ class QueueableAgent(Agent): self.queue.insert(0, job) def execute(self, job): + job.state = states.running self.currentJob = job d = self.process() d.addCallbacks(self.completed, self.error) @@ -61,14 +63,17 @@ class QueueableAgent(Agent): return succeed('Done') def completed(self, result): - self.log(self.currentJob) - # TODO: inform the master about the result of the job execution + job = self.currentJob + job.state = states.completed + self.log(job) + self.master.inform(job, result) self.finishJob() def error(self, result): print '*** error', result + job.state = states.aborted self.log(self.currentJob, result='Error') - # TODO: inform the master about the result of the job execution + self.master.inform(job, result) self.finishJob() def finishJob(self): diff --git a/agent/core/schedule.py b/agent/core/schedule.py index 1ccaaec..ab5697f 100644 --- a/agent/core/schedule.py +++ b/agent/core/schedule.py @@ -27,6 +27,7 @@ from zope.interface import implements from cybertools.agent.base.agent import Master from cybertools.agent.base.schedule import Scheduler as BaseScheduler +from cybertools.agent.common import states from cybertools.agent.components import schedulers from cybertools.agent.interfaces import IScheduler @@ -40,20 +41,14 @@ class Scheduler(BaseScheduler): def __init__(self, agent): self.agent = agent - self.queue = {} def schedule(self, job, startTime=None): job.startTime = startTime or int(time()) - #self.queue.append(job) if startTime is None: startTime = int(time()) job.startTime = startTime - #job.scheduler = self # obsolete, set already in job's __init__() - while startTime in self.queue: # TODO: use another key for the queue; - startTime += 1 # is the queue necessary anyway? - self.queue[startTime] = job + job.state = states.scheduled reactor.callLater(startTime-int(time()), job.execute) - #job.execute() return startTime def getJobsToExecute(startTime=0): diff --git a/agent/crawl/base.py b/agent/crawl/base.py index ac87708..f1dcf7d 100644 --- a/agent/crawl/base.py +++ b/agent/crawl/base.py @@ -19,7 +19,7 @@ """ Crawl base and sample classes. -$Id: base.py +$Id$ """ from zope.interface import implements diff --git a/agent/crawl/mail.py b/agent/crawl/mail.py index 01aa1a8..78b64c9 100644 --- a/agent/crawl/mail.py +++ b/agent/crawl/mail.py @@ -19,7 +19,7 @@ """ Crawl base and sample classes. -$Id: base.py +$Id$ """ from zope.interface import implements diff --git a/agent/interfaces.py b/agent/interfaces.py index c039909..c65f2e7 100644 --- a/agent/interfaces.py +++ b/agent/interfaces.py @@ -24,6 +24,8 @@ $Id$ from zope.interface import Interface, Attribute +from cybertools.util.jeep import Jeep + # agents @@ -77,7 +79,7 @@ class IMaster(IAgent): methods ``setupAgents()`` and ``setupJobs()``. """ - def setupAgents(agentSpecs): + def setupAgents(controller, agentSpecs): """ Callback for loading agent specifications from the controller and setting up the corresponding agents. @@ -85,7 +87,7 @@ class IMaster(IAgent): wants to provide new agent information. """ - def setupJobs(jobSpecs): + def setupJobs(controller, jobSpecs): """ Callback for loading the specifications of active jobs from the controller and scheduling the corresponding jobs. @@ -93,6 +95,11 @@ class IMaster(IAgent): wants to provide new job information. """ + def inform(job, result=None, message=''): + """ Callback for informing the master about the state of a job. + The result is an IResource object (if not None). + """ + class ICrawler(IAgent): """ Collects resources. @@ -157,15 +164,6 @@ class IScheduler(Interface): supplied. """ - def getJobsToExecute(startTime=None, agents=None): - """ Return a collection of jobs that are scheduled for execution at - or before the date/time given. - - If ``startTime`` is None the current date/time is used. - If ``agents`` is not None return only jobs for the agents - given. - """ - # jobs @@ -178,18 +176,15 @@ class IScheduledJob(Interface): 'controller.') scheduler = Attribute('Scheduler that controls this job.') agent = Attribute('Agent responsible for executing the job.') + controller = Attribute('Controller that issued the job.') startTime = Attribute('Date/time at which the job should be executed.') params = Attribute('Mapping with key/value pairs to be used by ' 'the ``execute()`` method.') + state = Attribute('An object representing the current state of the job.') repeat = Attribute('Number of seconds after which the job should be ' 'rescheduled. Do not repeat if 0.') successors = Attribute('Jobs to execute immediately after this ' 'one has been finished.') - whenStarted = Attribute('A callable with one argument (the job) that will ' - 'be called when the job has started.') - whenfinished = Attribute('A callable with two arguments, the job and the ' - 'result of running the job, that will be called when ' - 'the job has finished.') def execute(): """ Execute the job, typically by calling the ``execute()`` method