Work in progress: set up basic agent functionality (with unit tests...)
git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1783 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
a7bd96600b
commit
d66bd7591c
6 changed files with 106 additions and 12 deletions
|
@ -20,6 +20,9 @@ This means that all calls to services (like crawler, transporter, ...)
|
|||
return a deferred that must be supplied with a callback method (and in
|
||||
most cases also an errback method).
|
||||
|
||||
>>> from loops.agent.core import Agent
|
||||
>>> agent = Agent()
|
||||
|
||||
|
||||
Browser-based User Interface
|
||||
============================
|
||||
|
@ -46,6 +49,24 @@ Configuration (per job)
|
|||
- schedule, repeating pattern, conditions
|
||||
- following job(s), e.g. to start a transfer immediately after a crawl
|
||||
|
||||
>>> scheduler = agent.scheduler
|
||||
|
||||
>>> from time import time
|
||||
>>> from loops.agent.schedule import Job
|
||||
|
||||
>>> class TestJob(Job):
|
||||
... def execute(self, **kw):
|
||||
... d = super(TestJob, self).execute(**kw)
|
||||
... print 'executing'
|
||||
... return d
|
||||
|
||||
>>> scheduler.schedule(TestJob(), int(time())+1)
|
||||
|
||||
>>> from twisted.internet import reactor
|
||||
>>> ignore = reactor.callLater(2, reactor.stop)
|
||||
>>> reactor.run()
|
||||
executing
|
||||
|
||||
|
||||
Crawling
|
||||
========
|
||||
|
|
|
@ -25,6 +25,7 @@ $Id$
|
|||
from zope.interface import implements
|
||||
from loops.agent.interfaces import IAgent
|
||||
from loops.agent.config import Configurator
|
||||
from loops.agent.schedule import Scheduler
|
||||
|
||||
|
||||
class Agent(object):
|
||||
|
@ -34,8 +35,5 @@ class Agent(object):
|
|||
def __init__(self):
|
||||
configurator = self.configurator = Configurator()
|
||||
configurator.loadConfiguration()
|
||||
|
||||
|
||||
def startAgent():
|
||||
return Agent()
|
||||
self.scheduler = Scheduler()
|
||||
|
||||
|
|
|
@ -30,6 +30,8 @@ class IAgent(Interface):
|
|||
and transfers these to its server.
|
||||
"""
|
||||
|
||||
scheduler = Attribute('IScheduler instance')
|
||||
|
||||
|
||||
class IScheduler(Interface):
|
||||
""" Manages jobs and cares that they are started at the appropriate
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
from twisted.application import internet, service
|
||||
from nevow import appserver
|
||||
|
||||
from loops.agent.core import startAgent
|
||||
from loops.agent.core import Agent
|
||||
from loops.agent.ui.web import AgentHome
|
||||
from loops.agent.config import conf
|
||||
|
||||
|
@ -9,7 +9,7 @@ port = conf.ui.web.port or 10095
|
|||
|
||||
application = service.Application('LoopsAgent')
|
||||
|
||||
site = appserver.NevowSite(resource=AgentHome(startAgent()))
|
||||
site = appserver.NevowSite(resource=AgentHome(Agent()))
|
||||
webServer = internet.TCPServer(port, site)
|
||||
webServer.setServiceParent(application)
|
||||
|
||||
|
|
|
@ -22,5 +22,63 @@ Job scheduling.
|
|||
$Id$
|
||||
"""
|
||||
|
||||
from time import time
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.defer import Deferred
|
||||
from zope.interface import implements
|
||||
|
||||
from loops.agent.interfaces import IScheduler, IScheduledJob
|
||||
|
||||
|
||||
class Scheduler(object):
|
||||
|
||||
implements(IScheduler)
|
||||
|
||||
def __init__(self):
|
||||
self.queue = {}
|
||||
self.Logger = None
|
||||
|
||||
def schedule(self, job, startTime):
|
||||
job.startTime = startTime
|
||||
job.scheduler = self
|
||||
self.queue[startTime] = job
|
||||
reactor.callLater(startTime-int(time()), job.run, **job.params)
|
||||
|
||||
def getJobsToExecute(startTime=None):
|
||||
return [j for j in self.queue.values() if (startTime or 0) <= j.startTime]
|
||||
|
||||
|
||||
class Job(object):
|
||||
|
||||
implements(IScheduledJob)
|
||||
|
||||
def __init__(self):
|
||||
self.startTime = 0
|
||||
self.scheduler = None
|
||||
self.params = {}
|
||||
self.successors = []
|
||||
|
||||
def execute(self, **kw):
|
||||
d = Deferred()
|
||||
return d
|
||||
|
||||
def reschedule(self, startTime):
|
||||
self.scheduler.schedule(self.copy(), startTime)
|
||||
|
||||
def run(self, **kw):
|
||||
d = self.execute(**kw)
|
||||
d.addCallback(self.finishRun)
|
||||
# TODO: logging
|
||||
|
||||
def finishRun(self, result):
|
||||
for job in self.successors:
|
||||
job.run(job, **job.params)
|
||||
# TODO: remove from queue
|
||||
# TODO: logging
|
||||
# TODO: reschedule if told by configuration
|
||||
|
||||
def copy(self):
|
||||
newJob = Job()
|
||||
newJob.params = self.params
|
||||
newJob.successors = [s.copy() for s in self.successors]
|
||||
|
||||
|
|
|
@ -1,22 +1,37 @@
|
|||
# $Id$
|
||||
|
||||
import unittest, doctest
|
||||
from zope.testing.doctestunit import DocFileSuite
|
||||
from zope.interface.verify import verifyClass
|
||||
from loops.expert import query
|
||||
import time
|
||||
from twisted.internet import reactor
|
||||
|
||||
from loops.agent.core import Agent
|
||||
from loops.agent.schedule import Job
|
||||
|
||||
|
||||
class TestJob(Job):
|
||||
|
||||
def execute(self, **kw):
|
||||
d = super(TestJob, self).execute(**kw)
|
||||
print 'executing'
|
||||
return d
|
||||
|
||||
|
||||
class Test(unittest.TestCase):
|
||||
"Basic tests for the loops.agent package."
|
||||
|
||||
def testSomething(self):
|
||||
pass
|
||||
def setUp(self):
|
||||
self.agent = Agent()
|
||||
|
||||
def testScheduling(self):
|
||||
d = self.agent.scheduler.schedule(TestJob(), int(time.time())+1)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
def test_suite():
|
||||
flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
|
||||
return unittest.TestSuite((
|
||||
unittest.makeSuite(Test),
|
||||
DocFileSuite('README.txt', optionflags=flags),
|
||||
doctest.DocFileSuite('README.txt', optionflags=flags),
|
||||
))
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
Loading…
Add table
Reference in a new issue