remote transport working via test script
git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2769 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
bfd6191683
commit
64ccc2cc94
6 changed files with 32 additions and 53 deletions
|
@ -24,10 +24,6 @@ $Id$
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
print "****printing sys.path****"
|
|
||||||
for elem in sys.path:
|
|
||||||
print elem
|
|
||||||
import os
|
import os
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
|
|
||||||
|
@ -81,6 +77,6 @@ if __name__ == '__main__':
|
||||||
metadata01 = Metadata(dict(filename='dummy.txt'))
|
metadata01 = Metadata(dict(filename='dummy.txt'))
|
||||||
res01 = Resource()
|
res01 = Resource()
|
||||||
res01.metadata = metadata01
|
res01.metadata = metadata01
|
||||||
res01.path = '/dummydir/dummyfile'
|
res01.path = 'data/file1.txt'
|
||||||
controller.enterJob('sample', 'sample03', params=dict(resource=res01))
|
controller.enterJob('sample', 'sample03', params=dict(resource=res01))
|
||||||
startReactor()
|
startReactor()
|
||||||
|
|
|
@ -64,7 +64,7 @@ class RPCServer(xmlrpc.XMLRPC):
|
||||||
# this is done AFTER the resource (like e.g. file or mail)
|
# this is done AFTER the resource (like e.g. file or mail)
|
||||||
# is handed over
|
# is handed over
|
||||||
pass
|
pass
|
||||||
print metadata
|
print '*** metadata', metadata
|
||||||
metadata = "Echo: ", metadata
|
metadata = "Echo: ", metadata
|
||||||
return metadata
|
return metadata
|
||||||
|
|
||||||
|
@ -76,4 +76,5 @@ if __name__ == '__main__':
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor
|
||||||
site = RPCServer()
|
site = RPCServer()
|
||||||
reactor.listenTCP(8082, server.Site(site))
|
reactor.listenTCP(8082, server.Site(site))
|
||||||
|
print '*** listening...'
|
||||||
reactor.run()
|
reactor.run()
|
|
@ -7,7 +7,7 @@ from cybertools.agent.transport.file.sftp import FileTransfer
|
||||||
def output(x):
|
def output(x):
|
||||||
print x
|
print x
|
||||||
|
|
||||||
ft = FileTransfer('cy05.de', 22, 'scrat', 'pyjmfha')
|
ft = FileTransfer('cy05.de', 22, 'scrat', '...')
|
||||||
|
|
||||||
d = ft.upload('d:\\text2.rtf', 'text.txt')
|
d = ft.upload('d:\\text2.rtf', 'text.txt')
|
||||||
d.addCallback(output)
|
d.addCallback(output)
|
||||||
|
|
|
@ -8,9 +8,10 @@
|
||||||
controller(names=['core.sample'])
|
controller(names=['core.sample'])
|
||||||
scheduler(name='core')
|
scheduler(name='core')
|
||||||
logger(name='default', standard=30)
|
logger(name='default', standard=30)
|
||||||
transport.remote.server = 'testing'
|
#transport.remote.server = 'testing'
|
||||||
transport.remote.url = 'http://localhost:8123'
|
transport.remote.url = 'http://localhost:8082'
|
||||||
transport.remote.ftp.url = 'http://cy05.de'
|
transport.remote.ftp.url = 'cy05.de'
|
||||||
transport.remote.ftp.user = 'scrat'
|
transport.remote.ftp.user = 'scrat'
|
||||||
transport.remote.sftp = 'testing'
|
transport.remote.ftp.password = '...'
|
||||||
|
transport.remote.sftp = 'http://cy05.de'
|
||||||
transport.remote.chunksize = 4096
|
transport.remote.chunksize = 4096
|
||||||
|
|
|
@ -46,7 +46,7 @@ class FileTransfer(protocol.ClientFactory):
|
||||||
def upload(self, localPath, remotePath):
|
def upload(self, localPath, remotePath):
|
||||||
""" Copies a file, returning a deferred.
|
""" Copies a file, returning a deferred.
|
||||||
"""
|
"""
|
||||||
d = defer.Deferred()
|
d = self.deferred = defer.Deferred()
|
||||||
# we put everything in a queue so that more than one file may
|
# we put everything in a queue so that more than one file may
|
||||||
# be transferred in one connection.
|
# be transferred in one connection.
|
||||||
self.queue.append(dict(deferred=d,
|
self.queue.append(dict(deferred=d,
|
||||||
|
@ -56,7 +56,6 @@ class FileTransfer(protocol.ClientFactory):
|
||||||
if len(self.queue) == 1 and self.channel is not None:
|
if len(self.queue) == 1 and self.channel is not None:
|
||||||
# the channel has emptied the queue
|
# the channel has emptied the queue
|
||||||
self.channel.execute()
|
self.channel.execute()
|
||||||
self.deferred = d
|
|
||||||
return d
|
return d
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
|
@ -102,7 +101,6 @@ class SFTPChannel(channel.SSHChannel):
|
||||||
self.localFile = open(localPath, 'rb')
|
self.localFile = open(localPath, 'rb')
|
||||||
d = self.client.openFile(remotePath,
|
d = self.client.openFile(remotePath,
|
||||||
filetransfer.FXF_WRITE | filetransfer.FXF_CREAT, {})
|
filetransfer.FXF_WRITE | filetransfer.FXF_CREAT, {})
|
||||||
print 'command_upload', params
|
|
||||||
d.addCallbacks(self.writeChunk, self.logError)
|
d.addCallbacks(self.writeChunk, self.logError)
|
||||||
|
|
||||||
def writeChunk(self, remoteFile):
|
def writeChunk(self, remoteFile):
|
||||||
|
@ -123,7 +121,8 @@ class SFTPChannel(channel.SSHChannel):
|
||||||
def finished(self, result):
|
def finished(self, result):
|
||||||
self.localFile.close()
|
self.localFile.close()
|
||||||
self.remFile.close()
|
self.remFile.close()
|
||||||
self.d.callback('finished')
|
#self.d.callback('finished')
|
||||||
|
self.conn.factory.deferred.callback('finished')
|
||||||
|
|
||||||
# classes for managing the SSH protocol and connection
|
# classes for managing the SSH protocol and connection
|
||||||
|
|
||||||
|
|
|
@ -44,25 +44,18 @@ class Transporter(QueueableAgent):
|
||||||
|
|
||||||
implements(ITransporter)
|
implements(ITransporter)
|
||||||
|
|
||||||
serverURL = ''
|
port = 22
|
||||||
server = ''
|
|
||||||
host = port = None
|
|
||||||
method = ''
|
|
||||||
machineName = ''
|
machineName = ''
|
||||||
userName = ''
|
|
||||||
password = ''
|
|
||||||
resource = None
|
|
||||||
|
|
||||||
def __init__(self, master):
|
def __init__(self, master):
|
||||||
super(Transporter, self).__init__(master)
|
super(Transporter, self).__init__(master)
|
||||||
config = master.config
|
config = master.config
|
||||||
self.serverURL = config.transport.remote.url
|
serverURL = config.transport.remote.url
|
||||||
self.server = rpcapi.xmlrpc.Proxy(self.serverURL)
|
self.server = rpcapi.xmlrpc.Proxy(serverURL)
|
||||||
self.ftpServer = sftpapi.FileTransfer(self.host, self.port, self.userName, self.password)
|
userName = config.transport.remote.ftp.user
|
||||||
#self.method = params[method]
|
password = config.transport.remote.ftp.password
|
||||||
#self.machineName = params[machineName]
|
host = config.transport.remote.ftp.url
|
||||||
#self.userName = params[userName]
|
self.ftpServer = sftpapi.FileTransfer(host, self.port, userName, password)
|
||||||
#self.password = params[password]
|
|
||||||
|
|
||||||
def process(self):
|
def process(self):
|
||||||
return self.transfer(self.params['resource'])
|
return self.transfer(self.params['resource'])
|
||||||
|
@ -71,12 +64,9 @@ class Transporter(QueueableAgent):
|
||||||
""" Transfer the resource (an object providing IResource)
|
""" Transfer the resource (an object providing IResource)
|
||||||
to the server and return a Deferred.
|
to the server and return a Deferred.
|
||||||
"""
|
"""
|
||||||
#return self.server.callRemote('getMetadata', resource.metadata)
|
|
||||||
self.deferred = defer.Deferred()
|
self.deferred = defer.Deferred()
|
||||||
#print "**** RESOURCE.PATH: ", resource.path
|
|
||||||
remoteFile = os.path.basename(resource.path)
|
remoteFile = os.path.basename(resource.path)
|
||||||
d = self.ftpServer.upload(resource.path, remoteFile)
|
d = self.ftpServer.upload(resource.path, remoteFile)
|
||||||
#d = self.server.callRemote('getMetadata', resource.metadata)
|
|
||||||
d.addErrback(self.errorHandler)
|
d.addErrback(self.errorHandler)
|
||||||
d.addCallback(lambda result:
|
d.addCallback(lambda result:
|
||||||
self.server.callRemote('getMetadata', dict(resource.metadata)))
|
self.server.callRemote('getMetadata', dict(resource.metadata)))
|
||||||
|
@ -97,14 +87,6 @@ class Transporter(QueueableAgent):
|
||||||
This callback method is called when resource and metadata
|
This callback method is called when resource and metadata
|
||||||
have been transferred successfully.
|
have been transferred successfully.
|
||||||
"""
|
"""
|
||||||
#print 'transferDone:', successMessage
|
|
||||||
self.deferred.callback(result)
|
self.deferred.callback(result)
|
||||||
|
|
||||||
# def process(self):
|
|
||||||
# return self.collect()
|
|
||||||
|
|
||||||
# def collect(self, filter=None):
|
|
||||||
# d = defer.succeed([])
|
|
||||||
# return d
|
|
||||||
|
|
||||||
agents.register(Transporter, Master, name='transport.remote')
|
agents.register(Transporter, Master, name='transport.remote')
|
Loading…
Add table
Reference in a new issue