diff --git a/agent/README.txt b/agent/README.txt index 28b5624..804562e 100644 --- a/agent/README.txt +++ b/agent/README.txt @@ -128,6 +128,8 @@ We make the contollers provide the specifications via the master agent's ``setup()`` method. >>> master.setup() + Starting agent application... + Using controllers base.sample. Other Agents ------------ diff --git a/agent/base/agent.py b/agent/base/agent.py index f7d94f8..bee7a9c 100644 --- a/agent/base/agent.py +++ b/agent/base/agent.py @@ -27,6 +27,7 @@ from zope.interface import implements from cybertools.agent.common import states from cybertools.agent.components import agents, controllers, jobs from cybertools.agent.components import loggers, schedulers +from cybertools.agent.components import servers, clients from cybertools.agent.interfaces import IAgent from cybertools.util.config import Configurator @@ -71,15 +72,22 @@ class Master(Agent): self.master = self self.controllers = [] self.children = {} + self.servers = [] def setup(self): config = self.config self.logger = loggers(self, name=config.logger.name) + print 'Starting agent application...' for n in config.controller.names: self.controllers.append(controllers(self, n)) self.scheduler = schedulers(self, name=config.scheduler.name) for cont in self.controllers: cont.setup() + print 'Using controllers %s.' % ', '.join(config.controller.names) + for n in config.talk.server.names: + server = servers(self, n) + self.servers.append(server) + server.setup() def setupAgents(self, controller, agentSpecs): for spec in agentSpecs: diff --git a/agent/components.py b/agent/components.py index c33a7c0..a4c0773 100644 --- a/agent/components.py +++ b/agent/components.py @@ -30,3 +30,5 @@ controllers = AdapterFactory() jobs = AdapterFactory() loggers = AdapterFactory() schedulers = AdapterFactory() +servers = AdapterFactory() +clients = AdapterFactory() diff --git a/agent/main.py b/agent/main.py index 144ea5b..edf241a 100755 --- a/agent/main.py +++ b/agent/main.py @@ -47,18 +47,10 @@ def setup(configInfo=None): master = Master(configInfo) setupEnvironment(master.config) master.setup() - print 'Starting agent application...' - print 'Using controllers %s.' % ', '.join(master.config.controller.names) return master def setupEnvironment(config): - # self registration of components: - from cybertools.agent.base import agent, control, job, log, schedule - from cybertools.agent.core import agent, control, schedule - from cybertools.agent.control import cmdline, remote - from cybertools.agent.crawl import base, filesystem, outlook - from cybertools.agent.transport import remote, loops # API registration: from cybertools.agent.system.windows import api api.setup(config) @@ -66,6 +58,13 @@ def setupEnvironment(config): http.setup(config) xmlrpc.setup(config) sftp.setup(config) + # self registration of components: + from cybertools.agent.base import agent, control, job, log, schedule + from cybertools.agent.core import agent, control, schedule + from cybertools.agent.control import cmdline, remote + from cybertools.agent.crawl import base, filesystem, outlook + from cybertools.agent.talk import http + from cybertools.agent.transport import remote, loops def startReactor(): diff --git a/agent/system/http.py b/agent/system/http.py index bfae41a..8f5a303 100644 --- a/agent/system/http.py +++ b/agent/system/http.py @@ -19,12 +19,12 @@ """ Configuration-controlled import of HTTP communication functionality. -$Id: rpcapi.py +$Id$ """ def setup(config): global listener, getPage - if config.talk.http == 'testing': + if config.talk.http.handler == 'testing': from cybertools.agent.testing.http import listener, getPage else: from twisted.internet import reactor as listener diff --git a/agent/talk/README.txt b/agent/talk/README.txt index 20039c3..6e984ef 100644 --- a/agent/talk/README.txt +++ b/agent/talk/README.txt @@ -4,20 +4,27 @@ Agents for Job Execution and Communication Tasks ($Id$) + +Communication Handling +====================== + +Communication services are provided by handlers specified in the ``talk`` +package. + >>> config = ''' ... controller(names=['core.sample']) ... scheduler(name='core') ... logger(name='default', standard=30) - ... talk.http = 'testing' + ... talk.server(names=['http']) + ... talk.server.http(port=8081) + ... talk.http(handler='testing') ... ''' >>> from cybertools.agent.main import setup >>> master = setup(config) Starting agent application... Using controllers core.sample. + Setting up HTTP handler for port 8081. -Communication Handling -====================== - - >>> from cybertools.agent.talk.http import Handler - + >>> master.servers + [] diff --git a/agent/talk/http.py b/agent/talk/http.py index ecb0471..7eda581 100644 --- a/agent/talk/http.py +++ b/agent/talk/http.py @@ -28,7 +28,10 @@ from twisted.web.server import Site from zope.interface import implements from cybertools.agent.base.agent import Master +from cybertools.agent.components import servers, clients from cybertools.agent.system.http import listener +from cybertools.agent.talk.interfaces import IServer, IClient +from cybertools.agent.talk.interfaces import ISession, IInteraction class RootResource(Resource): @@ -46,10 +49,45 @@ class CommandHandler(Resource): return '{"message": "OK"}' -class Handler(object): +class HttpServer(object): - def listen(self, port): - return listener.listenTCP(port, Site(RootResource())) + implements(IServer) - def send(self, clientId, data): - return defer.Deferred() + def __init__(self, agent): + self.agent = agent + self.port = agent.config.talk.server.http.port + self.subscribers = {} + + def setup(self): + print 'Setting up HTTP handler for port %i.' % self.port + listener.listenTCP(self.port, Site(RootResource())) + + def subscribe(self, subscriber, aspect): + pass + + def unsubscribe(self, subscriber, aspect): + pass + + def send(self, client, data, interaction=None): + return defer.Deferred() # Interaction + +servers.register(HttpServer, Master, name='http') + + +class HttpClient(object): + + implements(IClient) + + def __init__(self, agent): + self.agent = agent + + def logon(self, subscriber, url): + return defer.Deferred() # Session + + def logoff(self, session): + pass + + def send(self, session, data, interaction=None): + return defer.Deferred() # Interaction + +clients.register(HttpClient, Master, name='http') diff --git a/agent/talk/interfaces.py b/agent/talk/interfaces.py new file mode 100644 index 0000000..f0775ff --- /dev/null +++ b/agent/talk/interfaces.py @@ -0,0 +1,115 @@ +# +# Copyright (c) 2008 Helmut Merz helmutm@cy55.de +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +""" +Interfaces for handling asynchronous communication tasks. + +$Id$ +""" + +from zope.interface import Interface, Attribute + +from cybertools.util.jeep import Jeep + + +class IServer(Interface): + """ A server waits for connection requests from a client. A connected + client may then send data to or receive messages from the server. + """ + + def subscribe(subscriber, aspect): + """ The subscriber will receive messages via its ``onMesssage`` method. + + The aspect is a dotted string used to select the kind of + sessions/remote clients the subscriber wants to receive messages + from. + """ + + def unsubscribe(subscriber, aspect): + """ Stop receiving messages. + """ + + def send(session, data, interaction=None): + """ Send data to the remote client specified via the session given. + + If interaction is None, create a new one. + Return a deferred providing the interaction with its current state. + """ + + +class IClient(Interface): + """ A client initiates a connection (session) to a server and may then + sent data to or receive data from the server. + """ + + def logon(subscriber, url, credentials=None): + """ Connect to a server using the URL given, optionally logging in + with the credentials given. + + The subscriber will receive messages via its ``onMesssage`` callback. + + Return a Deferred that will provide an ISession implementation; + this may then be used sending data to the server. + """ + + def logoff(session): + """ Close the connection for the session given. + """ + + def send(session, data, interaction=None): + """ Send data to the server specified via the session given. + + If interaction is None, create a new one. + Return a deferred providing the interaction with its current state; + sending an interaction with ``finished`` set to True signifies + the last message of an interaction. + """ + + +# auxiliary interfaces + +class ISubscriber(Interface): + """ May receive message notifications. + """ + + def onMesssage(interaction, data): + """ Callback method for message notifications. + """ + + +class ISession(Interface): + """ Represents the connection to a server within a client or + a remote client connection within a server. + """ + + issuer = Attribute("""The issuer of the session, i.e. the server or + client object, respectively.""") + + state = Attribute("""A string specifying the current state of the session: + 'logon': The remote client is trying to connect/log in, + data may contain credential information; + 'logoff': The remote client is closing the connection; + 'open': The connection is open.""") + + +class IInteraction(Interface): + """ Represents a set of message exchanges belonging together. + """ + + session = Attribute("The session the interaction belongs to.") + finished = Attribute("The interaction is finished, interaction data may be cleared.")