intermediate check-in. moved rpcserver.py to testing, but yet only a part of the requested changes have been made there.
rpcclient functionality has been integrated into /transport/remote.py The transporter class is now using its own config-file (as a first idea). git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2717 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
f93122d379
commit
5a48213560
4 changed files with 37 additions and 100 deletions
|
@ -25,6 +25,7 @@ $Id$
|
||||||
|
|
||||||
from twisted.web import xmlrpc, server, resource
|
from twisted.web import xmlrpc, server, resource
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
|
from cybertools.agent.base.agent import Agent
|
||||||
|
|
||||||
application = None
|
application = None
|
||||||
|
|
|
@ -25,13 +25,16 @@ $Id$
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
from twisted.web import xmlrpc
|
||||||
|
|
||||||
from cybertools.agent.core.agent import QueueableAgent
|
from cybertools.agent.core.agent import QueueableAgent
|
||||||
from cybertools.agent.interfaces import ITransporter
|
from cybertools.agent.interfaces import ITransporter
|
||||||
from cybertools.agent.transport.rpcclient import RPCClient
|
|
||||||
from cybertools.agent.crawl.base import Metadata
|
from cybertools.agent.crawl.base import Metadata
|
||||||
from cybertools.agent.crawl.mail import MailResource
|
from cybertools.agent.crawl.mail import MailResource
|
||||||
from cybertools.agent.crawl.filesystem import FileResource
|
from cybertools.agent.crawl.filesystem import FileResource
|
||||||
|
from cybertools.agent.components import agents
|
||||||
|
from cybertools.util.config import Configurator
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class Transporter(QueueableAgent):
|
class Transporter(QueueableAgent):
|
||||||
|
@ -39,49 +42,47 @@ class Transporter(QueueableAgent):
|
||||||
implements(ITransporter)
|
implements(ITransporter)
|
||||||
|
|
||||||
serverURL = ''
|
serverURL = ''
|
||||||
|
server = ''
|
||||||
method = ''
|
method = ''
|
||||||
machineName = ''
|
machineName = ''
|
||||||
userName = ''
|
userName = ''
|
||||||
password = ''
|
password = ''
|
||||||
xmlrpcClient = ''
|
|
||||||
resource = None
|
resource = None
|
||||||
|
|
||||||
def __init__(self, master, params={}):
|
def __init__(self, master, configuration):
|
||||||
super(Transporter, self).__init__(master)
|
super(Transporter, self).__init__(master)
|
||||||
if params.has_key(serverURL):
|
if isinstance(configuration, Configurator):
|
||||||
self.xmlrpcClient = RPCClient(self.serverURL)
|
self.config = configuration
|
||||||
|
else: # configuration is path to config file
|
||||||
|
self.config = Configurator()
|
||||||
|
self.config.load(configuration)
|
||||||
|
|
||||||
|
self.serverURL = self.config.xmlrpcClient.serverURL
|
||||||
|
self.server = xmlrpc.Proxy(self.serverURL)
|
||||||
|
self.method = self.config.xmlrpcClient.method
|
||||||
|
self.machineName = self.config.xmlrpcClient.machineName
|
||||||
|
self.userName = self.config.xmlrpcClient.userName
|
||||||
|
self.password = self.config.xmlrpcClient.password
|
||||||
|
|
||||||
def transfer(self, resource):
|
def transfer(self, resource):
|
||||||
""" Transfer the resource (an object providing IResource)
|
""" Transfer the resource (an object providing IResource)
|
||||||
to the server and return a Deferred.
|
to the server and return a Deferred.
|
||||||
"""
|
"""
|
||||||
deferred = self.xmlrpcClient.transferResource(resource)
|
deferred = self.server.callRemote('getMetadata', resource.metadata)
|
||||||
# concept test method
|
deferred.addCallback(self.transferDone)
|
||||||
# sftp transfer here with callback to self.cb_sendMetadata
|
deferred.addErrback(self.errorHandler)
|
||||||
deferred.addCallback(self.cb_sendMetadata)
|
|
||||||
deferred.addErrback(self.cb_errorHandler)
|
|
||||||
|
|
||||||
def cb_sendMetadata(self, serverResponse=''):
|
|
||||||
"""
|
|
||||||
After the resource object has been sent successfully to the
|
|
||||||
RPCServer this method is invoked by a callback from the
|
|
||||||
transfer method and is sending the according metadata of the resource.
|
|
||||||
"""
|
|
||||||
# maybe react here to a special server response like
|
|
||||||
# e.g. delay because of server being in heavy load condition
|
|
||||||
deferred = self.xmlrpcClient.transferMetadata(self.resource.metadata)
|
|
||||||
deferred.addCallback(self.cb_transferDone)
|
|
||||||
deferred.addErrback(self.cb_errorHandler)
|
|
||||||
|
|
||||||
def cb_errorHandler(self, errorInfo):
|
def errorHandler(self, errorInfo):
|
||||||
"""
|
"""
|
||||||
This is a callback error Handler
|
Invoked as a callback from self.transfer
|
||||||
|
Error handler.
|
||||||
"""
|
"""
|
||||||
print errorInfo
|
print errorInfo
|
||||||
self.xmlrpcClient.close()
|
self.server.close()
|
||||||
|
|
||||||
def cb_transferDone(self, successMessage=''):
|
def transferDone(self, successMessage=''):
|
||||||
"""
|
"""
|
||||||
|
Invoked as a callback from self.transfer
|
||||||
This callback method is called when resource and metadata
|
This callback method is called when resource and metadata
|
||||||
have been transferred successfully.
|
have been transferred successfully.
|
||||||
"""
|
"""
|
||||||
|
@ -94,3 +95,4 @@ class Transporter(QueueableAgent):
|
||||||
# d = defer.succeed([])
|
# d = defer.succeed([])
|
||||||
# return d
|
# return d
|
||||||
|
|
||||||
|
agents.register(Transporter, Master, name='transport.remote')
|
|
@ -1,74 +0,0 @@
|
||||||
#
|
|
||||||
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
|
|
||||||
#
|
|
||||||
# This program is free software; you can redistribute it and/or modify
|
|
||||||
# it under the terms of the GNU General Public License as published by
|
|
||||||
# the Free Software Foundation; either version 2 of the License, or
|
|
||||||
# (at your option) any later version.
|
|
||||||
#
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
||||||
# GNU General Public License for more details.
|
|
||||||
#
|
|
||||||
# You should have received a copy of the GNU General Public License
|
|
||||||
# along with this program; if not, write to the Free Software
|
|
||||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
||||||
#
|
|
||||||
|
|
||||||
"""
|
|
||||||
RPCClient takes over the task of transferring metadata to the
|
|
||||||
loops RPCServer (control.remote)
|
|
||||||
RPCClient is invoked by the transporter (transport.remote)
|
|
||||||
|
|
||||||
$Id$
|
|
||||||
"""
|
|
||||||
|
|
||||||
from twisted.web import xmlrpc
|
|
||||||
from twisted.internet import reactor
|
|
||||||
|
|
||||||
class RPCClient(object):
|
|
||||||
|
|
||||||
close = reactor.stop
|
|
||||||
server = None
|
|
||||||
|
|
||||||
def __init__(self, url):
|
|
||||||
self.server = xmlrpc.Proxy(url)
|
|
||||||
reactor.run()
|
|
||||||
|
|
||||||
def cb_printServerResponse(self, response=''):
|
|
||||||
"""
|
|
||||||
this method is invoked by a callback
|
|
||||||
"""
|
|
||||||
print response
|
|
||||||
return response
|
|
||||||
|
|
||||||
def cb_errorHandler(self, error):
|
|
||||||
"""
|
|
||||||
this method is invoked by a callback
|
|
||||||
"""
|
|
||||||
print error
|
|
||||||
return error
|
|
||||||
|
|
||||||
def transferMetadata(self, metadata):
|
|
||||||
deferred = self.server.callRemote('getMetadata', metadata)
|
|
||||||
deferred.addCallback(self.cb_printServerResponse)
|
|
||||||
deferred.addErrback(self.cb_errorHandler)
|
|
||||||
|
|
||||||
def transferResource(self, resource):
|
|
||||||
deferred = self.server.callRemote('transfer', resource)
|
|
||||||
deferred.addCallback(self.cb_printServerResponse)
|
|
||||||
deferred.addErrback(self.cb_errorHandler)
|
|
||||||
|
|
||||||
def printString(self):
|
|
||||||
print "Client test function printString"
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
url = 'http://localhost:8082'
|
|
||||||
for elem in range(1,4):
|
|
||||||
elem = "Testcounter: ", elem
|
|
||||||
RPCClient(url).transferMetadata(elem)
|
|
||||||
RPCClient(url).printString()
|
|
||||||
reactor.run()
|
|
||||||
|
|
8
agent/transport/transporter.cfg
Normal file
8
agent/transport/transporter.cfg
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
#
|
||||||
|
# sample.cfg - agent configuration for demonstration and testing purposes
|
||||||
|
#
|
||||||
|
# $Id$
|
||||||
|
#
|
||||||
|
|
||||||
|
xmlrpcClient(serverURL='http://localhost:8082', method='PUT', machineName='sr',
|
||||||
|
userName='loopsAdmin', password='loops')
|
Loading…
Add table
Reference in a new issue