From c8622b30587760ce12abfb5bfa57cf0ff2d1c09f Mon Sep 17 00:00:00 2001 From: helmutm Date: Sat, 23 Feb 2008 14:07:15 +0000 Subject: [PATCH] added cybertools.agent package (work in progress...) git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2413 fd906abe-77d9-0310-91a1-e0d9ade77398 --- agent/README.txt | 126 ++++++++ agent/__init__.py | 4 + agent/base/__init__.py | 4 + agent/base/agent.py | 60 ++++ agent/base/control.py | 33 ++ agent/base/job.py | 32 ++ agent/base/log.py | 76 +++++ agent/base/sample.cfg | 9 + agent/base/schedule.py | 34 +++ agent/control/__init__.py | 4 + agent/core/README.txt | 31 ++ agent/core/__init__.py | 4 + agent/crawl/__init__.py | 4 + agent/interfaces.py | 241 +++++++++++++++ agent/tests.py | 52 ++++ agent/transport/__init__.py | 4 + agent/util/__init__.py | 4 + agent/util/task.py | 368 +++++++++++++++++++++++ composer/schema/browser/schema_macros.pt | 1 + organize/browser/service.py | 12 +- tracking/btree.py | 17 +- 21 files changed, 1115 insertions(+), 5 deletions(-) create mode 100644 agent/README.txt create mode 100644 agent/__init__.py create mode 100644 agent/base/__init__.py create mode 100644 agent/base/agent.py create mode 100644 agent/base/control.py create mode 100644 agent/base/job.py create mode 100644 agent/base/log.py create mode 100644 agent/base/sample.cfg create mode 100644 agent/base/schedule.py create mode 100644 agent/control/__init__.py create mode 100644 agent/core/README.txt create mode 100644 agent/core/__init__.py create mode 100644 agent/crawl/__init__.py create mode 100644 agent/interfaces.py create mode 100755 agent/tests.py create mode 100644 agent/transport/__init__.py create mode 100644 agent/util/__init__.py create mode 100644 agent/util/task.py diff --git a/agent/README.txt b/agent/README.txt new file mode 100644 index 0000000..b10943b --- /dev/null +++ b/agent/README.txt @@ -0,0 +1,126 @@ +================================================ +Agents for Job Execution and Communication Tasks +================================================ + +Agents do some work specified by a jobs, the main task being to collect +information objects from the local machine or some external source and +transfer them e.g. to a loops server on the same machine or another. + + + ($Id$) + +This package does not depend on Zope but represents a standalone application. + + +Sub-Packages +============ + +Top-level + Generic interfaces, ``tests`` module, this ``README.txt`` file. + +base + Base and sample classes + +core + Agent and scheduling implementations. + +control + Communication with an external agent control and job database application + +crawl + Scanning/crawling some system, e.g. the database of an email application, + the local file system, or an external document source + +transport + Transfer of information objects to agents on another machine or + to an information management application (e.g. loops) + +util + Various utility modules, e.g. a backport of the + ``twisted.internet.task.coiterate()`` function from Twisted 2.5. + +All sub-packages except ``base`` depend on Twisted. + + +Object Structure and Control Flow +================================= + +:: + + ---------- ------------ + -------- | |<---1| | + | config |--> | master |<---2| controller | + -------- /| |4--->| | + / ---------- ------------ + / 1 ^ 2 + / | | \ ----------- + / | | `---->| scheduler | + v v 4 ----------- + ----- --------- 3 + | log |<--| agent |<----------ยด + ----- --------- + +(1) Agent specifications control creation and setup of agents + +(2) Job specifications control scheduling of jobs + +(3) Scheduler triggers job execution by agent + +(4) Results are recorded, possibly triggering further actions + + +Basic Stuff +=========== + +While the real application is based on the asynchronous communication +framework Twisted there is some basic stuff (mainly interfaces and +base classes with basic, sample, or dummy implementations) that is +independent of Twisted. + +The code resides in in the ``base`` sub-package. + +Master Agent and Configuration +------------------------------ + +All activity is controlled by the master agent. + +The master agent is set up according to the specifications in a +configuration file. + +The configuration typically provides only basic informations, e.g. about +the controller(s) and logger(s) to be used; details on jobs and agent +configuration are provided by the controller. + + >>> from cybertools.agent.tests import baseDir + >>> import os + >>> configFile = open(os.path.join(baseDir, 'base', 'sample.cfg')) + +So we are now ready to create a master agent and configure it by supplying +the path to the configuration file. + + >>> from cybertools.agent.base.agent import Master + >>> master = Master(configFile) + + >>> master.config + controller.name = 'sample' + logger.name = 'sample' + scheduler.name = 'sample' + +Controller +---------- + +Creation of agents and scheduling of jobs is controlled by the controller +object. This is typically associated with a sort of control storage that +provides agent and job specifications and receives the results of job +execution. + +We open the controller and read in the specifications via the master agent's +``setup`` method. + + >>> master.setup() + +Other Agents +------------ + +Job Scheduling and Execution +---------------------------- diff --git a/agent/__init__.py b/agent/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/base/__init__.py b/agent/base/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/base/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/base/agent.py b/agent/base/agent.py new file mode 100644 index 0000000..880b770 --- /dev/null +++ b/agent/base/agent.py @@ -0,0 +1,60 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Agent base and sample classes. + +$Id$ +""" + +from zope.interface import implements + +from cybertools.agent.interfaces import IAgent +from cybertools.util.config import Configurator + + +class Agent(object): + + implements(IAgent) + + master = None + logger = None + + def execute(self, job, params=None): + pass + + +class Master(Agent): + + config = None + controller = None + scheduler = None + + def __init__(self, configuration=None): + self.config = Configurator() + if configuration is not None: + self.config.load(configuration) + + def setup(self): + pass + + +class SampleAgent(Agent): + + pass + diff --git a/agent/base/control.py b/agent/base/control.py new file mode 100644 index 0000000..582d5ce --- /dev/null +++ b/agent/base/control.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Base/sample controller implementation. + +$Id$ +""" + +from zope.interface import implements + +from cybertools.agent.interfaces import IController + + +class Controller(object): + + implements(IController) + diff --git a/agent/base/job.py b/agent/base/job.py new file mode 100644 index 0000000..0fbe7de --- /dev/null +++ b/agent/base/job.py @@ -0,0 +1,32 @@ +# +# Copyright (c) 2007 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +The real agent stuff. + +$Id$ +""" + +from zope.interface import implements +from loops.agent.interfaces import IScheduledJob + + +class Job(object): + + implements(IScheduledJob) + diff --git a/agent/base/log.py b/agent/base/log.py new file mode 100644 index 0000000..6732edf --- /dev/null +++ b/agent/base/log.py @@ -0,0 +1,76 @@ +# +# Copyright (c) 2007 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Log information management. + +$Id$ +""" + +import logging +import sys +import time +from zope.interface import implements + +from loops.agent.interfaces import ILogger, ILogRecord + + +class LogRecord(object): + + implements(ILogRecord) + + datefmt = '%Y-%m-%dT%H:%S' + + def __init__(self, logger, data): + self.logger = logger + self.data = data + self.timeStamp = time.time() + + def __str__(self): + msg = [str(time.strftime(self.datefmt, time.localtime(self.timeStamp)))] + for k in sorted(self.data): + msg.append('%s:%s' % (str(k), str(self.data[k]))) + return ' '.join(msg) + + +class Logger(list): + + implements(ILogger) + + recordFactory = LogRecord + + + def __init__(self, agent): + self.agent = agent + self.setup() + + def setup(self): + self.externalLoggers = [] + conf = self.agent.config.logging + if conf.standard: + logger = logging.getLogger() + logger.level = conf.standard + logger.addHandler(logging.StreamHandler(sys.stdout)) + self.externalLoggers.append(logger) + + def log(self, data): + record = self.recordFactory(self, data) + self.append(record) + for logger in self.externalLoggers: + logger.info(str(record)) + diff --git a/agent/base/sample.cfg b/agent/base/sample.cfg new file mode 100644 index 0000000..aca8734 --- /dev/null +++ b/agent/base/sample.cfg @@ -0,0 +1,9 @@ +# +# sample.cfg - agent configuration for demonstration and testing purposes +# +# $Id$ +# + +controller(name='sample') +scheduler(name='sample') +logger(name='sample') diff --git a/agent/base/schedule.py b/agent/base/schedule.py new file mode 100644 index 0000000..6a181eb --- /dev/null +++ b/agent/base/schedule.py @@ -0,0 +1,34 @@ +# +# Copyright (c) 2007 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Basic (sample) job scheduler. + +$Id$ +""" + +from time import time +from zope.interface import implements + +from loops.agent.interfaces import IScheduler + + +class Scheduler(object): + + implements(IScheduler) + diff --git a/agent/control/__init__.py b/agent/control/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/control/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/core/README.txt b/agent/core/README.txt new file mode 100644 index 0000000..07645ea --- /dev/null +++ b/agent/core/README.txt @@ -0,0 +1,31 @@ +================================================ +Agents for Job Execution and Communication Tasks +================================================ + +Agents collect informations and transfer them e.g. to a loops server. + + ($Id$) + +This package does not depend on zope or the other loops packages +but represents a standalone application. + +But we need a reactor for working with Twisted; in order not to block +testing when running the reactor we use reactor.iterate() calls +wrapped in a ``tester`` object. + + >>> from cybertools.agent.tests import tester + + +Master Agent +============ + +The agent uses Twisted's cooperative multitasking model. + +This means that all calls to services (like crawler, transporter, ...) +return a deferred that must be supplied with a callback method (and in +most cases also an errback method). + + >>> #from cybertools.agent.core.agent import Master + >>> #master = Master() + + diff --git a/agent/core/__init__.py b/agent/core/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/core/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/crawl/__init__.py b/agent/crawl/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/crawl/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/interfaces.py b/agent/interfaces.py new file mode 100644 index 0000000..9081920 --- /dev/null +++ b/agent/interfaces.py @@ -0,0 +1,241 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +cybertools agent interfaces. + +$Id$ +""" + +from zope.interface import Interface, Attribute + + +# agents + +class IAgent(Interface): + """ An agent waits for jobs to execute. + """ + + master = Attribute('IMaster instance.') + logger = Attribute('Logger instance to be used for recording ' + 'job execution and execution results.') + + def execute(job, params=None): + """ Execute a job. + """ + + +class IMaster(IAgent): + """ The top-level controller agent. + """ + + config = Attribute('Central configuration.') + controller = Attribute('IController instance.') + scheduler = Attribute('IScheduler instance.') + + def setup(): + """ Load agent specifications from the controller and set up + the corresponding agents. Then load the specifications of + active jobs from the controller and schedule the corresponding + jobs. + """ + + +class ICrawler(IAgent): + """ Collects resources. + """ + + def collect(filter=None): + """ Return a deferred that upon callback will provide a + collection of resource objects that should be transferred + to the server. + + Use the selection criteria given to filter the resources that + should be collected. + """ + + +class ITransporter(IAgent): + """ Transfers one or more collected resources and corresponding + metadata to another entity - a remote agent or another application. + """ + + serverURL = Attribute('URL of the server the resources will be ' + 'transferred to. The URL also determines the ' + 'transfer protocol, e.g. HTTP or FTP.') + method = Attribute('Transport method, e.g. PUT.') + machineName = Attribute('Name under which the local machine is ' + 'known to the server.') + userName = Attribute('User name for logging in to the server.') + password = Attribute('Password for logging in to the server.') + + def transfer(resource): + """ Transfer the resource (an object providing IResource) + to the server and return a Deferred. + """ + + +# job control + +class IController(Interface): + """ Fetches agent and job specifications from a control + storage and updates the storage with the status and result + information. + """ + + +class IScheduler(Interface): + """ Manages jobs and cares that they are started at the appropriate + time by the agents responsible for it. + """ + + def schedule(job, agent, startTime=None): + """ Register the job given for execution at the intended start + date/time (an integer timestamp) and return the job. + + If the start time is not given schedule the job for immediate + start. Return the start time with which the job has been + scheduled - this may be different from the start time + supplied. + """ + + def getJobsToExecute(startTime=None, agents=None): + """ Return a collection of jobs that are scheduled for execution at + or before the date/time given. + + If ``startTime`` is None the current date/time is used. + If ``agents`` is not None return only jobs for the agents + given. + """ + + +# jobs + +class IScheduledJob(Interface): + """ A job that will be executed on some external triggering at + a predefined date and time - this is the basic job interface. + """ + + agent = Attribute('Agent responsible for executing the job.') + startTime = Attribute('Date/time at which the job should be executed.') + params = Attribute('Mapping with key/value pairs to be used by ' + 'the ``execute()`` method.') + repeat = Attribute('Number of seconds after which the job should be ' + 'rescheduled. Do not repeat if 0.') + successors = Attribute('Jobs to execute immediately after this ' + 'one has been finished.') + whenStarted = Attribute('A callable with one argument (the job) that will ' + 'be called when the job has started.') + whenfinished = Attribute('A callable with two arguments, the job and the ' + 'result of running the job, that will be called when ' + 'the job has finished.') + + def execute(params=None): + """ Execute the job, typically by calling the ``execute()`` method + of the agent responsible for it. + """ + + def reschedule(startTime): + """ Re-schedule the job, setting the date/time the job should be + executed again. + """ + + +class ICrawlingJob(IScheduledJob): + """ A job specifying a crawling task. + """ + + predefinedMetadata = Attribute('A mapping with metadata to be used ' + 'for all resources found.') + + +class ITransportJob(IScheduledJob): + """ A job managing the the transfer of a resource to the server. + """ + + transporter = Attribute('The transporter agent to use for transfer.') + + +# information objects + +class IResource(Interface): + """ Represents a data object that is collected by a crawler and + will be transferred to the server. + """ + + data = Attribute('A string, file, or similar representation of the ' + 'resource\'s content; may be None if the receiver of ' + 'the information can retrieve the date from the path ' + 'given.') + path = Attribute('A filesystem path or some other information ' + 'uniquely identifying the resource on the client ' + 'machine for the current user.') + application = Attribute('The name of the application that provided ' + 'the resource, e.g. "filesystem" or "mail".') + metadata = Attribute('Information describing this resource; ' + 'should be an IMetadataSet object.') + + +class IMetadataSet(Interface): + """ Metadata associated with a resource; a mapping. + """ + + def asXML(): + """ Return an XML string representing the metadata set. + + If this metadata set contains other metadata sets + (nested metadata) these will be converted to XML as well. + """ + + def set(key, value): + """ Set a metadata element. + + The value may be a string or another metadata set + (nested metadata). + """ + + +# logging + +class ILogger(Interface): + """ Ordered collection (list) of log records, probably stored on some + external device. + """ + + externalLoggers = Attribute('A collection of logger objects ' + 'to which the logging records should be written.') + + def setup(): + """ Initialize the logger with the current configuration settings. + """ + + def log(data): + """ Record the information given by the ``data`` argument + (a mapping). + """ + + +class ILogRecord(Interface): + """ + """ + + def __str__(): + """ Return a string representation suitable for writing to a + log file. + """ + diff --git a/agent/tests.py b/agent/tests.py new file mode 100755 index 0000000..99cf95e --- /dev/null +++ b/agent/tests.py @@ -0,0 +1,52 @@ +#! /usr/bin/env python + +# $Id$ + +import os, time +import unittest, doctest +from zope.testing.doctestunit import DocFileSuite +from twisted.internet import reactor +#from twisted.internet.defer import Deferred +#from twisted.trial import unittest as trial_unittest + +baseDir = os.path.dirname(__file__) + + +class Tester(object): + """ Used for controlled execution of reactor iteration cycles. + """ + + def iterate(self, n=10, delays={}): + for i in range(n): + delay = delays.get(i, 0) + reactor.iterate(delay) + +tester = Tester() + + +class Test(unittest.TestCase): + "Basic tests for the cybertools.agent package." + + def setUp(self): + pass + + def tearDown(self): + pass + + def testBasicStuff(self): + pass + + +def test_suite(): + flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS + testSuite = unittest.TestSuite(( + unittest.makeSuite(Test), + DocFileSuite('README.txt', optionflags=flags), + DocFileSuite('core/README.txt', optionflags=flags), + #DocFileSuite('transport/httpput.txt', optionflags=flags), + )) + return testSuite + + +if __name__ == '__main__': + standard_unittest.main(defaultTest='test_suite') diff --git a/agent/transport/__init__.py b/agent/transport/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/transport/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/util/__init__.py b/agent/util/__init__.py new file mode 100644 index 0000000..4bc90fb --- /dev/null +++ b/agent/util/__init__.py @@ -0,0 +1,4 @@ +""" +$Id$ +""" + diff --git a/agent/util/task.py b/agent/util/task.py new file mode 100644 index 0000000..224d704 --- /dev/null +++ b/agent/util/task.py @@ -0,0 +1,368 @@ +# -*- test-case-name: twisted.test.test_task -*- +# Copyright (c) 2001-2004 Twisted Matrix Laboratories. +# See LICENSE for details. + + +"""Scheduling utility methods and classes. + +API Stability: Unstable + +@author: U{Jp Calderone} +""" + +__metaclass__ = type + +import time + +from twisted.python.runtime import seconds +from twisted.python import reflect + +from twisted.internet import base, defer + + +class LoopingCall: + """Call a function repeatedly. + + @ivar f: The function to call. + @ivar a: A tuple of arguments to pass the function. + @ivar kw: A dictionary of keyword arguments to pass to the function. + + If C{f} returns a deferred, rescheduling will not take place until the + deferred has fired. The result value is ignored. + """ + + call = None + running = False + deferred = None + interval = None + count = None + starttime = None + + def _callLater(self, delay): + from twisted.internet import reactor + return reactor.callLater(delay, self) + + _seconds = staticmethod(seconds) + + def __init__(self, f, *a, **kw): + self.f = f + self.a = a + self.kw = kw + + def start(self, interval, now=True): + """Start running function every interval seconds. + + @param interval: The number of seconds between calls. May be + less than one. Precision will depend on the underlying + platform, the available hardware, and the load on the system. + + @param now: If True, run this call right now. Otherwise, wait + until the interval has elapsed before beginning. + + @return: A Deferred whose callback will be invoked with + C{self} when C{self.stop} is called, or whose errback will be + invoked when the function raises an exception or returned a + deferred that has its errback invoked. + """ + assert not self.running, ("Tried to start an already running " + "LoopingCall.") + if interval < 0: + raise ValueError, "interval must be >= 0" + self.running = True + d = self.deferred = defer.Deferred() + self.starttime = self._seconds() + self.count = 0 + self.interval = interval + if now: + self() + else: + self._reschedule() + return d + + def stop(self): + """Stop running function. + """ + assert self.running, ("Tried to stop a LoopingCall that was " + "not running.") + self.running = False + if self.call is not None: + self.call.cancel() + self.call = None + d, self.deferred = self.deferred, None + d.callback(self) + + def __call__(self): + def cb(result): + if self.running: + self._reschedule() + else: + d, self.deferred = self.deferred, None + d.callback(self) + + def eb(failure): + self.running = False + d, self.deferred = self.deferred, None + d.errback(failure) + + self.call = None + d = defer.maybeDeferred(self.f, *self.a, **self.kw) + d.addCallback(cb) + d.addErrback(eb) + + def _reschedule(self): + if self.interval == 0: + self.call = self._callLater(0) + return + + fromNow = self.starttime - self._seconds() + + while self.running: + self.count += 1 + fromStart = self.count * self.interval + delay = fromNow + fromStart + if delay > 0: + self.call = self._callLater(delay) + return + + def __repr__(self): + if hasattr(self.f, 'func_name'): + func = self.f.func_name + if hasattr(self.f, 'im_class'): + func = self.f.im_class.__name__ + '.' + func + else: + func = reflect.safe_repr(self.f) + + return 'LoopingCall<%r>(%s, *%s, **%s)' % ( + self.interval, func, reflect.safe_repr(self.a), + reflect.safe_repr(self.kw)) + + + +class SchedulerStopped(Exception): + """ + The operation could not complete because the scheduler was stopped in + progress or was already stopped. + """ + + + +class _Timer(object): + MAX_SLICE = 0.01 + def __init__(self): + self.end = time.time() + self.MAX_SLICE + + + def __call__(self): + return time.time() >= self.end + + + +_EPSILON = 0.00000001 +def _defaultScheduler(x): + from twisted.internet import reactor + return reactor.callLater(_EPSILON, x) + + + +class Cooperator(object): + """ + Cooperative task scheduler. + """ + + def __init__(self, + terminationPredicateFactory=_Timer, + scheduler=_defaultScheduler, + started=True): + """ + Create a scheduler-like object to which iterators may be added. + + @param terminationPredicateFactory: A no-argument callable which will + be invoked at the beginning of each step and should return a + no-argument callable which will return False when the step should be + terminated. The default factory is time-based and allows iterators to + run for 1/100th of a second at a time. + + @param scheduler: A one-argument callable which takes a no-argument + callable and should invoke it at some future point. This will be used + to schedule each step of this Cooperator. + + @param started: A boolean which indicates whether iterators should be + stepped as soon as they are added, or if they will be queued up until + L{Cooperator.start} is called. + """ + self.iterators = [] + self._metarator = iter(()) + self._terminationPredicateFactory = terminationPredicateFactory + self._scheduler = scheduler + self._delayedCall = None + self._stopped = False + self._started = started + + + def coiterate(self, iterator, doneDeferred=None): + """ + Add an iterator to the list of iterators I am currently running. + + @return: a Deferred that will fire when the iterator finishes. + """ + if doneDeferred is None: + doneDeferred = defer.Deferred() + if self._stopped: + doneDeferred.errback(SchedulerStopped()) + return doneDeferred + self.iterators.append((iterator, doneDeferred)) + self._reschedule() + return doneDeferred + + + def _tasks(self): + terminator = self._terminationPredicateFactory() + while self.iterators: + for i in self._metarator: + yield i + if terminator(): + return + self._metarator = iter(self.iterators) + + + def _tick(self): + """ + Run one scheduler tick. + """ + self._delayedCall = None + for taskObj in self._tasks(): + iterator, doneDeferred = taskObj + try: + result = iterator.next() + except StopIteration: + self.iterators.remove(taskObj) + doneDeferred.callback(iterator) + except: + self.iterators.remove(taskObj) + doneDeferred.errback() + else: + if isinstance(result, defer.Deferred): + self.iterators.remove(taskObj) + def cbContinue(result, taskObj=taskObj): + self.coiterate(*taskObj) + result.addCallbacks(cbContinue, doneDeferred.errback) + self._reschedule() + + + _mustScheduleOnStart = False + def _reschedule(self): + if not self._started: + self._mustScheduleOnStart = True + return + if self._delayedCall is None and self.iterators: + self._delayedCall = self._scheduler(self._tick) + + + def start(self): + """ + Begin scheduling steps. + """ + self._stopped = False + self._started = True + if self._mustScheduleOnStart: + del self._mustScheduleOnStart + self._reschedule() + + + def stop(self): + """ + Stop scheduling steps. Errback the completion Deferreds of all + iterators which have been added and forget about them. + """ + self._stopped = True + for iterator, doneDeferred in self.iterators: + doneDeferred.errback(SchedulerStopped()) + self.iterators = [] + if self._delayedCall is not None: + self._delayedCall.cancel() + self._delayedCall = None + + + +_theCooperator = Cooperator() +def coiterate(iterator): + """ + Cooperatively iterate over the given iterator, dividing runtime between it + and all other iterators which have been passed to this function and not yet + exhausted. + """ + return _theCooperator.coiterate(iterator) + + + +class Clock: + """ + Provide a deterministic, easily-controlled implementation of + L{IReactorTime.callLater}. This is commonly useful for writing + deterministic unit tests for code which schedules events using this API. + """ + rightNow = 0.0 + + def __init__(self): + self.calls = [] + + def seconds(self): + """ + Pretend to be time.time(). This is used internally when an operation + such as L{IDelayedCall.reset} needs to determine a a time value + relative to the current time. + + @rtype: C{float} + @return: The time which should be considered the current time. + """ + return self.rightNow + + + def callLater(self, when, what, *a, **kw): + """ + See L{twisted.internet.interfaces.IReactorTime.callLater}. + """ + self.calls.append( + base.DelayedCall(self.seconds() + when, + what, a, kw, + self.calls.remove, + lambda c: None, + self.seconds)) + self.calls.sort(lambda a, b: cmp(a.getTime(), b.getTime())) + return self.calls[-1] + + + def advance(self, amount): + """ + Move time on this clock forward by the given amount and run whatever + pending calls should be run. + + @type amount: C{float} + @param amount: The number of seconds which to advance this clock's + time. + """ + self.rightNow += amount + while self.calls and self.calls[0].getTime() <= self.seconds(): + call = self.calls.pop(0) + call.called = 1 + call.func(*call.args, **call.kw) + + + def pump(self, timings): + """ + Advance incrementally by the given set of times. + + @type timings: iterable of C{float} + """ + for amount in timings: + self.advance(amount) + + + +__all__ = [ + 'LoopingCall', + + 'Clock', + + 'SchedulerStopped', 'Cooperator', 'coiterate', + ] diff --git a/composer/schema/browser/schema_macros.pt b/composer/schema/browser/schema_macros.pt index 708f4dd..ff56d58 100755 --- a/composer/schema/browser/schema_macros.pt +++ b/composer/schema/browser/schema_macros.pt @@ -79,6 +79,7 @@