diff --git a/agent/README.txt b/agent/README.txt index ac1f4bd..6fd351a 100644 --- a/agent/README.txt +++ b/agent/README.txt @@ -146,9 +146,19 @@ Let's check a few attributes of the newly created agent. Job Scheduling and Execution ---------------------------- +A scheduler is responsible for triggering the execution of a job at the +appropriate time. The master agent schedules the jobs based upon the +information (job specifications) it gets from the controller. There +is just one scheduler associated with the master agent. + >>> master.scheduler + >>> from cybertools.agent.base.control import JobSpecification + >>> jobSpec = JobSpecification('sample', agent='sample01') + >>> master.setupJobs([jobSpec]) + Job <...Job object ...> on agent <...SampleAgent object ...> has been executed. + Logging ------- diff --git a/agent/base/agent.py b/agent/base/agent.py index 43ef6d3..6fb4912 100644 --- a/agent/base/agent.py +++ b/agent/base/agent.py @@ -25,8 +25,8 @@ $Id$ from zope.interface import implements from cybertools.agent.interfaces import IAgent -from cybertools.agent.components import agents -from cybertools.agent.components import controllers, loggers, schedulers +from cybertools.agent.components import agents, controllers, jobs +from cybertools.agent.components import loggers, schedulers from cybertools.util.config import Configurator @@ -72,12 +72,16 @@ class Master(Agent): self.children[spec.name] = agent def setupJobs(self, jobSpecs): - pass + for spec in jobSpecs: + job = jobs(self.scheduler, spec.type) + job.agent = self.children[spec.agent] + self.scheduler.schedule(job, spec.startTime) class SampleAgent(Agent): - pass + def execute(self, job, params=None): + print 'Job %s on agent %s has been executed.' % (job, self) agents.register(SampleAgent, Master, name='sample') diff --git a/agent/base/control.py b/agent/base/control.py index 8b6c73a..70cb9e2 100644 --- a/agent/base/control.py +++ b/agent/base/control.py @@ -66,8 +66,9 @@ class AgentSpecification(object): class JobSpecification(object): - def __init__(self, name, type, **kw): - self.name = name + startTime = None + + def __init__(self, type, **kw): self.type = type for k, v in kw.items(): setattr(self, k, v) diff --git a/agent/base/job.py b/agent/base/job.py index e8e02d7..2acb2d3 100644 --- a/agent/base/job.py +++ b/agent/base/job.py @@ -23,10 +23,38 @@ $Id$ """ from zope.interface import implements -from loops.agent.interfaces import IScheduledJob + +from cybertools.agent.base.schedule import Scheduler +from cybertools.agent.components import jobs +from cybertools.agent.interfaces import IScheduledJob class Job(object): implements(IScheduledJob) + agent = None + startTime = None + repeat = 0 + whenStarted = whenFinished = None + + def __init__(self, scheduler): + self.scheduler = scheduler + self.params = {} + self.successors = [] + + def execute(self): + if self.agent is not None: + self.agent.execute(self, self.params) + + def reschedule(self, startTime=None): + self.scheduler.schedule(self.copy(), startTime) + + def copy(self): + newJob = Job(self.scheduler) + newJob.agent = self.agent + newJob.params = self.params + newJob.repeat = self.repeat + newJob.successors = [s.copy() for s in self.successors] + +jobs.register(Job, Scheduler, name='sample') diff --git a/agent/base/schedule.py b/agent/base/schedule.py index 65e4ad0..b9acc98 100644 --- a/agent/base/schedule.py +++ b/agent/base/schedule.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2007 Helmut Merz helmutm@cy55.de +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -22,7 +22,7 @@ Basic (sample) job scheduler. $Id$ """ - +from time import time from zope.interface import implements from cybertools.agent.base.agent import Master @@ -36,6 +36,15 @@ class Scheduler(object): def __init__(self, agent): self.agent = agent + self.queue = [] + + def schedule(self, job, startTime=None): + job. startTime = startTime or int(time()) + self.queue.append(job) + job.execute() # the sample scheduler does not care about startTime + + def getJobsToExecute(startTime=0): + return [j for j in self.queue.values() if startTime <= j.startTime] schedulers.register(Scheduler, Master, name='sample') diff --git a/agent/interfaces.py b/agent/interfaces.py index 6567d91..5865bb2 100644 --- a/agent/interfaces.py +++ b/agent/interfaces.py @@ -127,7 +127,7 @@ class IScheduler(Interface): time by the agents responsible for it. """ - def schedule(job, agent, startTime=None): + def schedule(job, startTime=None): """ Register the job given for execution at the intended start date/time (an integer timestamp) and return the job. @@ -154,6 +154,7 @@ class IScheduledJob(Interface): a predefined date and time - this is the basic job interface. """ + scheduler = Attribute('Scheduler that controls this job.') agent = Attribute('Agent responsible for executing the job.') startTime = Attribute('Date/time at which the job should be executed.') params = Attribute('Mapping with key/value pairs to be used by ' @@ -168,7 +169,7 @@ class IScheduledJob(Interface): 'result of running the job, that will be called when ' 'the job has finished.') - def execute(params=None): + def execute(): """ Execute the job, typically by calling the ``execute()`` method of the agent responsible for it. """