diff --git a/agent/control/remote.py b/agent/control/remote.py index 9a9296c..0f7c543 100644 --- a/agent/control/remote.py +++ b/agent/control/remote.py @@ -17,12 +17,10 @@ # """ -Providing access for remote agent instances by listening for requests -from remote transport agents. +Controller that accepts, forwards and stores data received by the +rpcserver $Id$ """ -from zope.interface import implements - - +from zope.interface import implements \ No newline at end of file diff --git a/agent/control/rpcserver.py b/agent/control/rpcserver.py new file mode 100644 index 0000000..2a275a7 --- /dev/null +++ b/agent/control/rpcserver.py @@ -0,0 +1,78 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Providing access for remote agent instances by listening for requests +from remote transport agents. + +$Id$ +""" + +from twisted.web import xmlrpc, server, resource +from twisted.internet import defer, reactor + +application = None + +class RPCServer(xmlrpc.XMLRPC): + + serverURL = '' + method = '' + machineName = '' + userName = '' + password = '' + controller = '' + close = reactor.stop + + def __init__(self, serverURL = '', method = '', machineName = '', + userName = '', password = '', controlObj= None): + self.serverURL = serverURL + self.method = method + self.machineName = machineName + self.userName = userName + self.password = password + self.controller = controlObj + xmlrpc.XMLRPC.__init__(self) + + def xmlrpc_transfer(self, resource): + if self.controller is not None: + # pass resource object to controller + # this is done BEFORE the metadata is handed over + # call notify method of controller + pass + print resource + return "Resource received: ", resource + + def xmlrpc_getMetadata(self, metadata): + if self.controller is not None: + # pass metadata to controller + # this is done AFTER the resource (like e.g. file or mail) + # is handed over + pass + print metadata + metadata = "Echo: ", metadata + return metadata + + def xmlrpc_shutdownRPCServer(): + self.close() + + +if __name__ == '__main__': + from twisted.internet import reactor + site = RPCServer() + reactor.listenTCP(8082, server.Site(site)) + reactor.run() \ No newline at end of file diff --git a/agent/transport/remote.py b/agent/transport/remote.py index 3935f19..2112054 100644 --- a/agent/transport/remote.py +++ b/agent/transport/remote.py @@ -26,3 +26,71 @@ $Id$ from zope.interface import implements +from cybertools.agent.core.agent import QueueableAgent +from cybertools.agent.interfaces import ITransporter +from cybertools.agent.transport.rpcclient import RPCClient +from cybertools.agent.crawl.base import Metadata +from cybertools.agent.crawl.mail import MailResource +from cybertools.agent.crawl.filesystem import FileResource + + +class Transporter(QueueableAgent): + + implements(ITransporter) + + serverURL = '' + method = '' + machineName = '' + userName = '' + password = '' + xmlrpcClient = '' + resource = None + + def __init__(self, master, params={}): + super(Transporter, self).__init__(master) + if params.has_key(serverURL): + self.xmlrpcClient = RPCClient(self.serverURL) + + def transfer(self, resource): + """ Transfer the resource (an object providing IResource) + to the server and return a Deferred. + """ + deferred = self.xmlrpcClient.transferResource(resource) + # concept test method + # sftp transfer here with callback to self.cb_sendMetadata + deferred.addCallback(self.cb_sendMetadata) + deferred.addErrback(self.cb_errorHandler) + + def cb_sendMetadata(self, serverResponse=''): + """ + After the resource object has been sent successfully to the + RPCServer this method is invoked by a callback from the + transfer method and is sending the according metadata of the resource. + """ + # maybe react here to a special server response like + # e.g. delay because of server being in heavy load condition + deferred = self.xmlrpcClient.transferMetadata(self.resource.metadata) + deferred.addCallback(self.cb_transferDone) + deferred.addErrback(self.cb_errorHandler) + + def cb_errorHandler(self, errorInfo): + """ + This is a callback error Handler + """ + print errorInfo + self.xmlrpcClient.close() + + def cb_transferDone(self, successMessage=''): + """ + This callback method is called when resource and metadata + have been transferred successfully. + """ + pass + +# def process(self): +# return self.collect() + +# def collect(self, filter=None): +# d = defer.succeed([]) +# return d + diff --git a/agent/transport/rpcclient.py b/agent/transport/rpcclient.py new file mode 100644 index 0000000..533f0f6 --- /dev/null +++ b/agent/transport/rpcclient.py @@ -0,0 +1,74 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +RPCClient takes over the task of transferring metadata to the +loops RPCServer (control.remote) +RPCClient is invoked by the transporter (transport.remote) + +$Id$ +""" + +from twisted.web import xmlrpc +from twisted.internet import reactor + +class RPCClient(object): + + close = reactor.stop + server = None + + def __init__(self, url): + self.server = xmlrpc.Proxy(url) + reactor.run() + + def cb_printServerResponse(self, response=''): + """ + this method is invoked by a callback + """ + print response + return response + + def cb_errorHandler(self, error): + """ + this method is invoked by a callback + """ + print error + return error + + def transferMetadata(self, metadata): + deferred = self.server.callRemote('getMetadata', metadata) + deferred.addCallback(self.cb_printServerResponse) + deferred.addErrback(self.cb_errorHandler) + + def transferResource(self, resource): + deferred = self.server.callRemote('transfer', resource) + deferred.addCallback(self.cb_printServerResponse) + deferred.addErrback(self.cb_errorHandler) + + def printString(self): + print "Client test function printString" + + +if __name__ == '__main__': + url = 'http://localhost:8082' + for elem in range(1,4): + elem = "Testcounter: ", elem + RPCClient(url).transferMetadata(elem) + RPCClient(url).printString() + reactor.run() + \ No newline at end of file