diff --git a/agent/crawl/filesystem.txt b/agent/crawl/filesystem.txt index 94217b0..5bb1302 100644 --- a/agent/crawl/filesystem.txt +++ b/agent/crawl/filesystem.txt @@ -35,8 +35,8 @@ We are now ready to schedule the job and let the reactor execute it. >>> startTime = scheduler.schedule(crawlJob) >>> tester.iterate() - Metadata: {'path': '...data...file1.txt'} - Transferring: Data from file1.txt Metadata: {'path': '...data...subdir...file2.txt'} Transferring: Data from file2.txt + Metadata: {'path': '...data...file1.txt'} + Transferring: Data from file1.txt diff --git a/agent/transport/base.py b/agent/transport/base.py index 67c4fbb..ceff3dc 100644 --- a/agent/transport/base.py +++ b/agent/transport/base.py @@ -24,7 +24,7 @@ $Id$ from base64 import b64encode from twisted.internet import reactor -from twisted.internet.defer import Deferred, DeferredList, fail +from twisted.internet.defer import Deferred, fail from twisted.web.client import getPage from zope.interface import implements @@ -39,26 +39,36 @@ class TransportJob(Job): def __init__(self, transporter): super(TransportJob, self).__init__() self.transporter = transporter + self.resources = [] + self.deferred = None def execute(self): result = self.params.get('result') if result is None: return fail('No data available.') - transfers = [] - for resource in result: - d = self.transporter.transfer(resource) - transfers.append(d) - d.addCallback(self.logTransfer) - d.addErrback(self.logError) - return DeferredList(transfers) + self.resources = result + self.deferred = Deferred() + d = self.transporter.transfer(self.resources.pop()) + d.addCallback(self.transferDone).addErrback(self.logError) + return self.deferred + + def transferDone(self, result): + self.logTransfer(result) + if self.resources: + d = self.transporter.transfer(self.resources.pop()) + d.addCallback(self.transferDone).addErrback(self.logError) + else: + self.deferred.callback('OK') def logTransfer(self, result, err=None): + #print 'transfer successful; remaining:', len(self.resources) # TODO: logging # self.transporter.agent.logger.log(...) pass def logError(self, error): print '*** error on transfer', self.transporter.serverURL, error + self.deferred.errback(error) class Transporter(object): @@ -77,6 +87,7 @@ class Transporter(object): self.agent = agent for k, v in params.items(): setattr(self, k ,v) + self.deferred = None def createJob(self): return self.jobFactory(self) @@ -88,22 +99,27 @@ class Transporter(object): data.close() else: text = data - # TODO: encode text (?) - # TODO: set headers: Content-Type, Authorization, User-Agent path = resource.path app = resource.application - deferreds = [] metadata = resource.metadata - auth = b64encode(':'.join((self.userName, self.password))) + auth = b64encode(self.userName + ':' + self.password) headers = {'Authorization': 'Basic ' + auth} - if metadata is not None: - url = self.makePath('.meta', app, path, 'xml') - deferreds.append(getPage(url, method=self.method, headers=headers, - postdata=metadata.asXML())) url = self.makePath('.data', app, path) - deferreds.append(getPage(url, method=self.method, headers=headers, - postdata=text)) - return DeferredList(deferreds, fireOnOneErrback=True) + d = getPage(url, method=self.method, headers=headers, postdata=text) + if metadata is None: + d.addCallback(self.finished) + else: + d.addCallback(self.transferMeta, app, path, headers, metadata.asXML()) + self.deferred = Deferred() + return self.deferred + + def transferMeta(self, result, app, path, headers, text): + url = self.makePath('.meta', app, path) + d = getPage(url, method=self.method, headers=headers, postdata=text) + d.addCallback(self.finished) + + def finished(self, result): + self.deferred.callback(result) def makePath(self, infoType, app, path, extension=None): if path.startswith('/'): diff --git a/integrator/put.py b/integrator/put.py index 809906d..dd2d04c 100644 --- a/integrator/put.py +++ b/integrator/put.py @@ -99,9 +99,9 @@ class ResourceManagerTraverser(ItemTraverser): obj.contentType = contentType obj.title = title #obj.data = data - notify(ObjectModifiedEvent(resource)) # TODO: provide basic concept assignments (collections) - IExternalSourceInfo(resource).externalIdentifier == identifier + IExternalSourceInfo(resource).externalIdentifier = identifier + notify(ObjectCreatedEvent(resource)) return resource def generateName(self, name): diff --git a/integrator/source.py b/integrator/source.py index 0653cbf..c816751 100644 --- a/integrator/source.py +++ b/integrator/source.py @@ -48,6 +48,9 @@ class ExternalSourceInfo(object): def getExternalIdentifier(self): return self.getSourceInfo().get('externalIdentifier') def setExternalIdentifier(self, value): - self.getSourceInfo()['externalIdentifier'] = value + info = self.getSourceInfo() + if not info: + setattr(self.context, sourceInfoAttrName, info) + info['externalIdentifier'] = value externalIdentifier = property(getExternalIdentifier, setExternalIdentifier)