provide Agent.scheduleJobsFromConfig() as part of the start-up procedure; work in progress: filesystem crawler

git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1861 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2007-08-01 16:15:29 +00:00
parent 5f97f29bbb
commit b9ba07ad95
9 changed files with 219 additions and 33 deletions

View file

@@ -79,6 +79,9 @@ it with a default if not found, in one statement.
>>> config.transport.setdefault('user', 'loops')
'loops'
>>> sorted(config.transport.items())
[('url', 'http://loops.cy55.de'), ('user', 'loops')]
We can output a configuration in a form that is ready for loading
just by converting it to a string representation.
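For instance, with the settings from above, the output should reproduce
the assignment statements (a sketch; the exact line ordering may differ):

>>> print config
transport.url = 'http://loops.cy55.de'
transport.user = 'loops'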
@@ -142,14 +145,43 @@ How does this work?
We can set up a more realistic example using the dummy crawler and transporter
classes from the testing package.
->>> from loops.agent.testing.crawl import CrawlingJob
->>> from loops.agent.testing.transport import Transporter, TransportJob
->>> crawl = CrawlingJob()
->>> transporter = Transporter()
->>> transport = TransportJob(transporter)
->>> crawl.successors.append(transport)
->>> scheduler.schedule(crawl, int(time()))
+>>> from loops.agent.testing import crawl
+>>> from loops.agent.testing import transport
+>>> crawlJob = crawl.CrawlingJob()
+>>> transporter = transport.Transporter()
+>>> transportJob = transporter.jobFactory(transporter)
+>>> crawlJob.successors.append(transportJob)
+>>> scheduler.schedule(crawlJob, int(time()))
>>> tester.iterate()
Transferring: Dummy resource data for testing purposes.
Using configuration with scheduling
-----------------------------------
Let's start with a fresh agent, directly supplying the configuration
(just for testing).
>>> config = '''
... crawl[0].type = 'dummy'
... crawl[0].directory = '~/documents'
... crawl[0].pattern = '.*\.doc'
... crawl[0].starttime = %s
... crawl[0].transport = 'dummy'
... crawl[0].repeat = 0
... transport.url = 'http://loops.cy55.de'
... ''' % int(time())
>>> agent = core.Agent(config)
We also register our dummy crawling job and transporter classes, since
we cannot perform real crawling and transfers when testing.
>>> agent.crawlTypes = dict(dummy=crawl.CrawlingJob)
>>> agent.transportTypes = dict(dummy=transport.Transporter)
>>> agent.scheduleJobsFromConfig()
>>> tester.iterate()
Transferring: Dummy resource data for testing purposes.
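The configuration text supplied to the agent is parsed into nested
sections, so the values defined above come back through plain attribute
and index access. A quick illustration of the loaded settings (a sketch
following the semantics shown in these tests):

>>> agent.config.crawl[0].type
'dummy'
>>> agent.config.crawl[0].directory
'~/documents'
>>> agent.config.transport.url
'http://loops.cy55.de'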

View file

@@ -94,6 +94,11 @@ class ConfigSection(list):
            return value
        return getattr(self, attr)

    def items(self):
        for name, value in self.__dict__.items():
            if isinstance(value, (str, int)):
                yield name, value

    def collect(self, ident, result):
        for idx, element in enumerate(self):
            element.collect('%s[%i]' % (ident, idx), result)
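The new ``items()`` method yields only scalar (string or integer)
settings from a section's attributes; nested subsections and other
objects in ``__dict__`` are skipped. This is what the doctest above
relies on:

>>> sorted(config.transport.items())
[('url', 'http://loops.cy55.de'), ('user', 'loops')]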

View file

@@ -22,18 +22,54 @@ The real agent stuff.

$Id$
"""

from time import time

from zope.interface import implements

from loops.agent.interfaces import IAgent
from loops.agent.config import Configurator
from loops.agent.crawl import filesystem
from loops.agent.schedule import Scheduler
from loops.agent.transport import httpput

crawlTypes = dict(
    filesystem=filesystem.CrawlingJob,
)

transportTypes = dict(
    httpput=httpput.Transporter,
)


class Agent(object):

    implements(IAgent)

-    def __init__(self):
-        config = self.config = Configurator('ui', 'crawl', 'transport')
-        config.load()
-        self.scheduler = Scheduler()
+    crawlTypes = crawlTypes
+    transportTypes = transportTypes
+
+    def __init__(self, conf=None):
+        config = self.config = Configurator('ui', 'crawl', 'transport')
+        config.load(conf)
+        self.scheduler = Scheduler(self)

    def scheduleJobsFromConfig(self):
        config = self.config
        scheduler = self.scheduler
        for info in config.crawl:
            crawlType = info.type
            factory = self.crawlTypes.get(crawlType)
            if factory is not None:
                job = factory()
                job.params = dict((name, value)
                                  for name, value in info.items()
                                  if name not in ('starttime',))
                transportType = info.transport or 'httpput'
                factory = self.transportTypes.get(transportType)
                if factory is not None:
                    transporter = factory()
                    # TODO: configure transporter or - better -
                    # set up transporter(s) just once
                    job.successors.append(transporter.jobFactory(transporter))
                job.repeat = info.repeat or 0
                self.scheduler.schedule(job, info.starttime or int(time()))
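Taken together, agent start-up now boils down to a few calls. A minimal
sketch (not part of this commit; it assumes a configuration with at
least one crawl[n] section, as in the doctest above):

    from loops.agent.core import Agent

    agent = Agent()                  # loads the default configuration
    agent.scheduleJobsFromConfig()   # one crawling job per crawl[n] entry
    # each job receives its settings via job.params; a non-zero repeat
    # interval makes the job reschedule itself after every run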

View file

@@ -22,5 +22,45 @@ Filesystem crawler.

$Id$
"""

-from loops.agent.interfaces import ICrawlingJob
+import os
import re
import stat
from datetime import datetime
from twisted.internet import threads
from twisted.internet.defer import Deferred
from zope.interface import implements

from loops.agent.interfaces import ICrawlingJob, IResource, IMetadataSet
from loops.agent.crawl.base import CrawlingJob as BaseCrawlingJob


class CrawlingJob(BaseCrawlingJob):

    def collect(self, **criteria):
        # work in progress: run the filesystem walk in a worker thread;
        # deferToThread lives in twisted.internet.threads, not on the reactor
        self.deferred = threads.deferToThread(self.crawlFilesystem, **criteria)
        return self.deferred

    def dataAvailable(self):
        self.deferred.callback([(FileResource(), Metadata())])

    def crawlFilesystem(self, **criteria):
        directory = criteria.get('directory')
        pattern = re.compile(criteria.get('pattern') or '.*')
        for path, dirs, files in os.walk(directory):
            if '.svn' in dirs:
                del dirs[dirs.index('.svn')]
            for f in files:
                if pattern.match(f):
                    mtime = os.stat(os.path.join(path, f))[stat.ST_MTIME]
                    yield (os.path.join(path[len(directory)+1:], f),
                           datetime.fromtimestamp(mtime))


class Metadata(object):

    implements(IMetadataSet)


class FileResource(object):

    implements(IResource)

    data = 'Dummy resource data for testing purposes.'
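At its core, crawlFilesystem() walks a directory tree, filters file
names with a regular expression, and reports paths relative to the
crawl root together with their modification times. A self-contained
sketch of that logic, without the job machinery (the function name
walk_matching is made up for illustration):

    import os, re, stat
    from datetime import datetime

    def walk_matching(directory, pattern='.*'):
        # yield (relative path, mtime) for each matching file,
        # skipping .svn administration directories
        rx = re.compile(pattern)
        for path, dirs, files in os.walk(directory):
            if '.svn' in dirs:
                dirs.remove('.svn')
            for name in files:
                if rx.match(name):
                    full = os.path.join(path, name)
                    mtime = os.stat(full)[stat.ST_MTIME]
                    yield (os.path.join(path[len(directory)+1:], name),
                           datetime.fromtimestamp(mtime))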

View file

@@ -0,0 +1,17 @@
=====================================================
loops.agent.crawl.filesystem - The Filesystem Crawler
=====================================================
($Id$)
>>> from loops.agent.tests import tester
>>> from loops.agent.core import Agent
>>> agent = Agent()
>>> from loops.agent.crawl.filesystem import CrawlingJob
>>> from time import time
>>> scheduler = agent.scheduler
>>> scheduler.schedule(CrawlingJob(), int(time()))
>>> tester.iterate()
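The iteration produces no output yet because the crawler is still work
in progress. Once completed, a crawling job would receive its criteria
through its params, roughly like this (hypothetical values, following
the pattern used by Agent.scheduleJobsFromConfig()):

>>> job = CrawlingJob()
>>> job.params = dict(directory='~/documents', pattern='.*')
>>> scheduler.schedule(job, int(time()))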

View file

@@ -178,7 +178,7 @@ class IConfigurator(Interface):
        path is stored in the ``filename`` attribute.
        """

-    def save(filename=None)
+    def save(filename=None):
        """ Save configuration settings to the file given, or to the
            file from which it was loaded, or to the default location.
        """

View file

@@ -34,7 +34,8 @@ class Scheduler(object):

    implements(IScheduler)

-    def __init__(self):
+    def __init__(self, agent):
+        self.agent = agent
        self.queue = {}
        self.logger = None

@@ -54,9 +55,9 @@ class Job(object):

    def __init__(self):
        self.startTime = 0
-        self.scheduler = None
        self.params = {}
        self.successors = []
+        self.repeat = 0

    def execute(self, **kw):
        d = Deferred()

@@ -76,7 +77,8 @@ class Job(object):
            job.run(**job.params)
        # TODO: remove from queue
        # TODO: logging
-        # TODO: reschedule if told by configuration
+        if self.repeat:
+            self.reschedule(int(time() + self.repeat))

    def copy(self):
        newJob = Job()
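The new repeat attribute turns a one-shot job into a recurring one:
after each run the job is rescheduled at the current time plus the
repeat interval in seconds. For illustration (hypothetical value):

    job.repeat = 3600   # run again one hour after every execution
    # after a run the job effectively performs
    #     self.reschedule(int(time() + 3600))
    # repeat = 0 (the default) keeps the old one-shot behaviour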

View file

@@ -30,10 +30,30 @@ from loops.agent.interfaces import ITransportJob, ITransporter
from loops.agent.schedule import Job


class TransportJob(Job):

    implements(ITransportJob)

    def __init__(self, transporter):
        super(TransportJob, self).__init__()
        self.transporter = transporter

    def execute(self, **kw):
        result = kw.get('result')
        if result is None:
            print 'No data available.'
        else:
            for r in result:
                d = self.transporter.transfer(r[0].data, r[1], str)
        return Deferred()


class Transporter(object):

    implements(ITransporter)

    jobFactory = TransportJob

    serverURL = None
    method = None
    machineName = None

@@ -50,19 +70,3 @@ class Transporter(object):
        return Deferred()

-class TransportJob(Job):
-
-    implements(ITransportJob)
-
-    def __init__(self, transporter):
-        super(TransportJob, self).__init__()
-        self.transporter = transporter
-
-    def execute(self, **kw):
-        result = kw.get('result')
-        if result is None:
-            print 'No data available.'
-        else:
-            for r in result:
-                d = self.transporter.transfer(r[0].data, r[1], str)
-        return Deferred()
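Attaching the job class to the transporter as jobFactory lets callers
create a matching transport job without importing TransportJob
directly; this is the call pattern the updated doctest uses:

    transporter = Transporter()
    transportJob = transporter.jobFactory(transporter)  # == TransportJob(transporter)
    crawlJob.successors.append(transportJob)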

View file

@@ -22,5 +22,55 @@ Transferring of data/files to the server.

$Id$
"""

-from loops.agent.interfaces import ITransporter
+from twisted.internet import reactor
from twisted.internet.defer import Deferred
from zope.interface import implements

from loops.agent.interfaces import ITransporter, ITransportJob
from loops.agent.schedule import Job


class TransportJob(Job):

    implements(ITransportJob)

    def __init__(self, transporter):
        super(TransportJob, self).__init__()
        self.transporter = transporter

    def execute(self, **kw):
        result = kw.get('result')
        if result is None:
            print 'No data available.'
        else:
            for r in result:
                d = self.transporter.transfer(r[0].data, r[1], str)
        return Deferred()


class Transporter(object):

    implements(ITransporter)

    jobFactory = TransportJob

    serverURL = None
    method = None
    machineName = None
    userName = None
    password = None

    def __init__(self, agent):
        self.agent = agent
        self.config = agent.config

    def transfer(self, resource, metadata=None, resourceType=file):
        if resourceType is file:
            data = resource.read()
            resource.close()
        elif resourceType is str:
            data = resource
        print 'Transferring:', data
        return Deferred()
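transfer() accepts either an open file or a plain string, selected via
the resourceType argument. Both call styles, sketched with illustrative
values:

    transporter.transfer('some data', metadata=None, resourceType=str)

    f = open('/tmp/example.txt')
    transporter.transfer(f, resourceType=file)  # reads and closes the file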