Job: + whenStarted, whenFinished callbacks, now with additional job argument; let Scheduler.schedule() return the real startTime (may be used as jobId)

git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1914 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2007-08-14 08:24:49 +00:00
parent 5348071d34
commit 2ac3c63888
4 changed files with 25 additions and 11 deletions

View file

@ -137,7 +137,7 @@ How does this work?
>>> from time import time >>> from time import time
>>> scheduler = agent.scheduler >>> scheduler = agent.scheduler
>>> scheduler.schedule(TestJob()) >>> startTime = scheduler.schedule(TestJob())
>>> tester.iterate() >>> tester.iterate()
executing executing
@ -152,9 +152,19 @@ classes from the testing package.
>>> transporter = transport.Transporter(agent) >>> transporter = transport.Transporter(agent)
>>> transportJob = transporter.createJob() >>> transportJob = transporter.createJob()
>>> crawlJob.successors.append(transportJob) >>> crawlJob.successors.append(transportJob)
>>> scheduler.schedule(crawlJob) >>> startTime = scheduler.schedule(crawlJob)
The Job class offers two callback hooks: ``whenStarted`` and ``whenFinished``.
Use this for getting notified about the starting and finishing of a job.
>>> def finishedCB(job, result):
... print 'Crawling finished, result:', result
>>> crawlJob.whenFinished = finishedCB
Now let the reactor run...
>>> tester.iterate() >>> tester.iterate()
Crawling finished, result: [<loops.agent.testing.crawl.DummyResource ...>]
Transferring: Dummy resource data for testing purposes. Transferring: Dummy resource data for testing purposes.
Using configuration with scheduling Using configuration with scheduling
@ -216,6 +226,9 @@ Metadata sources
- path, filename - path, filename
Implementation and documentation: see loops/agent/crawl/filesystem.py
and .../filesystem.txt.
E-Mail-Clients E-Mail-Clients
-------------- --------------

View file

@ -12,7 +12,7 @@ loops.agent.crawl.filesystem - The Filesystem Crawler
>>> from loops.agent.crawl.filesystem import CrawlingJob >>> from loops.agent.crawl.filesystem import CrawlingJob
>>> agent = Agent() >>> agent = Agent()
>>> scheduler = agent.scheduler >>> startTime = scheduler = agent.scheduler
We create a crawling job that should scan the data subdirectory We create a crawling job that should scan the data subdirectory
of the testing directory in the loops.agent package. of the testing directory in the loops.agent package.
@ -32,7 +32,7 @@ transferred.
We are now ready to schedule the job and let the reactor execute it. We are now ready to schedule the job and let the reactor execute it.
>>> scheduler.schedule(crawlJob) >>> startTime = scheduler.schedule(crawlJob)
>>> tester.iterate() >>> tester.iterate()
Metadata: {'path': '...data...file1.txt'} Metadata: {'path': '...data...file1.txt'}

View file

@ -68,9 +68,9 @@ class IScheduledJob(Interface):
'rescheduled. Do not repeat if 0.') 'rescheduled. Do not repeat if 0.')
successors = Attribute('Jobs to execute immediately after this ' successors = Attribute('Jobs to execute immediately after this '
'one has been finished.') 'one has been finished.')
whenStarted = Attribute('A callable with no arguments that will ' whenStarted = Attribute('A callable with one argument (the job) that will '
'be called when the job has started.') 'be called when the job has started.')
whenfinished = Attribute('A callable with one argument, the ' whenfinished = Attribute('A callable with two arguments, the job and the '
'result of running the job, that will be called when ' 'result of running the job, that will be called when '
'the job has finished.') 'the job has finished.')

View file

@ -48,6 +48,7 @@ class Scheduler(object):
startTime += 1 startTime += 1
self.queue[startTime] = job self.queue[startTime] = job
reactor.callLater(startTime-int(time()), job.run) reactor.callLater(startTime-int(time()), job.run)
return startTime
def getJobsToExecute(startTime=None): def getJobsToExecute(startTime=None):
return [j for j in self.queue.values() if (startTime or 0) <= j.startTime] return [j for j in self.queue.values() if (startTime or 0) <= j.startTime]
@ -59,8 +60,8 @@ class Job(object):
scheduler = None scheduler = None
whenStarted = lambda self: None whenStarted = lambda self, job: None
whenFinished = lambda self, result: None whenFinished = lambda self, job, result: None
def __init__(self, **params): def __init__(self, **params):
self.startTime = 0 self.startTime = 0
@ -74,12 +75,12 @@ class Job(object):
return succeed('OK') return succeed('OK')
def reschedule(self, startTime): def reschedule(self, startTime):
self.scheduler.schedule(self.copy(), startTime) return self.scheduler.schedule(self.copy(), startTime)
def run(self): def run(self):
d = self.execute() d = self.execute()
d.addCallback(self.finishRun) d.addCallback(self.finishRun)
self.whenStarted() self.whenStarted(self)
# TODO: logging # TODO: logging
def finishRun(self, result): def finishRun(self, result):
@ -90,7 +91,7 @@ class Job(object):
job.params['result'] = result job.params['result'] = result
#job.run() #job.run()
self.scheduler.schedule(job) self.scheduler.schedule(job)
self.whenFinished(result) self.whenFinished(self, result)
# TODO: logging # TODO: logging
# reschedule if necessary # reschedule if necessary
if self.repeat: if self.repeat: