added cybertools.agent package (work in progress...)

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2413 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2008-02-23 14:07:15 +00:00
parent f244f58479
commit c8622b3058
21 changed files with 1115 additions and 5 deletions

126
agent/README.txt Normal file
View file

@ -0,0 +1,126 @@
================================================
Agents for Job Execution and Communication Tasks
================================================
Agents do some work specified by a jobs, the main task being to collect
information objects from the local machine or some external source and
transfer them e.g. to a loops server on the same machine or another.
($Id$)
This package does not depend on Zope but represents a standalone application.
Sub-Packages
============
Top-level
Generic interfaces, ``tests`` module, this ``README.txt`` file.
base
Base and sample classes
core
Agent and scheduling implementations.
control
Communication with an external agent control and job database application
crawl
Scanning/crawling some system, e.g. the database of an email application,
the local file system, or an external document source
transport
Transfer of information objects to agents on another machine or
to an information management application (e.g. loops)
util
Various utility modules, e.g. a backport of the
``twisted.internet.task.coiterate()`` function from Twisted 2.5.
All sub-packages except ``base`` depend on Twisted.
Object Structure and Control Flow
=================================
::
---------- ------------
-------- | |<---1| |
| config |--> | master |<---2| controller |
-------- /| |4--->| |
/ ---------- ------------
/ 1 ^ 2
/ | | \ -----------
/ | | `---->| scheduler |
v v 4 -----------
----- --------- 3
| log |<--| agent |<----------´
----- ---------
(1) Agent specifications control creation and setup of agents
(2) Job specifications control scheduling of jobs
(3) Scheduler triggers job execution by agent
(4) Results are recorded, possibly triggering further actions
Basic Stuff
===========
While the real application is based on the asynchronous communication
framework Twisted there is some basic stuff (mainly interfaces and
base classes with basic, sample, or dummy implementations) that is
independent of Twisted.
The code resides in in the ``base`` sub-package.
Master Agent and Configuration
------------------------------
All activity is controlled by the master agent.
The master agent is set up according to the specifications in a
configuration file.
The configuration typically provides only basic informations, e.g. about
the controller(s) and logger(s) to be used; details on jobs and agent
configuration are provided by the controller.
>>> from cybertools.agent.tests import baseDir
>>> import os
>>> configFile = open(os.path.join(baseDir, 'base', 'sample.cfg'))
So we are now ready to create a master agent and configure it by supplying
the path to the configuration file.
>>> from cybertools.agent.base.agent import Master
>>> master = Master(configFile)
>>> master.config
controller.name = 'sample'
logger.name = 'sample'
scheduler.name = 'sample'
Controller
----------
Creation of agents and scheduling of jobs is controlled by the controller
object. This is typically associated with a sort of control storage that
provides agent and job specifications and receives the results of job
execution.
We open the controller and read in the specifications via the master agent's
``setup`` method.
>>> master.setup()
Other Agents
------------
Job Scheduling and Execution
----------------------------

4
agent/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

4
agent/base/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

60
agent/base/agent.py Normal file
View file

@ -0,0 +1,60 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Agent base and sample classes.
$Id$
"""
from zope.interface import implements
from cybertools.agent.interfaces import IAgent
from cybertools.util.config import Configurator
class Agent(object):
implements(IAgent)
master = None
logger = None
def execute(self, job, params=None):
pass
class Master(Agent):
config = None
controller = None
scheduler = None
def __init__(self, configuration=None):
self.config = Configurator()
if configuration is not None:
self.config.load(configuration)
def setup(self):
pass
class SampleAgent(Agent):
pass

33
agent/base/control.py Normal file
View file

@ -0,0 +1,33 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Base/sample controller implementation.
$Id$
"""
from zope.interface import implements
from cybertools.agent.interfaces import IController
class Controller(object):
implements(IController)

32
agent/base/job.py Normal file
View file

@ -0,0 +1,32 @@
#
# Copyright (c) 2007 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
The real agent stuff.
$Id$
"""
from zope.interface import implements
from loops.agent.interfaces import IScheduledJob
class Job(object):
implements(IScheduledJob)

76
agent/base/log.py Normal file
View file

@ -0,0 +1,76 @@
#
# Copyright (c) 2007 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Log information management.
$Id$
"""
import logging
import sys
import time
from zope.interface import implements
from loops.agent.interfaces import ILogger, ILogRecord
class LogRecord(object):
implements(ILogRecord)
datefmt = '%Y-%m-%dT%H:%S'
def __init__(self, logger, data):
self.logger = logger
self.data = data
self.timeStamp = time.time()
def __str__(self):
msg = [str(time.strftime(self.datefmt, time.localtime(self.timeStamp)))]
for k in sorted(self.data):
msg.append('%s:%s' % (str(k), str(self.data[k])))
return ' '.join(msg)
class Logger(list):
implements(ILogger)
recordFactory = LogRecord
def __init__(self, agent):
self.agent = agent
self.setup()
def setup(self):
self.externalLoggers = []
conf = self.agent.config.logging
if conf.standard:
logger = logging.getLogger()
logger.level = conf.standard
logger.addHandler(logging.StreamHandler(sys.stdout))
self.externalLoggers.append(logger)
def log(self, data):
record = self.recordFactory(self, data)
self.append(record)
for logger in self.externalLoggers:
logger.info(str(record))

9
agent/base/sample.cfg Normal file
View file

@ -0,0 +1,9 @@
#
# sample.cfg - agent configuration for demonstration and testing purposes
#
# $Id$
#
controller(name='sample')
scheduler(name='sample')
logger(name='sample')

34
agent/base/schedule.py Normal file
View file

@ -0,0 +1,34 @@
#
# Copyright (c) 2007 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Basic (sample) job scheduler.
$Id$
"""
from time import time
from zope.interface import implements
from loops.agent.interfaces import IScheduler
class Scheduler(object):
implements(IScheduler)

View file

@ -0,0 +1,4 @@
"""
$Id$
"""

31
agent/core/README.txt Normal file
View file

@ -0,0 +1,31 @@
================================================
Agents for Job Execution and Communication Tasks
================================================
Agents collect informations and transfer them e.g. to a loops server.
($Id$)
This package does not depend on zope or the other loops packages
but represents a standalone application.
But we need a reactor for working with Twisted; in order not to block
testing when running the reactor we use reactor.iterate() calls
wrapped in a ``tester`` object.
>>> from cybertools.agent.tests import tester
Master Agent
============
The agent uses Twisted's cooperative multitasking model.
This means that all calls to services (like crawler, transporter, ...)
return a deferred that must be supplied with a callback method (and in
most cases also an errback method).
>>> #from cybertools.agent.core.agent import Master
>>> #master = Master()

4
agent/core/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

4
agent/crawl/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

241
agent/interfaces.py Normal file
View file

@ -0,0 +1,241 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
cybertools agent interfaces.
$Id$
"""
from zope.interface import Interface, Attribute
# agents
class IAgent(Interface):
""" An agent waits for jobs to execute.
"""
master = Attribute('IMaster instance.')
logger = Attribute('Logger instance to be used for recording '
'job execution and execution results.')
def execute(job, params=None):
""" Execute a job.
"""
class IMaster(IAgent):
""" The top-level controller agent.
"""
config = Attribute('Central configuration.')
controller = Attribute('IController instance.')
scheduler = Attribute('IScheduler instance.')
def setup():
""" Load agent specifications from the controller and set up
the corresponding agents. Then load the specifications of
active jobs from the controller and schedule the corresponding
jobs.
"""
class ICrawler(IAgent):
""" Collects resources.
"""
def collect(filter=None):
""" Return a deferred that upon callback will provide a
collection of resource objects that should be transferred
to the server.
Use the selection criteria given to filter the resources that
should be collected.
"""
class ITransporter(IAgent):
""" Transfers one or more collected resources and corresponding
metadata to another entity - a remote agent or another application.
"""
serverURL = Attribute('URL of the server the resources will be '
'transferred to. The URL also determines the '
'transfer protocol, e.g. HTTP or FTP.')
method = Attribute('Transport method, e.g. PUT.')
machineName = Attribute('Name under which the local machine is '
'known to the server.')
userName = Attribute('User name for logging in to the server.')
password = Attribute('Password for logging in to the server.')
def transfer(resource):
""" Transfer the resource (an object providing IResource)
to the server and return a Deferred.
"""
# job control
class IController(Interface):
""" Fetches agent and job specifications from a control
storage and updates the storage with the status and result
information.
"""
class IScheduler(Interface):
""" Manages jobs and cares that they are started at the appropriate
time by the agents responsible for it.
"""
def schedule(job, agent, startTime=None):
""" Register the job given for execution at the intended start
date/time (an integer timestamp) and return the job.
If the start time is not given schedule the job for immediate
start. Return the start time with which the job has been
scheduled - this may be different from the start time
supplied.
"""
def getJobsToExecute(startTime=None, agents=None):
""" Return a collection of jobs that are scheduled for execution at
or before the date/time given.
If ``startTime`` is None the current date/time is used.
If ``agents`` is not None return only jobs for the agents
given.
"""
# jobs
class IScheduledJob(Interface):
""" A job that will be executed on some external triggering at
a predefined date and time - this is the basic job interface.
"""
agent = Attribute('Agent responsible for executing the job.')
startTime = Attribute('Date/time at which the job should be executed.')
params = Attribute('Mapping with key/value pairs to be used by '
'the ``execute()`` method.')
repeat = Attribute('Number of seconds after which the job should be '
'rescheduled. Do not repeat if 0.')
successors = Attribute('Jobs to execute immediately after this '
'one has been finished.')
whenStarted = Attribute('A callable with one argument (the job) that will '
'be called when the job has started.')
whenfinished = Attribute('A callable with two arguments, the job and the '
'result of running the job, that will be called when '
'the job has finished.')
def execute(params=None):
""" Execute the job, typically by calling the ``execute()`` method
of the agent responsible for it.
"""
def reschedule(startTime):
""" Re-schedule the job, setting the date/time the job should be
executed again.
"""
class ICrawlingJob(IScheduledJob):
""" A job specifying a crawling task.
"""
predefinedMetadata = Attribute('A mapping with metadata to be used '
'for all resources found.')
class ITransportJob(IScheduledJob):
""" A job managing the the transfer of a resource to the server.
"""
transporter = Attribute('The transporter agent to use for transfer.')
# information objects
class IResource(Interface):
""" Represents a data object that is collected by a crawler and
will be transferred to the server.
"""
data = Attribute('A string, file, or similar representation of the '
'resource\'s content; may be None if the receiver of '
'the information can retrieve the date from the path '
'given.')
path = Attribute('A filesystem path or some other information '
'uniquely identifying the resource on the client '
'machine for the current user.')
application = Attribute('The name of the application that provided '
'the resource, e.g. "filesystem" or "mail".')
metadata = Attribute('Information describing this resource; '
'should be an IMetadataSet object.')
class IMetadataSet(Interface):
""" Metadata associated with a resource; a mapping.
"""
def asXML():
""" Return an XML string representing the metadata set.
If this metadata set contains other metadata sets
(nested metadata) these will be converted to XML as well.
"""
def set(key, value):
""" Set a metadata element.
The value may be a string or another metadata set
(nested metadata).
"""
# logging
class ILogger(Interface):
""" Ordered collection (list) of log records, probably stored on some
external device.
"""
externalLoggers = Attribute('A collection of logger objects '
'to which the logging records should be written.')
def setup():
""" Initialize the logger with the current configuration settings.
"""
def log(data):
""" Record the information given by the ``data`` argument
(a mapping).
"""
class ILogRecord(Interface):
"""
"""
def __str__():
""" Return a string representation suitable for writing to a
log file.
"""

52
agent/tests.py Executable file
View file

@ -0,0 +1,52 @@
#! /usr/bin/env python
# $Id$
import os, time
import unittest, doctest
from zope.testing.doctestunit import DocFileSuite
from twisted.internet import reactor
#from twisted.internet.defer import Deferred
#from twisted.trial import unittest as trial_unittest
baseDir = os.path.dirname(__file__)
class Tester(object):
""" Used for controlled execution of reactor iteration cycles.
"""
def iterate(self, n=10, delays={}):
for i in range(n):
delay = delays.get(i, 0)
reactor.iterate(delay)
tester = Tester()
class Test(unittest.TestCase):
"Basic tests for the cybertools.agent package."
def setUp(self):
pass
def tearDown(self):
pass
def testBasicStuff(self):
pass
def test_suite():
flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
testSuite = unittest.TestSuite((
unittest.makeSuite(Test),
DocFileSuite('README.txt', optionflags=flags),
DocFileSuite('core/README.txt', optionflags=flags),
#DocFileSuite('transport/httpput.txt', optionflags=flags),
))
return testSuite
if __name__ == '__main__':
standard_unittest.main(defaultTest='test_suite')

View file

@ -0,0 +1,4 @@
"""
$Id$
"""

4
agent/util/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

368
agent/util/task.py Normal file
View file

@ -0,0 +1,368 @@
# -*- test-case-name: twisted.test.test_task -*-
# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
# See LICENSE for details.
"""Scheduling utility methods and classes.
API Stability: Unstable
@author: U{Jp Calderone<mailto:exarkun@twistedmatrix.com>}
"""
__metaclass__ = type
import time
from twisted.python.runtime import seconds
from twisted.python import reflect
from twisted.internet import base, defer
class LoopingCall:
"""Call a function repeatedly.
@ivar f: The function to call.
@ivar a: A tuple of arguments to pass the function.
@ivar kw: A dictionary of keyword arguments to pass to the function.
If C{f} returns a deferred, rescheduling will not take place until the
deferred has fired. The result value is ignored.
"""
call = None
running = False
deferred = None
interval = None
count = None
starttime = None
def _callLater(self, delay):
from twisted.internet import reactor
return reactor.callLater(delay, self)
_seconds = staticmethod(seconds)
def __init__(self, f, *a, **kw):
self.f = f
self.a = a
self.kw = kw
def start(self, interval, now=True):
"""Start running function every interval seconds.
@param interval: The number of seconds between calls. May be
less than one. Precision will depend on the underlying
platform, the available hardware, and the load on the system.
@param now: If True, run this call right now. Otherwise, wait
until the interval has elapsed before beginning.
@return: A Deferred whose callback will be invoked with
C{self} when C{self.stop} is called, or whose errback will be
invoked when the function raises an exception or returned a
deferred that has its errback invoked.
"""
assert not self.running, ("Tried to start an already running "
"LoopingCall.")
if interval < 0:
raise ValueError, "interval must be >= 0"
self.running = True
d = self.deferred = defer.Deferred()
self.starttime = self._seconds()
self.count = 0
self.interval = interval
if now:
self()
else:
self._reschedule()
return d
def stop(self):
"""Stop running function.
"""
assert self.running, ("Tried to stop a LoopingCall that was "
"not running.")
self.running = False
if self.call is not None:
self.call.cancel()
self.call = None
d, self.deferred = self.deferred, None
d.callback(self)
def __call__(self):
def cb(result):
if self.running:
self._reschedule()
else:
d, self.deferred = self.deferred, None
d.callback(self)
def eb(failure):
self.running = False
d, self.deferred = self.deferred, None
d.errback(failure)
self.call = None
d = defer.maybeDeferred(self.f, *self.a, **self.kw)
d.addCallback(cb)
d.addErrback(eb)
def _reschedule(self):
if self.interval == 0:
self.call = self._callLater(0)
return
fromNow = self.starttime - self._seconds()
while self.running:
self.count += 1
fromStart = self.count * self.interval
delay = fromNow + fromStart
if delay > 0:
self.call = self._callLater(delay)
return
def __repr__(self):
if hasattr(self.f, 'func_name'):
func = self.f.func_name
if hasattr(self.f, 'im_class'):
func = self.f.im_class.__name__ + '.' + func
else:
func = reflect.safe_repr(self.f)
return 'LoopingCall<%r>(%s, *%s, **%s)' % (
self.interval, func, reflect.safe_repr(self.a),
reflect.safe_repr(self.kw))
class SchedulerStopped(Exception):
"""
The operation could not complete because the scheduler was stopped in
progress or was already stopped.
"""
class _Timer(object):
MAX_SLICE = 0.01
def __init__(self):
self.end = time.time() + self.MAX_SLICE
def __call__(self):
return time.time() >= self.end
_EPSILON = 0.00000001
def _defaultScheduler(x):
from twisted.internet import reactor
return reactor.callLater(_EPSILON, x)
class Cooperator(object):
"""
Cooperative task scheduler.
"""
def __init__(self,
terminationPredicateFactory=_Timer,
scheduler=_defaultScheduler,
started=True):
"""
Create a scheduler-like object to which iterators may be added.
@param terminationPredicateFactory: A no-argument callable which will
be invoked at the beginning of each step and should return a
no-argument callable which will return False when the step should be
terminated. The default factory is time-based and allows iterators to
run for 1/100th of a second at a time.
@param scheduler: A one-argument callable which takes a no-argument
callable and should invoke it at some future point. This will be used
to schedule each step of this Cooperator.
@param started: A boolean which indicates whether iterators should be
stepped as soon as they are added, or if they will be queued up until
L{Cooperator.start} is called.
"""
self.iterators = []
self._metarator = iter(())
self._terminationPredicateFactory = terminationPredicateFactory
self._scheduler = scheduler
self._delayedCall = None
self._stopped = False
self._started = started
def coiterate(self, iterator, doneDeferred=None):
"""
Add an iterator to the list of iterators I am currently running.
@return: a Deferred that will fire when the iterator finishes.
"""
if doneDeferred is None:
doneDeferred = defer.Deferred()
if self._stopped:
doneDeferred.errback(SchedulerStopped())
return doneDeferred
self.iterators.append((iterator, doneDeferred))
self._reschedule()
return doneDeferred
def _tasks(self):
terminator = self._terminationPredicateFactory()
while self.iterators:
for i in self._metarator:
yield i
if terminator():
return
self._metarator = iter(self.iterators)
def _tick(self):
"""
Run one scheduler tick.
"""
self._delayedCall = None
for taskObj in self._tasks():
iterator, doneDeferred = taskObj
try:
result = iterator.next()
except StopIteration:
self.iterators.remove(taskObj)
doneDeferred.callback(iterator)
except:
self.iterators.remove(taskObj)
doneDeferred.errback()
else:
if isinstance(result, defer.Deferred):
self.iterators.remove(taskObj)
def cbContinue(result, taskObj=taskObj):
self.coiterate(*taskObj)
result.addCallbacks(cbContinue, doneDeferred.errback)
self._reschedule()
_mustScheduleOnStart = False
def _reschedule(self):
if not self._started:
self._mustScheduleOnStart = True
return
if self._delayedCall is None and self.iterators:
self._delayedCall = self._scheduler(self._tick)
def start(self):
"""
Begin scheduling steps.
"""
self._stopped = False
self._started = True
if self._mustScheduleOnStart:
del self._mustScheduleOnStart
self._reschedule()
def stop(self):
"""
Stop scheduling steps. Errback the completion Deferreds of all
iterators which have been added and forget about them.
"""
self._stopped = True
for iterator, doneDeferred in self.iterators:
doneDeferred.errback(SchedulerStopped())
self.iterators = []
if self._delayedCall is not None:
self._delayedCall.cancel()
self._delayedCall = None
_theCooperator = Cooperator()
def coiterate(iterator):
"""
Cooperatively iterate over the given iterator, dividing runtime between it
and all other iterators which have been passed to this function and not yet
exhausted.
"""
return _theCooperator.coiterate(iterator)
class Clock:
"""
Provide a deterministic, easily-controlled implementation of
L{IReactorTime.callLater}. This is commonly useful for writing
deterministic unit tests for code which schedules events using this API.
"""
rightNow = 0.0
def __init__(self):
self.calls = []
def seconds(self):
"""
Pretend to be time.time(). This is used internally when an operation
such as L{IDelayedCall.reset} needs to determine a a time value
relative to the current time.
@rtype: C{float}
@return: The time which should be considered the current time.
"""
return self.rightNow
def callLater(self, when, what, *a, **kw):
"""
See L{twisted.internet.interfaces.IReactorTime.callLater}.
"""
self.calls.append(
base.DelayedCall(self.seconds() + when,
what, a, kw,
self.calls.remove,
lambda c: None,
self.seconds))
self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime()))
return self.calls[-1]
def advance(self, amount):
"""
Move time on this clock forward by the given amount and run whatever
pending calls should be run.
@type amount: C{float}
@param amount: The number of seconds which to advance this clock's
time.
"""
self.rightNow += amount
while self.calls and self.calls[0].getTime() <= self.seconds():
call = self.calls.pop(0)
call.called = 1
call.func(*call.args, **call.kw)
def pump(self, timings):
"""
Advance incrementally by the given set of times.
@type timings: iterable of C{float}
"""
for amount in timings:
self.advance(amount)
__all__ = [
'LoopingCall',
'Clock',
'SchedulerStopped', 'Cooperator', 'coiterate',
]

View file

@ -79,6 +79,7 @@
<metal:textline define-macro="input_textline">
<input type="text" name="field" style="width: 450px"
xxdojoType="dijit.form.ValidationTextBox"
tal:define="width field/width|nothing"
tal:attributes="name name; id name;
style python:

View file

@ -66,10 +66,16 @@ class BaseView(SchemaBaseView):
if service is None:
service = self.context
if service.start and service.end:
return ('%s-%s' %
typeEnd = 'time'
separator = '-'
if time.localtime(service.start)[2] != time.localtime(service.end)[2]:
typeEnd = 'dateTime'
separator = ' - '
return ('%s%s%s' %
(self.getFormattedDate(service.start,
type='dateTime', variant='short').replace(' ', ' '),
self.getFormattedDate(service.end, type='time', variant='short')))
type='dateTime', variant='short'),
separator,
self.getFormattedDate(service.end, type=typeEnd, variant='short')))
else:
return '-'

View file

@ -94,7 +94,9 @@ class TrackingStorage(BTreeContainer):
trackFactory = Track
trackNum = runId = 0
runs = None
runs = None # currently active runs
finishedRuns = None # finished runs
currentRuns = None # the currently active run for each task
indexAttributes = Track.index_attributes
@ -108,6 +110,7 @@ class TrackingStorage(BTreeContainer):
for idx in self.indexAttributes:
self.indexes[idx] = FieldIndex()
self.runs = IOBTree.IOBTree()
self.finishedRuns = IOBTree.IOBTree()
self.currentRuns = OOBTree.OOBTree()
self.taskUsers = OOBTree.OOBTree()
@ -142,16 +145,26 @@ class TrackingStorage(BTreeContainer):
if run is not None:
run.end = getTimeStamp()
run.finished = finish
if finish:
self.moveToFinishedRuns(run)
return runId
return 0
def moveToFinishedRuns(self, run):
id = run.id
if id in self.runs:
del self.runs[id]
if self.finishedRuns is None: # backward compatibility
self.finishedRuns = IOBTree.IOBTree()
self.finishedRuns[id] = run
def getRun(self, taskId=None, runId=0):
if self.runs is None:
self.runs = IOBTree.IOBTree()
if taskId and not runId:
runId = self.currentRuns.get(taskId)
if runId:
return self.runs.get(runId)
return self.runs.get(runId) or self.finishedRuns.get(runId)
return None
def generateTrackId(self):