From b9ba07ad95adffaa738b22f696ceaa57ea3c3040 Mon Sep 17 00:00:00 2001 From: helmutm Date: Wed, 1 Aug 2007 16:15:29 +0000 Subject: [PATCH] provide Agent.scheduleJobsFromConfig() as part of the start-up procedure; work in progress: filesystem crawler git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1861 fd906abe-77d9-0310-91a1-e0d9ade77398 --- agent/README.txt | 46 ++++++++++++++++++++++++++++----- agent/config.py | 5 ++++ agent/core.py | 44 +++++++++++++++++++++++++++++--- agent/crawl/filesystem.py | 42 +++++++++++++++++++++++++++++- agent/crawl/filesystem.txt | 17 +++++++++++++ agent/interfaces.py | 2 +- agent/schedule.py | 8 +++--- agent/testing/transport.py | 36 ++++++++++++++------------ agent/transport/httpput.py | 52 +++++++++++++++++++++++++++++++++++++- 9 files changed, 219 insertions(+), 33 deletions(-) create mode 100644 agent/crawl/filesystem.txt diff --git a/agent/README.txt b/agent/README.txt index 9767b50..39b9219 100644 --- a/agent/README.txt +++ b/agent/README.txt @@ -79,6 +79,9 @@ it with a default if not found, in one statement. >>> config.transport.setdefault('user', 'loops') 'loops' + >>> sorted(config.transport.items()) + [('url', 'http://loops.cy55.de'), ('user', 'loops')] + We can output a configuration in a form that is ready for loading just by converting it to a string representation. @@ -142,14 +145,43 @@ How does this work? We can set up a more realistic example using the dummy crawler and transporter classes from the testing package. 
- >>> from loops.agent.testing.crawl import CrawlingJob - >>> from loops.agent.testing.transport import Transporter, TransportJob + >>> from loops.agent.testing import crawl + >>> from loops.agent.testing import transport - >>> crawl = CrawlingJob() - >>> transporter = Transporter() - >>> transport = TransportJob(transporter) - >>> crawl.successors.append(transport) - >>> scheduler.schedule(crawl, int(time())) + >>> crawlJob = crawl.CrawlingJob() + >>> transporter = transport.Transporter() + >>> transportJob = transporter.jobFactory(transporter) + >>> crawlJob.successors.append(transportJob) + >>> scheduler.schedule(crawlJob, int(time())) + + >>> tester.iterate() + Transferring: Dummy resource data for testing purposes. + +Using configuration with scheduling +----------------------------------- + +Let's start with a fresh agent, directly supplying the configuration +(just for testing). + + >>> config = ''' + ... crawl[0].type = 'dummy' + ... crawl[0].directory = '~/documents' + ... crawl[0].pattern = '.*\.doc' + ... crawl[0].starttime = %s + ... crawl[0].transport = 'dummy' + ... crawl[0].repeat = 0 + ... transport.url = 'http://loops.cy55.de' + ... ''' % int(time()) + + >>> agent = core.Agent(config) + +We also register our dummy crawling job and transporter classes as +we cannot perform real crawling and transfers when testing. + + >>> agent.crawlTypes = dict(dummy=crawl.CrawlingJob) + >>> agent.transportTypes = dict(dummy=transport.Transporter) + + >>> agent.scheduleJobsFromConfig() >>> tester.iterate() Transferring: Dummy resource data for testing purposes. 
diff --git a/agent/config.py b/agent/config.py
index 887af3e..fce202c 100644
--- a/agent/config.py
+++ b/agent/config.py
@@ -94,6 +94,14 @@ class ConfigSection(list):
             return value
         return getattr(self, attr)
 
+    def items(self):
+        """ Generate (name, value) pairs for those instance attributes
+            whose value is a string or an integer.
+        """
+        for name, value in self.__dict__.items():
+            if isinstance(value, (str, int)):
+                yield name, value
+
     def collect(self, ident, result):
         for idx, element in enumerate(self):
             element.collect('%s[%i]' % (ident, idx), result)
diff --git a/agent/core.py b/agent/core.py
index 2b85374..2e49e42 100644
--- a/agent/core.py
+++ b/agent/core.py
@@ -22,18 +22,64 @@ The real agent stuff.
 $Id$
 """
 
+from time import time
 from zope.interface import implements
 from loops.agent.interfaces import IAgent
 from loops.agent.config import Configurator
+from loops.agent.crawl import filesystem
 from loops.agent.schedule import Scheduler
+from loops.agent.transport import httpput
+
+
+# Factories available for the ``type`` and ``transport`` settings
+# of the entries in the configuration's ``crawl`` section.
+crawlTypes = dict(
+    filesystem=filesystem.CrawlingJob,
+)
+
+transportTypes = dict(
+    httpput=httpput.Transporter,
+)
 
 
 class Agent(object):
 
     implements(IAgent)
 
-    def __init__(self):
-        config = self.config = Configurator('ui', 'crawl', 'transport')
-        config.load()
-        self.scheduler = Scheduler()
+    crawlTypes = crawlTypes
+    transportTypes = transportTypes
+
+    def __init__(self, conf=None):
+        """ Load the configuration (``conf`` is passed on to the
+            configurator's ``load()`` method) and create the scheduler.
+        """
+        config = self.config = Configurator('ui', 'crawl', 'transport')
+        config.load(conf)
+        self.scheduler = Scheduler(self)
+
+    def scheduleJobsFromConfig(self):
+        """ Schedule a crawling job for each entry of the configuration's
+            ``crawl`` section whose type is registered in ``crawlTypes``.
+        """
+        config = self.config
+        scheduler = self.scheduler
+        for info in config.crawl:
+            crawlType = info.type
+            factory = self.crawlTypes.get(crawlType)
+            if factory is not None:
+                job = factory()
+                # the start time is used for scheduling only, so it is
+                # not handed over to the job as a parameter
+                job.params = dict((name, value)
+                                  for name, value in info.items()
+                                  if name not in ('starttime',))
+                transportType = info.transport or 'httpput'
+                factory = self.transportTypes.get(transportType)
+                if factory is not None:
+                    transporter = factory()
+                    # TODO: configure transporter or - better -
+                    #       set up transporter(s) just once
+                    job.successors.append(transporter.jobFactory(transporter))
+                job.repeat = info.repeat or 0
+                scheduler.schedule(job, info.starttime or int(time()))
 
diff --git a/agent/crawl/filesystem.py b/agent/crawl/filesystem.py
index 776634c..33c2c41 100644
--- a/agent/crawl/filesystem.py
+++ b/agent/crawl/filesystem.py
@@ -22,5 +22,69 @@ Filesystem crawler.
 $Id$
 """
 
-from loops.agent.interfaces import ICrawlingJob
+import os
+import re
+import stat
+from datetime import datetime
+from twisted.internet.defer import Deferred
+from twisted.internet.threads import deferToThread
+from zope.interface import implements
+from loops.agent.interfaces import ICrawlingJob, IResource, IMetadataSet
+from loops.agent.crawl.base import CrawlingJob as BaseCrawlingJob
+
+
+class CrawlingJob(BaseCrawlingJob):
+    """ Crawling job that collects files from a directory tree.
+    """
+
+    def collect(self, **criteria):
+        """ Crawl the filesystem in a worker thread; return a deferred
+            that is fired via ``dataAvailable()`` when crawling is done.
+        """
+        self.deferred = Deferred()
+        # crawlFilesystem() is a generator: it has to be consumed
+        # within the thread in order to run the blocking calls there.
+        crawling = deferToThread(
+                        lambda: list(self.crawlFilesystem(**criteria)))
+        crawling.addCallbacks(self.dataAvailable, self.deferred.errback)
+        return self.deferred
+
+    def dataAvailable(self, files=None):
+        """ Fire the deferred created by ``collect()``.
+        """
+        # TODO: create the resources from ``files``, the list of
+        #       (path, modification time) tuples found by the crawler;
+        #       at the moment only a dummy resource is provided.
+        self.deferred.callback([(FileResource(), Metadata())])
+
+    def crawlFilesystem(self, **criteria):
+        """ Generate (relative path, modification time) tuples for the
+            files below ``directory`` whose names match the regular
+            expression ``pattern`` (default: all files).
+        """
+        directory = criteria.get('directory')
+        if not directory:
+            return  # no directory given: nothing to crawl
+        pattern = re.compile(criteria.get('pattern') or '.*')
+        for path, dirs, files in os.walk(directory):
+            if '.svn' in dirs:
+                # pruning dirs in place keeps os.walk() out of it
+                dirs.remove('.svn')
+            for f in files:
+                if pattern.match(f):
+                    mtime = os.stat(os.path.join(path, f))[stat.ST_MTIME]
+                    yield (os.path.join(path[len(directory)+1:], f),
+                            datetime.fromtimestamp(mtime))
+
+
+class Metadata(object):
+
+    implements(IMetadataSet)
+
+
+class FileResource(object):
+
+    implements(IResource)
+
+    data = 'Dummy resource data for testing purposes.'
 
diff --git a/agent/crawl/filesystem.txt b/agent/crawl/filesystem.txt
new file mode 100644
index 0000000..c5538dd
--- /dev/null
+++ b/agent/crawl/filesystem.txt
@@ -0,0 +1,17 @@
+=====================================================
+loops.agent.crawl.filesystem - The Filesystem Crawler
+=====================================================
+
+  ($Id$)
+
+  >>> from loops.agent.tests import tester
+  >>> from loops.agent.core import Agent
+
+  >>> agent = Agent()
+  >>> from loops.agent.crawl.filesystem import CrawlingJob
+
+  >>> from time import time
+  >>> scheduler = agent.scheduler
+  >>> scheduler.schedule(CrawlingJob(), int(time()))
+
+  >>> tester.iterate()
diff --git a/agent/interfaces.py b/agent/interfaces.py
index 4fd58cd..19b3f0c 100644
--- a/agent/interfaces.py
+++ b/agent/interfaces.py
@@ -178,7 +178,7 @@ class IConfigurator(Interface):
             path is stored in the ``filename`` attribute.
         """
 
-    def save(filename=None)
+    def save(filename=None):
         """ Save configuration settings to the file given, or to the
             file from which it was loaded, or to the default location.
         """
diff --git a/agent/schedule.py b/agent/schedule.py
index fbcf10a..8a3653f 100644
--- a/agent/schedule.py
+++ b/agent/schedule.py
@@ -34,7 +34,8 @@ class Scheduler(object):
 
     implements(IScheduler)
 
-    def __init__(self):
+    def __init__(self, agent=None):
+        self.agent = agent
         self.queue = {}
         self.logger = None
 
@@ -54,9 +55,9 @@ class Job(object):
 
     def __init__(self):
         self.startTime = 0
-        self.scheduler = None
         self.params = {}
         self.successors = []
+        self.repeat = 0
 
     def execute(self, **kw):
         d = Deferred()
@@ -76,7 +77,8 @@ class Job(object):
             job.run(**job.params)
         # TODO: remove from queue
         # TODO: logging
-        # TODO: reschedule if told by configuration
+        if self.repeat:
+            self.reschedule(int(time() + self.repeat))
 
     def copy(self):
         newJob = Job()
diff --git a/agent/testing/transport.py b/agent/testing/transport.py
index e72c07c..cee30ff 100644
--- a/agent/testing/transport.py
+++ b/agent/testing/transport.py
@@ -30,10 +30,30 @@ from loops.agent.interfaces import ITransportJob, ITransporter
 from loops.agent.schedule import Job
 
 
+class TransportJob(Job):
+
+    implements(ITransportJob)
+
+    def __init__(self, transporter):
+        super(TransportJob, self).__init__()
+        self.transporter = transporter
+
+    def execute(self, **kw):
+        result = kw.get('result')
+        if result is None:
+            print 'No data available.'
+        else:
+            for r in result:
+                d = self.transporter.transfer(r[0].data, r[1], str)
+        return Deferred()
+
+
 class Transporter(object):
 
     implements(ITransporter)
 
+    jobFactory = TransportJob
+
     serverURL = None
     method = None
     machineName = None
@@ -50,19 +70,3 @@ class Transporter(object):
         return Deferred()
 
 
-class TransportJob(Job):
-
-    implements(ITransportJob)
-
-    def __init__(self, transporter):
-        super(TransportJob, self).__init__()
-        self.transporter = transporter
-
-    def execute(self, **kw):
-        result = kw.get('result')
-        if result is None:
-            print 'No data available.'
-        else:
-            for r in result:
-                d = self.transporter.transfer(r[0].data, r[1], str)
-        return Deferred()
diff --git a/agent/transport/httpput.py b/agent/transport/httpput.py
index 684e353..52a3306 100644
--- a/agent/transport/httpput.py
+++ b/agent/transport/httpput.py
@@ -22,5 +22,70 @@ Transferring of data/files to the server.
 $Id$
 """
 
-from loops.agent.interfaces import ITransporter
+from twisted.internet import reactor
+from twisted.internet.defer import Deferred
+from zope.interface import implements
+
+from loops.agent.interfaces import ITransporter, ITransportJob
+from loops.agent.schedule import Job
+
+
+class TransportJob(Job):
+    """ Job passing the data it receives as ``result`` to a transporter.
+    """
+
+    implements(ITransportJob)
+
+    def __init__(self, transporter):
+        super(TransportJob, self).__init__()
+        self.transporter = transporter
+
+    def execute(self, **kw):
+        """ Transfer the data of each (resource, metadata) pair in the
+            ``result`` keyword argument via the transporter.
+        """
+        result = kw.get('result')
+        if result is None:
+            print 'No data available.'
+        else:
+            for r in result:
+                self.transporter.transfer(r[0].data, r[1], str)
+        return Deferred()
+
+
+class Transporter(object):
+
+    implements(ITransporter)
+
+    jobFactory = TransportJob
+
+    serverURL = None
+    method = None
+    machineName = None
+    userName = None
+    password = None
+
+    def __init__(self, agent=None):
+        # ``agent`` is optional: Agent.scheduleJobsFromConfig() calls
+        # the transporter factories without arguments.
+        self.agent = agent
+
+    def transfer(self, resource, metadata=None, resourceType=file):
+        """ Transfer the resource, an open file (that is closed after
+            reading) or - with ``resourceType=str`` - a string.
+            For the time being the data are just printed.
+        """
+        if resourceType is file:
+            try:
+                data = resource.read()
+            finally:
+                resource.close()
+        elif resourceType is str:
+            data = resource
+        else:
+            raise ValueError('Unsupported resource type: %r'
+                             % (resourceType,))
+        print 'Transferring:', data
+        return Deferred()
 
+