diff --git a/agent/README.txt b/agent/README.txt index 6a9b27a..679272e 100644 --- a/agent/README.txt +++ b/agent/README.txt @@ -20,6 +20,9 @@ This means that all calls to services (like crawler, transporter, ...) return a deferred that must be supplied with a callback method (and in most cases also an errback method). + >>> from loops.agent.core import Agent + >>> agent = Agent() + Browser-based User Interface ============================ @@ -46,6 +49,24 @@ Configuration (per job) - schedule, repeating pattern, conditions - following job(s), e.g. to start a transfer immediately after a crawl + >>> scheduler = agent.scheduler + + >>> from time import time + >>> from loops.agent.schedule import Job + + >>> class TestJob(Job): + ... def execute(self, **kw): + ... d = super(TestJob, self).execute(**kw) + ... print 'executing' + ... return d + + >>> scheduler.schedule(TestJob(), int(time())+1) + + >>> from twisted.internet import reactor + >>> ignore = reactor.callLater(2, reactor.stop) + >>> reactor.run() + executing + Crawling ======== diff --git a/agent/core.py b/agent/core.py index 2e6b863..9df60ea 100644 --- a/agent/core.py +++ b/agent/core.py @@ -25,6 +25,7 @@ $Id$ from zope.interface import implements from loops.agent.interfaces import IAgent from loops.agent.config import Configurator +from loops.agent.schedule import Scheduler class Agent(object): @@ -34,8 +35,5 @@ class Agent(object): def __init__(self): configurator = self.configurator = Configurator() configurator.loadConfiguration() - - -def startAgent(): - return Agent() + self.scheduler = Scheduler() diff --git a/agent/interfaces.py b/agent/interfaces.py index d0fdba2..13b2e16 100644 --- a/agent/interfaces.py +++ b/agent/interfaces.py @@ -30,6 +30,8 @@ class IAgent(Interface): and transfers these to its server. """ + scheduler = Attribute('IScheduler instance') + class IScheduler(Interface): """ Manages jobs and cares that they are started at the appropriate diff --git a/agent/loops.tac b/agent/loops.tac index 084f8ae..30296ba 100644 --- a/agent/loops.tac +++ b/agent/loops.tac @@ -1,7 +1,7 @@ from twisted.application import internet, service from nevow import appserver -from loops.agent.core import startAgent +from loops.agent.core import Agent from loops.agent.ui.web import AgentHome from loops.agent.config import conf @@ -9,7 +9,7 @@ port = conf.ui.web.port or 10095 application = service.Application('LoopsAgent') -site = appserver.NevowSite(resource=AgentHome(startAgent())) +site = appserver.NevowSite(resource=AgentHome(Agent())) webServer = internet.TCPServer(port, site) webServer.setServiceParent(application) diff --git a/agent/schedule.py b/agent/schedule.py index 2d7d6f4..1ff53c7 100644 --- a/agent/schedule.py +++ b/agent/schedule.py @@ -22,5 +22,63 @@ Job scheduling. $Id$ """ +from time import time +from twisted.internet import reactor +from twisted.internet.defer import Deferred +from zope.interface import implements + from loops.agent.interfaces import IScheduler, IScheduledJob + +class Scheduler(object): + + implements(IScheduler) + + def __init__(self): + self.queue = {} + self.Logger = None + + def schedule(self, job, startTime): + job.startTime = startTime + job.scheduler = self + self.queue[startTime] = job + reactor.callLater(startTime-int(time()), job.run, **job.params) + + def getJobsToExecute(startTime=None): + return [j for j in self.queue.values() if (startTime or 0) <= j.startTime] + + +class Job(object): + + implements(IScheduledJob) + + def __init__(self): + self.startTime = 0 + self.scheduler = None + self.params = {} + self.successors = [] + + def execute(self, **kw): + d = Deferred() + return d + + def reschedule(self, startTime): + self.scheduler.schedule(self.copy(), startTime) + + def run(self, **kw): + d = self.execute(**kw) + d.addCallback(self.finishRun) + # TODO: logging + + def finishRun(self, result): + for job in self.successors: + job.run(job, **job.params) + # TODO: remove from queue + # TODO: logging + # TODO: reschedule if told by configuration + + def copy(self): + newJob = Job() + newJob.params = self.params + newJob.successors = [s.copy() for s in self.successors] + diff --git a/agent/tests.py b/agent/tests.py index 980c1f8..a95bc42 100755 --- a/agent/tests.py +++ b/agent/tests.py @@ -1,22 +1,37 @@ # $Id$ import unittest, doctest -from zope.testing.doctestunit import DocFileSuite -from zope.interface.verify import verifyClass -from loops.expert import query +import time +from twisted.internet import reactor + +from loops.agent.core import Agent +from loops.agent.schedule import Job + + +class TestJob(Job): + + def execute(self, **kw): + d = super(TestJob, self).execute(**kw) + print 'executing' + return d + class Test(unittest.TestCase): "Basic tests for the loops.agent package." - def testSomething(self): - pass + def setUp(self): + self.agent = Agent() + + def testScheduling(self): + d = self.agent.scheduler.schedule(TestJob(), int(time.time())+1) + time.sleep(1) def test_suite(): flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS return unittest.TestSuite(( unittest.makeSuite(Test), - DocFileSuite('README.txt', optionflags=flags), + doctest.DocFileSuite('README.txt', optionflags=flags), )) if __name__ == '__main__':