schedule: put successor jobs in queue instead of just 'run'ning them
git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1902 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
f4449162a1
commit
de87f1ff91
3 changed files with 15 additions and 7 deletions
|
@ -32,7 +32,7 @@ transferred.
|
||||||
|
|
||||||
We are now ready to schedule the job and let the reactor execute it.
|
We are now ready to schedule the job and let the reactor execute it.
|
||||||
|
|
||||||
>>> scheduler.schedule(crawlJob, int(time()))
|
>>> scheduler.schedule(crawlJob)
|
||||||
|
|
||||||
>>> tester.iterate()
|
>>> tester.iterate()
|
||||||
Metadata: {'path': '...data...file1.txt'}
|
Metadata: {'path': '...data...file1.txt'}
|
||||||
|
|
|
@ -39,9 +39,12 @@ class Scheduler(object):
|
||||||
self.queue = {}
|
self.queue = {}
|
||||||
self.logger = None
|
self.logger = None
|
||||||
|
|
||||||
def schedule(self, job, startTime):
|
def schedule(self, job, startTime=None):
|
||||||
|
if startTime is None:
|
||||||
|
startTime = int(time())
|
||||||
job.startTime = startTime
|
job.startTime = startTime
|
||||||
job.scheduler = self
|
job.scheduler = self
|
||||||
|
# TODO: find a better key to identify jobs
|
||||||
self.queue[startTime] = job
|
self.queue[startTime] = job
|
||||||
reactor.callLater(startTime-int(time()), job.run)
|
reactor.callLater(startTime-int(time()), job.run)
|
||||||
|
|
||||||
|
@ -53,6 +56,8 @@ class Job(object):
|
||||||
|
|
||||||
implements(IScheduledJob)
|
implements(IScheduledJob)
|
||||||
|
|
||||||
|
scheduler = None
|
||||||
|
|
||||||
def __init__(self, **params):
|
def __init__(self, **params):
|
||||||
self.startTime = 0
|
self.startTime = 0
|
||||||
self.params = params
|
self.params = params
|
||||||
|
@ -60,6 +65,8 @@ class Job(object):
|
||||||
self.repeat = 0
|
self.repeat = 0
|
||||||
|
|
||||||
def execute(self):
|
def execute(self):
|
||||||
|
""" Must be overridden by subclass.
|
||||||
|
"""
|
||||||
d = Deferred()
|
d = Deferred()
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
@ -72,12 +79,13 @@ class Job(object):
|
||||||
# TODO: logging
|
# TODO: logging
|
||||||
|
|
||||||
def finishRun(self, result):
|
def finishRun(self, result):
|
||||||
|
# remove from queue
|
||||||
|
del self.scheduler.queue[self.startTime]
|
||||||
# run successors
|
# run successors
|
||||||
for job in self.successors:
|
for job in self.successors:
|
||||||
job.params['result'] = result
|
job.params['result'] = result
|
||||||
job.run()
|
#job.run()
|
||||||
# remove from queue
|
self.scheduler.schedule(job)
|
||||||
del self.scheduler.queue[self.startTime]
|
|
||||||
# TODO: logging
|
# TODO: logging
|
||||||
# reschedule if necessary
|
# reschedule if necessary
|
||||||
if self.repeat:
|
if self.repeat:
|
||||||
|
|
|
@ -23,7 +23,7 @@ $Id$
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
from twisted.internet.defer import Deferred
|
from twisted.internet.defer import succeed
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
|
||||||
from loops.agent.interfaces import ITransportJob, ITransporter
|
from loops.agent.interfaces import ITransportJob, ITransporter
|
||||||
|
@ -43,5 +43,5 @@ class Transporter(BaseTransporter):
|
||||||
if metadata is not None:
|
if metadata is not None:
|
||||||
print 'Metadata:', metadata
|
print 'Metadata:', metadata
|
||||||
print 'Transferring:', text
|
print 'Transferring:', text
|
||||||
return Deferred()
|
return succeed('OK')
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue