
git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2413 fd906abe-77d9-0310-91a1-e0d9ade77398
368 lines
10 KiB
Python
368 lines
10 KiB
Python
# -*- test-case-name: twisted.test.test_task -*-
|
|
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
|
|
# See LICENSE for details.
|
|
|
|
|
|
"""Scheduling utility methods and classes.
|
|
|
|
API Stability: Unstable
|
|
|
|
@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
|
|
"""
|
|
|
|
# Python 2 module-level hook: classes in this module that are declared
# without explicit bases are created with ``type`` as their metaclass, i.e.
# as new-style classes.
__metaclass__ = type
|
|
|
|
import time
|
|
|
|
from twisted.python.runtime import seconds
|
|
from twisted.python import reflect
|
|
|
|
from twisted.internet import base, defer
|
|
|
|
|
|
class LoopingCall:
    """Call a function repeatedly.

    If C{f} returns a deferred, rescheduling will not take place until the
    deferred has fired. The result value is ignored.

    @ivar f: The function to call.
    @ivar a: A tuple of arguments to pass the function.
    @ivar kw: A dictionary of keyword arguments to pass to the function.

    @ivar call: The delayed call scheduled for the next invocation, or
        C{None} when no invocation is currently scheduled.
    @ivar running: C{True} between L{start} and L{stop} (or a failure of
        C{f}).
    @ivar deferred: The Deferred handed out by L{start}; reset to C{None}
        once it has been fired.
    @ivar interval: The interval, in seconds, given to L{start}.
    @ivar count: Number of interval slots consumed since L{start}.
    @ivar starttime: Value of C{_seconds()} at the time L{start} was called.
    """

    call = None
    running = False
    deferred = None
    interval = None
    count = None
    starttime = None

    def _callLater(self, delay):
        """Schedule this object to be called after C{delay} seconds.

        @return: whatever C{reactor.callLater} returns.
        """
        # The reactor is imported at call time rather than at module import
        # time (presumably so importing this module does not install a
        # reactor -- TODO confirm).
        from twisted.internet import reactor
        return reactor.callLater(delay, self)

    # Time source used for all scheduling arithmetic.
    _seconds = staticmethod(seconds)

    def __init__(self, f, *a, **kw):
        """Remember the callable and the arguments to call it with."""
        self.f = f
        self.a = a
        self.kw = kw

    def start(self, interval, now=True):
        """Start running function every interval seconds.

        @param interval: The number of seconds between calls. May be
        less than one. Precision will depend on the underlying
        platform, the available hardware, and the load on the system.

        @param now: If True, run this call right now. Otherwise, wait
        until the interval has elapsed before beginning.

        @return: A Deferred whose callback will be invoked with
        C{self} when C{self.stop} is called, or whose errback will be
        invoked when the function raises an exception or returned a
        deferred that has its errback invoked.

        @raise ValueError: if C{interval} is negative.
        """
        assert not self.running, ("Tried to start an already running "
                                  "LoopingCall.")
        if interval < 0:
            # Call form of raise: valid on every Python version, unlike the
            # legacy ``raise Class, "message"`` statement.
            raise ValueError("interval must be >= 0")
        self.running = True
        d = self.deferred = defer.Deferred()
        self.starttime = self._seconds()
        self.count = 0
        self.interval = interval
        if now:
            self()
        else:
            self._reschedule()
        # Return the local reference: self.deferred may already have been
        # reset to None if the first call failed synchronously.
        return d

    def stop(self):
        """Stop running function.
        """
        assert self.running, ("Tried to stop a LoopingCall that was "
                              "not running.")
        self.running = False
        if self.call is not None:
            self.call.cancel()
            self.call = None
            # Only fire here when a pending call was cancelled.  If no call
            # is pending, the function is currently in progress and the
            # callback installed in __call__ fires the deferred instead.
            d, self.deferred = self.deferred, None
            d.callback(self)

    def __call__(self):
        """Invoke C{f} once and decide what happens next.

        On success the next call is scheduled if the loop is still running,
        otherwise the completion deferred is fired with C{self}.  On failure
        the loop is stopped and the completion deferred is errbacked.
        """
        def cb(result):
            if self.running:
                self._reschedule()
            else:
                d, self.deferred = self.deferred, None
                d.callback(self)

        def eb(failure):
            self.running = False
            d, self.deferred = self.deferred, None
            d.errback(failure)

        # The scheduled call (if any) is the one being executed right now.
        self.call = None
        d = defer.maybeDeferred(self.f, *self.a, **self.kw)
        d.addCallback(cb)
        d.addErrback(eb)

    def _reschedule(self):
        """Schedule the next invocation.

        With a zero interval the next call is scheduled immediately.
        Otherwise calls are aligned to C{starttime + count * interval}; slots
        that already lie in the past are skipped.
        """
        if self.interval == 0:
            self.call = self._callLater(0)
            return

        # Negative (or zero): time elapsed since start, as an offset.
        fromNow = self.starttime - self._seconds()

        while self.running:
            self.count += 1
            fromStart = self.count * self.interval
            delay = fromNow + fromStart
            if delay > 0:
                self.call = self._callLater(delay)
                return

    def __repr__(self):
        # func_name / im_class are the Python 2 function and method
        # attributes; any callable without func_name is shown via safe_repr.
        if hasattr(self.f, 'func_name'):
            func = self.f.func_name
            if hasattr(self.f, 'im_class'):
                func = self.f.im_class.__name__ + '.' + func
        else:
            func = reflect.safe_repr(self.f)

        return 'LoopingCall<%r>(%s, *%s, **%s)' % (
            self.interval, func, reflect.safe_repr(self.a),
            reflect.safe_repr(self.kw))
|
|
|
|
|
|
|
|
class SchedulerStopped(Exception):
    """
    Raised (or delivered through an errback) when work handed to a scheduler
    cannot finish: the scheduler was stopped while the work was in progress,
    or it had already been stopped when the work was submitted.
    """
|
|
|
|
|
|
|
|
class _Timer(object):
    """
    Time-based termination predicate.

    An instance records a deadline C{MAX_SLICE} seconds after its creation;
    calling it reports whether that deadline has been reached.

    @cvar MAX_SLICE: Length of the time slice, in seconds.
    @ivar end: Deadline as a C{time.time()} timestamp.
    """

    MAX_SLICE = 0.01

    def __init__(self):
        created = time.time()
        self.end = created + self.MAX_SLICE

    def __call__(self):
        """
        @return: C{True} once the current time is at or past C{self.end}.
        """
        now = time.time()
        return now >= self.end
|
|
|
|
|
|
|
|
# Delay, in seconds, passed to the reactor by the default scheduler.
_EPSILON = 1e-8


def _defaultScheduler(x):
    """
    Ask the reactor to call C{x} after C{_EPSILON} seconds.

    @param x: The callable to schedule.
    @return: whatever C{reactor.callLater} returns for the scheduled call.
    """
    from twisted.internet import reactor
    scheduled = reactor.callLater(_EPSILON, x)
    return scheduled
|
|
|
|
|
|
|
|
class Cooperator(object):
    """
    Cooperative task scheduler.

    Iterators added through L{coiterate} are stepped round-robin, one
    C{next()} call at a time, in ticks driven by the configured scheduler.

    @ivar iterators: List of C{(iterator, doneDeferred)} pairs currently
        being stepped.
    """

    # Class-level default; set on the instance when a reschedule is requested
    # before start() and deleted again by start().
    _mustScheduleOnStart = False

    def __init__(self,
                 terminationPredicateFactory=_Timer,
                 scheduler=_defaultScheduler,
                 started=True):
        """
        Create a scheduler-like object to which iterators may be added.

        @param terminationPredicateFactory: A no-argument callable which will
        be invoked at the beginning of each step and should return a
        no-argument callable which will return True when the step should be
        terminated.  The default factory is time-based and allows iterators
        to run for 1/100th of a second at a time.

        @param scheduler: A one-argument callable which takes a no-argument
        callable and should invoke it at some future point.  This will be
        used to schedule each step of this Cooperator.  Its return value is
        kept and must offer C{cancel()} for L{stop} to work.

        @param started: A boolean which indicates whether iterators should be
        stepped as soon as they are added, or if they will be queued up until
        L{Cooperator.start} is called.
        """
        self._terminationPredicateFactory = terminationPredicateFactory
        self._scheduler = scheduler
        self._started = started
        self._stopped = False
        self._delayedCall = None
        self.iterators = []
        # Iterator over self.iterators that remembers the round-robin
        # position between ticks; starts out exhausted.
        self._metarator = iter(())

    def coiterate(self, iterator, doneDeferred=None):
        """
        Add an iterator to the list of iterators I am currently running.

        @param iterator: The iterator to step.
        @param doneDeferred: Optional existing Deferred to use as the
            completion Deferred; a new one is created when omitted.

        @return: a Deferred that will fire when the iterator finishes.  If
            this Cooperator has been stopped, the Deferred is errbacked with
            L{SchedulerStopped} and the iterator is not added.
        """
        if doneDeferred is None:
            doneDeferred = defer.Deferred()
        if not self._stopped:
            self.iterators.append((iterator, doneDeferred))
            self._reschedule()
        else:
            doneDeferred.errback(SchedulerStopped())
        return doneDeferred

    def _tasks(self):
        """
        Yield C{(iterator, doneDeferred)} pairs round-robin until either no
        iterators remain or the termination predicate for this step reports
        that the step is over.
        """
        stepIsOver = self._terminationPredicateFactory()
        while self.iterators:
            for task in self._metarator:
                yield task
                if stepIsOver():
                    return
            # Finished one pass over the list; begin the next pass.
            self._metarator = iter(self.iterators)

    def _tick(self):
        """
        Run one scheduler tick.
        """
        self._delayedCall = None
        for task in self._tasks():
            iterator, doneDeferred = task
            try:
                result = iterator.next()
            except StopIteration:
                self.iterators.remove(task)
                doneDeferred.callback(iterator)
            except:
                # errback() without arguments wraps the exception currently
                # being handled, so it must be called from this handler.
                self.iterators.remove(task)
                doneDeferred.errback()
            else:
                if isinstance(result, defer.Deferred):
                    # Park the task until the yielded Deferred fires, then
                    # re-add it with its original completion Deferred.
                    self.iterators.remove(task)

                    def resume(ignored, parked=task):
                        self.coiterate(*parked)
                    result.addCallbacks(resume, doneDeferred.errback)
        self._reschedule()

    def _reschedule(self):
        """
        Schedule the next tick if there is work and none is scheduled yet.
        Before L{start} has been called, only note that scheduling is needed.
        """
        if self._started:
            if self.iterators and self._delayedCall is None:
                self._delayedCall = self._scheduler(self._tick)
        else:
            self._mustScheduleOnStart = True

    def start(self):
        """
        Begin scheduling steps.
        """
        self._started = True
        self._stopped = False
        if self._mustScheduleOnStart:
            # Drop the instance attribute so the class default (False)
            # applies again.
            del self._mustScheduleOnStart
            self._reschedule()

    def stop(self):
        """
        Stop scheduling steps.  Errback the completion Deferreds of all
        iterators which have been added and forget about them.
        """
        self._stopped = True
        for unusedIterator, finished in self.iterators:
            finished.errback(SchedulerStopped())
        self.iterators = []
        pendingTick = self._delayedCall
        if pendingTick is not None:
            pendingTick.cancel()
            self._delayedCall = None
|
|
|
|
|
|
|
|
# Module-wide Cooperator instance used by the coiterate() function below.
_theCooperator = Cooperator()


def coiterate(iterator):
    """
    Cooperatively iterate over the given iterator, sharing runtime between it
    and every other iterator which has been passed to this function and has
    not yet been exhausted.

    @param iterator: The iterator to run on the module-wide Cooperator.
    @return: the result of C{Cooperator.coiterate}: a Deferred that will fire
        when the iterator finishes.
    """
    done = _theCooperator.coiterate(iterator)
    return done
|
|
|
|
|
|
|
|
class Clock:
    """
    Provide a deterministic, easily-controlled implementation of
    L{IReactorTime.callLater}.  This is commonly useful for writing
    deterministic unit tests for code which schedules events using this API.

    @ivar rightNow: The current time of this clock, in seconds.
    @ivar calls: Pending delayed calls, kept ordered by scheduled time.
    """
    rightNow = 0.0

    def __init__(self):
        self.calls = []

    def seconds(self):
        """
        Pretend to be time.time().  This is used internally when an operation
        such as L{IDelayedCall.reset} needs to determine a time value
        relative to the current time.

        @rtype: C{float}
        @return: The time which should be considered the current time.
        """
        return self.rightNow

    def _sortCalls(self):
        """
        Order the pending calls by the time at which they are due.  The sort
        is stable, so calls due at the same time keep their relative order.
        """
        self.calls.sort(key=lambda call: call.getTime())

    def callLater(self, when, what, *a, **kw):
        """
        See L{twisted.internet.interfaces.IReactorTime.callLater}.

        @return: the delayed call created for this request.
        """
        delayedCall = base.DelayedCall(self.seconds() + when,
                                       what, a, kw,
                                       self.calls.remove,
                                       lambda c: None,
                                       self.seconds)
        self.calls.append(delayedCall)
        self._sortCalls()
        # Return the call just created; after sorting, the last list element
        # is merely the latest-scheduled call, which may be a different one.
        return delayedCall

    def advance(self, amount):
        """
        Move time on this clock forward by the given amount and run whatever
        pending calls should be run.

        @type amount: C{float}
        @param amount: The number of seconds which to advance this clock's
        time.
        """
        self.rightNow += amount
        # The reset hook given to DelayedCall is a no-op, so a call whose
        # time was changed since the last sort may be out of position.
        self._sortCalls()
        while self.calls and self.calls[0].getTime() <= self.seconds():
            call = self.calls.pop(0)
            call.called = 1
            call.func(*call.args, **call.kw)
            # The function just run may have rescheduled other calls.
            self._sortCalls()

    def pump(self, timings):
        """
        Advance incrementally by the given set of times.

        @type timings: iterable of C{float}
        """
        for amount in timings:
            self.advance(amount)
|
|
|
|
|
|
|
|
# Public names of this module, i.e. what ``import *`` exports.
__all__ = ['LoopingCall', 'Clock',
           'SchedulerStopped', 'Cooperator', 'coiterate']
|