diff --git a/agent/core/agent.py b/agent/core/agent.py index 41a0fac..bd75fbc 100644 --- a/agent/core/agent.py +++ b/agent/core/agent.py @@ -72,6 +72,7 @@ class QueueableAgent(Agent): def error(self, result): print '*** error', result + job = self.currentJob job.state = states.aborted self.log(self.currentJob, result='Error') self.master.notify(job, result) diff --git a/agent/main.py b/agent/main.py index 309281a..32387dd 100755 --- a/agent/main.py +++ b/agent/main.py @@ -59,6 +59,8 @@ def setupEnvironment(config): from cybertools.agent.transport import remote, loops from cybertools.agent.system.windows import api api.setup(config) + from cybertools.agent.system import rpcapi + rpcapi.setup(config) from cybertools.agent.crawl import base, filesystem, outlook diff --git a/agent/testing/rpcserver.py b/agent/testing/rpcserver.py index 208ab93..56f386a 100644 --- a/agent/testing/rpcserver.py +++ b/agent/testing/rpcserver.py @@ -24,6 +24,7 @@ $Id$ from twisted.internet.defer import succeed + class RPCServer(object): serverURL = '' @@ -41,55 +42,56 @@ class RPCServer(object): self.userName = userName self.password = password self.controller = controlObj - + def getMetadata(self, metadata): if self.controller is not None: # pass metadata to controller # this is done AFTER the resource (like e.g. file or mail) # is handed over pass - deferred = defer.succeed('Metadata accepted by server') + deferred = succeed('Metadata accepted by server') return deferred def xmlrpc_shutdownRPCServer(): return "xmlrRPC server shutdown completed!" -class xmlrpc(object): - +class XmlRpc(object): + Proxy = None XMLRPC = None Handler = None XMLRPCIntrospection = None QueryProtocol = None _QueryFactory = None - + def __init__(self): - self.Proxy = Proxy() - self.XMLRPC = XMLRPC() - self.Handler = Handler() - self.XMLRPCIntrospection = XMLRPCIntrospection() - self.QueryProtocol = QueryProtocol() - self._QueryFactory = _QueryFactory() - + self.Proxy = Proxy + #self.XMLRPC = XMLRPC() + #self.Handler = Handler() + #self.XMLRPCIntrospection = XMLRPCIntrospection() + #self.QueryProtocol = QueryProtocol() + #self._QueryFactory = _QueryFactory() + def addIntrospection(self, xmlrpc): pass - + + class Proxy(object): - + url = '' user = None password = None allowNone = False queryFactory = None - + def __init__(self, url, user=None, password=None, allowNone=False): self.url = url self.user = user self.password = password self.allowNone = allowNone self.RPCServer = RPCServer() - + def callRemote(self, methodName, *params): """ intended to simulate the callRemote command of a real xmlrpcserver @@ -99,32 +101,35 @@ class Proxy(object): method = getattr(self.RPCServer, methodName) return method(*params) - + +xmlrpc = XmlRpc() + + class XMLRPC(object): - + def __init__(self): pass - + class Handler(object): - + def __init__(self): pass - + class XMLRPCIntrospection(object): - + def __init__(self): pass - + class QueryProtocol(object): - + def __init__(self): pass - + class _QueryFactory(object): - + def __init__(self): pass diff --git a/agent/tests.py b/agent/tests.py index 7ecdc52..f9f1408 100755 --- a/agent/tests.py +++ b/agent/tests.py @@ -45,6 +45,7 @@ def test_suite(): DocFileSuite('crawl/README.txt', optionflags=flags), DocFileSuite('crawl/filesystem.txt', optionflags=flags), DocFileSuite('crawl/outlook.txt', optionflags=flags), + DocFileSuite('transport/transporter.txt', optionflags=flags), )) return testSuite diff --git a/agent/transport/remote.py b/agent/transport/remote.py index 3dc9e26..b1fb974 100644 --- a/agent/transport/remote.py +++ b/agent/transport/remote.py @@ -24,9 +24,10 @@ and sending requests to a corresponding remote controller. $Id$ """ +from twisted.internet import defer from zope.interface import implements -from cybertools.agent.system import rpcapi +from cybertools.agent.system import rpcapi from cybertools.agent.base.agent import Master from cybertools.agent.core.agent import QueueableAgent from cybertools.agent.interfaces import ITransporter @@ -40,7 +41,7 @@ from cybertools.util.config import Configurator class Transporter(QueueableAgent): implements(ITransporter) - + serverURL = '' server = '' method = '' @@ -49,44 +50,46 @@ class Transporter(QueueableAgent): password = '' resource = None - def __init__(self, master, params): + def __init__(self, master): super(Transporter, self).__init__(master) -## if isinstance(configuration, Configurator): -## self.config = configuration -## else: # configuration is path to config file -## self.config = Configurator() -## self.config.load(configuration) - self.serverURL = params[serverURL] + config = master.config + #self.serverURL = params[serverURL] self.server = rpcapi.xmlrpc.Proxy(self.serverURL) - self.method = params[method] - self.machineName = params[machineName] - self.userName = params[userName] - self.password = params[password] + #self.method = params[method] + #self.machineName = params[machineName] + #self.userName = params[userName] + #self.password = params[password] + + def process(self): + return self.transfer(self.params['resource']) def transfer(self, resource): """ Transfer the resource (an object providing IResource) to the server and return a Deferred. """ - deferred = self.server.callRemote('getMetadata', resource.metadata) - deferred.addCallback(self.transferDone) - deferred.addErrback(self.errorHandler) - + #return self.server.callRemote('getMetadata', resource.metadata) + self.deferred = defer.Deferred() + d = self.server.callRemote('getMetadata', resource.metadata) + d.addCallback(self.transferDone) + d.addErrback(self.errorHandler) + return self.deferred + def errorHandler(self, errorInfo): """ Invoked as a callback from self.transfer Error handler. """ print errorInfo - self.server.close() - - def transferDone(self, successMessage=''): + #self.server.close() + + def transferDone(self, result): """ Invoked as a callback from self.transfer This callback method is called when resource and metadata have been transferred successfully. """ - print successMessage - pass + #print 'transferDone:', successMessage + self.deferred.callback(result) # def process(self): # return self.collect() diff --git a/agent/transport/transporter.txt b/agent/transport/transporter.txt index 63c7e05..cb97568 100644 --- a/agent/transport/transporter.txt +++ b/agent/transport/transporter.txt @@ -35,11 +35,15 @@ This Testcase is using subsidiary methods to simulate a real xmlrpc server. In the next step we request the start of a job, again via the controller. - >>> controller.enterJob('sample', 'sample03', params=dict(serverURL="", machineName="", method="", userName="", password="")) + >>> from cybertools.agent.crawl.base import Metadata, Resource + >>> md01 = Metadata(dict(filename='dummy.txt')) + >>> r01 = Resource() + >>> r01.metadata = md01 + >>> controller.enterJob('sample', 'sample03', params=dict(resource=r01)) The job is not executed immediately - we have to hand over control to the twisted reactor first. >>> from cybertools.agent.tests import tester >>> tester.iterate() - Job 00001 completed; result: Metadata received!; \ No newline at end of file + Job 00001 completed; result: Metadata accepted by server;