Added rpcclient.py in /agent/transport which is instantiated by Transporter in /agent/transport/remote.py
Added rpcserver.py in /agent/control which is intended to be listening on connections and hands over metadata and resource objects to a controller. git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2711 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
b6d7bfa7c9
commit
5a1e67fe5c
4 changed files with 223 additions and 5 deletions
|
@ -17,12 +17,10 @@
|
|||
#
|
||||
|
||||
"""
|
||||
Providing access for remote agent instances by listening for requests
|
||||
from remote transport agents.
|
||||
Controller that accepts, forwards and stores data received by the
|
||||
rpcserver
|
||||
|
||||
$Id$
|
||||
"""
|
||||
|
||||
from zope.interface import implements
|
||||
|
||||
|
||||
from zope.interface import implements
|
78
agent/control/rpcserver.py
Normal file
78
agent/control/rpcserver.py
Normal file
|
@ -0,0 +1,78 @@
|
|||
#
|
||||
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
|
||||
"""
|
||||
Providing access for remote agent instances by listening for requests
|
||||
from remote transport agents.
|
||||
|
||||
$Id$
|
||||
"""
|
||||
|
||||
from twisted.web import xmlrpc, server, resource
|
||||
from twisted.internet import defer, reactor
|
||||
|
||||
application = None
|
||||
|
||||
class RPCServer(xmlrpc.XMLRPC):
|
||||
|
||||
serverURL = ''
|
||||
method = ''
|
||||
machineName = ''
|
||||
userName = ''
|
||||
password = ''
|
||||
controller = ''
|
||||
close = reactor.stop
|
||||
|
||||
def __init__(self, serverURL = '', method = '', machineName = '',
|
||||
userName = '', password = '', controlObj= None):
|
||||
self.serverURL = serverURL
|
||||
self.method = method
|
||||
self.machineName = machineName
|
||||
self.userName = userName
|
||||
self.password = password
|
||||
self.controller = controlObj
|
||||
xmlrpc.XMLRPC.__init__(self)
|
||||
|
||||
def xmlrpc_transfer(self, resource):
|
||||
if self.controller is not None:
|
||||
# pass resource object to controller
|
||||
# this is done BEFORE the metadata is handed over
|
||||
# call notify method of controller
|
||||
pass
|
||||
print resource
|
||||
return "Resource received: ", resource
|
||||
|
||||
def xmlrpc_getMetadata(self, metadata):
|
||||
if self.controller is not None:
|
||||
# pass metadata to controller
|
||||
# this is done AFTER the resource (like e.g. file or mail)
|
||||
# is handed over
|
||||
pass
|
||||
print metadata
|
||||
metadata = "Echo: ", metadata
|
||||
return metadata
|
||||
|
||||
def xmlrpc_shutdownRPCServer():
|
||||
self.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
from twisted.internet import reactor
|
||||
site = RPCServer()
|
||||
reactor.listenTCP(8082, server.Site(site))
|
||||
reactor.run()
|
|
@ -26,3 +26,71 @@ $Id$
|
|||
|
||||
from zope.interface import implements
|
||||
|
||||
from cybertools.agent.core.agent import QueueableAgent
|
||||
from cybertools.agent.interfaces import ITransporter
|
||||
from cybertools.agent.transport.rpcclient import RPCClient
|
||||
from cybertools.agent.crawl.base import Metadata
|
||||
from cybertools.agent.crawl.mail import MailResource
|
||||
from cybertools.agent.crawl.filesystem import FileResource
|
||||
|
||||
|
||||
class Transporter(QueueableAgent):
|
||||
|
||||
implements(ITransporter)
|
||||
|
||||
serverURL = ''
|
||||
method = ''
|
||||
machineName = ''
|
||||
userName = ''
|
||||
password = ''
|
||||
xmlrpcClient = ''
|
||||
resource = None
|
||||
|
||||
def __init__(self, master, params={}):
|
||||
super(Transporter, self).__init__(master)
|
||||
if params.has_key(serverURL):
|
||||
self.xmlrpcClient = RPCClient(self.serverURL)
|
||||
|
||||
def transfer(self, resource):
|
||||
""" Transfer the resource (an object providing IResource)
|
||||
to the server and return a Deferred.
|
||||
"""
|
||||
deferred = self.xmlrpcClient.transferResource(resource)
|
||||
# concept test method
|
||||
# sftp transfer here with callback to self.cb_sendMetadata
|
||||
deferred.addCallback(self.cb_sendMetadata)
|
||||
deferred.addErrback(self.cb_errorHandler)
|
||||
|
||||
def cb_sendMetadata(self, serverResponse=''):
|
||||
"""
|
||||
After the resource object has been sent successfully to the
|
||||
RPCServer this method is invoked by a callback from the
|
||||
transfer method and is sending the according metadata of the resource.
|
||||
"""
|
||||
# maybe react here to a special server response like
|
||||
# e.g. delay because of server being in heavy load condition
|
||||
deferred = self.xmlrpcClient.transferMetadata(self.resource.metadata)
|
||||
deferred.addCallback(self.cb_transferDone)
|
||||
deferred.addErrback(self.cb_errorHandler)
|
||||
|
||||
def cb_errorHandler(self, errorInfo):
|
||||
"""
|
||||
This is a callback error Handler
|
||||
"""
|
||||
print errorInfo
|
||||
self.xmlrpcClient.close()
|
||||
|
||||
def cb_transferDone(self, successMessage=''):
|
||||
"""
|
||||
This callback method is called when resource and metadata
|
||||
have been transferred successfully.
|
||||
"""
|
||||
pass
|
||||
|
||||
# def process(self):
|
||||
# return self.collect()
|
||||
|
||||
# def collect(self, filter=None):
|
||||
# d = defer.succeed([])
|
||||
# return d
|
||||
|
||||
|
|
74
agent/transport/rpcclient.py
Normal file
74
agent/transport/rpcclient.py
Normal file
|
@ -0,0 +1,74 @@
|
|||
#
|
||||
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
#
|
||||
|
||||
"""
|
||||
RPCClient takes over the task of transferring metadata to the
|
||||
loops RPCServer (control.remote)
|
||||
RPCClient is invoked by the transporter (transport.remote)
|
||||
|
||||
$Id$
|
||||
"""
|
||||
|
||||
from twisted.web import xmlrpc
|
||||
from twisted.internet import reactor
|
||||
|
||||
class RPCClient(object):
|
||||
|
||||
close = reactor.stop
|
||||
server = None
|
||||
|
||||
def __init__(self, url):
|
||||
self.server = xmlrpc.Proxy(url)
|
||||
reactor.run()
|
||||
|
||||
def cb_printServerResponse(self, response=''):
|
||||
"""
|
||||
this method is invoked by a callback
|
||||
"""
|
||||
print response
|
||||
return response
|
||||
|
||||
def cb_errorHandler(self, error):
|
||||
"""
|
||||
this method is invoked by a callback
|
||||
"""
|
||||
print error
|
||||
return error
|
||||
|
||||
def transferMetadata(self, metadata):
|
||||
deferred = self.server.callRemote('getMetadata', metadata)
|
||||
deferred.addCallback(self.cb_printServerResponse)
|
||||
deferred.addErrback(self.cb_errorHandler)
|
||||
|
||||
def transferResource(self, resource):
|
||||
deferred = self.server.callRemote('transfer', resource)
|
||||
deferred.addCallback(self.cb_printServerResponse)
|
||||
deferred.addErrback(self.cb_errorHandler)
|
||||
|
||||
def printString(self):
|
||||
print "Client test function printString"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
url = 'http://localhost:8082'
|
||||
for elem in range(1,4):
|
||||
elem = "Testcounter: ", elem
|
||||
RPCClient(url).transferMetadata(elem)
|
||||
RPCClient(url).printString()
|
||||
reactor.run()
|
||||
|
Loading…
Add table
Reference in a new issue