upload of resources from loops.agent via HTTP PUT basically working
git-svn-id: svn://svn.cy55.de/Zope3/src/loops/trunk@1955 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
a689fb09bb
commit
57a5f126b5
4 changed files with 43 additions and 24 deletions
|
@ -35,8 +35,8 @@ We are now ready to schedule the job and let the reactor execute it.
|
|||
>>> startTime = scheduler.schedule(crawlJob)
|
||||
|
||||
>>> tester.iterate()
|
||||
Metadata: {'path': '...data...file1.txt'}
|
||||
Transferring: Data from file1.txt
|
||||
Metadata: {'path': '...data...subdir...file2.txt'}
|
||||
Transferring: Data from file2.txt
|
||||
Metadata: {'path': '...data...file1.txt'}
|
||||
Transferring: Data from file1.txt
|
||||
|
||||
|
|
|
@ -24,7 +24,7 @@ $Id$
|
|||
|
||||
from base64 import b64encode
|
||||
from twisted.internet import reactor
|
||||
from twisted.internet.defer import Deferred, DeferredList, fail
|
||||
from twisted.internet.defer import Deferred, fail
|
||||
from twisted.web.client import getPage
|
||||
from zope.interface import implements
|
||||
|
||||
|
@ -39,26 +39,36 @@ class TransportJob(Job):
|
|||
def __init__(self, transporter):
|
||||
super(TransportJob, self).__init__()
|
||||
self.transporter = transporter
|
||||
self.resources = []
|
||||
self.deferred = None
|
||||
|
||||
def execute(self):
|
||||
result = self.params.get('result')
|
||||
if result is None:
|
||||
return fail('No data available.')
|
||||
transfers = []
|
||||
for resource in result:
|
||||
d = self.transporter.transfer(resource)
|
||||
transfers.append(d)
|
||||
d.addCallback(self.logTransfer)
|
||||
d.addErrback(self.logError)
|
||||
return DeferredList(transfers)
|
||||
self.resources = result
|
||||
self.deferred = Deferred()
|
||||
d = self.transporter.transfer(self.resources.pop())
|
||||
d.addCallback(self.transferDone).addErrback(self.logError)
|
||||
return self.deferred
|
||||
|
||||
def transferDone(self, result):
|
||||
self.logTransfer(result)
|
||||
if self.resources:
|
||||
d = self.transporter.transfer(self.resources.pop())
|
||||
d.addCallback(self.transferDone).addErrback(self.logError)
|
||||
else:
|
||||
self.deferred.callback('OK')
|
||||
|
||||
def logTransfer(self, result, err=None):
|
||||
#print 'transfer successful; remaining:', len(self.resources)
|
||||
# TODO: logging
|
||||
# self.transporter.agent.logger.log(...)
|
||||
pass
|
||||
|
||||
def logError(self, error):
|
||||
print '*** error on transfer', self.transporter.serverURL, error
|
||||
self.deferred.errback(error)
|
||||
|
||||
|
||||
class Transporter(object):
|
||||
|
@ -77,6 +87,7 @@ class Transporter(object):
|
|||
self.agent = agent
|
||||
for k, v in params.items():
|
||||
setattr(self, k ,v)
|
||||
self.deferred = None
|
||||
|
||||
def createJob(self):
|
||||
return self.jobFactory(self)
|
||||
|
@ -88,22 +99,27 @@ class Transporter(object):
|
|||
data.close()
|
||||
else:
|
||||
text = data
|
||||
# TODO: encode text (?)
|
||||
# TODO: set headers: Content-Type, Authorization, User-Agent
|
||||
path = resource.path
|
||||
app = resource.application
|
||||
deferreds = []
|
||||
metadata = resource.metadata
|
||||
auth = b64encode(':'.join((self.userName, self.password)))
|
||||
auth = b64encode(self.userName + ':' + self.password)
|
||||
headers = {'Authorization': 'Basic ' + auth}
|
||||
if metadata is not None:
|
||||
url = self.makePath('.meta', app, path, 'xml')
|
||||
deferreds.append(getPage(url, method=self.method, headers=headers,
|
||||
postdata=metadata.asXML()))
|
||||
url = self.makePath('.data', app, path)
|
||||
deferreds.append(getPage(url, method=self.method, headers=headers,
|
||||
postdata=text))
|
||||
return DeferredList(deferreds, fireOnOneErrback=True)
|
||||
d = getPage(url, method=self.method, headers=headers, postdata=text)
|
||||
if metadata is None:
|
||||
d.addCallback(self.finished)
|
||||
else:
|
||||
d.addCallback(self.transferMeta, app, path, headers, metadata.asXML())
|
||||
self.deferred = Deferred()
|
||||
return self.deferred
|
||||
|
||||
def transferMeta(self, result, app, path, headers, text):
|
||||
url = self.makePath('.meta', app, path)
|
||||
d = getPage(url, method=self.method, headers=headers, postdata=text)
|
||||
d.addCallback(self.finished)
|
||||
|
||||
def finished(self, result):
|
||||
self.deferred.callback(result)
|
||||
|
||||
def makePath(self, infoType, app, path, extension=None):
|
||||
if path.startswith('/'):
|
||||
|
|
|
@ -99,9 +99,9 @@ class ResourceManagerTraverser(ItemTraverser):
|
|||
obj.contentType = contentType
|
||||
obj.title = title
|
||||
#obj.data = data
|
||||
notify(ObjectModifiedEvent(resource))
|
||||
# TODO: provide basic concept assignments (collections)
|
||||
IExternalSourceInfo(resource).externalIdentifier == identifier
|
||||
IExternalSourceInfo(resource).externalIdentifier = identifier
|
||||
notify(ObjectCreatedEvent(resource))
|
||||
return resource
|
||||
|
||||
def generateName(self, name):
|
||||
|
|
|
@ -48,6 +48,9 @@ class ExternalSourceInfo(object):
|
|||
def getExternalIdentifier(self):
|
||||
return self.getSourceInfo().get('externalIdentifier')
|
||||
def setExternalIdentifier(self, value):
|
||||
self.getSourceInfo()['externalIdentifier'] = value
|
||||
info = self.getSourceInfo()
|
||||
if not info:
|
||||
setattr(self.context, sourceInfoAttrName, info)
|
||||
info['externalIdentifier'] = value
|
||||
externalIdentifier = property(getExternalIdentifier, setExternalIdentifier)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue