From e43810beb2260dfb287b8c9c80ba3aedc7d4aa31 Mon Sep 17 00:00:00 2001 From: helmutm Date: Sun, 22 Mar 2009 15:46:13 +0000 Subject: [PATCH] work in progress: agent.talk - client connect basically working git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@3289 fd906abe-77d9-0310-91a1-e0d9ade77398 --- agent/talk/README.txt | 9 +++-- agent/talk/base.py | 47 ++++++++++++++++++++++-- agent/talk/http.py | 79 ++++++++++++++++++++++++++++++++-------- agent/talk/interfaces.py | 18 ++++++--- agent/tests.py | 20 +++++++++- 5 files changed, 142 insertions(+), 31 deletions(-) diff --git a/agent/talk/README.txt b/agent/talk/README.txt index 5dd4741..8b9da88 100644 --- a/agent/talk/README.txt +++ b/agent/talk/README.txt @@ -4,6 +4,8 @@ Agents for Job Execution and Communication Tasks ($Id$) + >>> from cybertools.agent.tests import tester + Communication Handling ====================== @@ -40,9 +42,9 @@ that receive messages. ... def onMessage(self, interaction, data): ... print ('%s receiving: interaction=%s, data=%s' % ... (self.name, interaction, data)) + ... tester.stop() >>> serverSub = Subscriber('server') - >>> master.servers[0].subscribe(serverSub, 'testing') Set up a client @@ -61,9 +63,8 @@ work with the client but handle the client directly. Run the communication dialog ---------------------------- - >>> from cybertools.agent.tests import tester - >>> tester.iterate(400) - Session receiving, data={"message": "OK"} + >>> tester.run() + client receiving: interaction=None, data={u'status': u'OK'} Fin de Partie diff --git a/agent/talk/base.py b/agent/talk/base.py index c792574..d1ed3ef 100644 --- a/agent/talk/base.py +++ b/agent/talk/base.py @@ -22,24 +22,63 @@ Handling asynchronous communication tasks - common and base classes. $Id$ """ +from twisted.web.client import getPage from zope.interface import implements from cybertools.agent.talk.interfaces import ISession, IInteraction +from cybertools.util import json class Session(object): implements(ISession) - def __init__(self, manager): + def __init__(self, id, manager, subscriber, url): + self.id = id self.manager = manager + self.subscriber = subscriber + self.url = url self.state = 'logon' - self.id = None self.queue = [] self.interactions = {} - def receive(self, data): - print ('Session receiving, data=%s' % data) + def connected(self, data): + data = json.loads(data) + self.state = 'open' + self.subscriber.onMessage(None, data) + self._processQueue() + # self._poll() + + def received(self, data): + data = json.loads(data) + # TODO: check data + self._processQueue() + + def pollReceived(self, data): + data = json.loads(data) + if data.get('action') != 'idle': + self.subscriber.onMessage(None, data) + # self._poll() + + def _send(self, data, interaction): + if self.queue: + self.queue.append(data) + else: + self._sendData(data) + + def _processQueue(self): + if not self.queue: + return + + def _sendData(self, data): + content = dict(id=self.id, command='send', data=data) + d = getPage(self.url, postdata=json.dumps(content)) + d.addCallback(s.received) + + def _poll(self): + content = dict(id=self.id, command='poll') + d = getPage(self.url, postdata=json.dumps(content)) + d.addCallback(s.pollReceived) class Interaction(object): diff --git a/agent/talk/http.py b/agent/talk/http.py index 349d8c5..9bcbf39 100644 --- a/agent/talk/http.py +++ b/agent/talk/http.py @@ -22,7 +22,6 @@ Handling asynchronous and possibly asymmetric communication tasks via HTTP. $Id$ """ -from twisted.internet import defer from twisted.web.client import getPage from twisted.web.resource import Resource from twisted.web.server import Site @@ -33,6 +32,7 @@ from cybertools.agent.components import servers, clients from cybertools.agent.system.http import listener from cybertools.agent.talk.base import Session, Interaction from cybertools.agent.talk.interfaces import IServer, IClient +from cybertools.util import json # server implementation @@ -47,14 +47,16 @@ class HttpServer(object): self.port = agent.config.talk.server.http.port self.subscribers = {} self.sessions = {} - self.site = Site(RootResource()) + self.site = Site(RootResource(self)) def setup(self): print 'Setting up HTTP handler for port %i.' % self.port listener.listenTCP(self.port, self.site) def subscribe(self, subscriber, aspect): - pass + subs = self.subscribers.setdefault(aspect, []) + if subscriber not in subs: + subs.append(subscriber) def unsubscribe(self, subscriber, aspect): pass @@ -63,23 +65,54 @@ class HttpServer(object): # respond to open poll request or put in queue return defer.Deferred() # Interaction + def _process(self, client, data): + command = data.get('command') + if not command: + return self._error('missing command') + cmethod = self.commands.get(command) + if cmethod is None: + return self._error('illegal command %r' % command) + id = data.get('id') + if not id: + return self._error('missing id') + sessionId = ':'.join((client, data['id'])) + message = cmethod(self, sessionId, client, data) + if message: + return self._error(message) + return '{"status": "OK"}' + + def _connect(self, sessionId, client, data): + if sessionId in self.sessions: + return 'duplicate session id %r' % sessionId + self.sessions[sessionId] = Session(sessionId, self, None, client) + # TODO: notify subscribers + + def _poll(self, sessionId, client, data): + pass + + def _send(self, sessionId, client, data): + for sub in self.subscribers.values(): + sub.onMessage(data) + + def _error(self, message): + return json.dumps(dict(status='error', message=message)) + + commands = dict(connect=_connect, poll=_poll, send=_send) + servers.register(HttpServer, Master, name='http') class RootResource(Resource): - def getChild(self, path, request): - return CommandHandler(path) + isLeaf = True - -class CommandHandler(Resource): - - def __init__(self, path): - self.command = path + def __init__(self, server): + self.server = server def render(self, request): - #print request - return '{"message": "OK"}' + client = request.getClient() + data = json.loads(request.content.read()) + return self.server._process(client, data) # client implementation @@ -92,17 +125,31 @@ class HttpClient(object): def __init__(self, agent): self.agent = agent self.sessions = {} + self.count = 0 def connect(self, subscriber, url, credentials=None): - s = Session(self) - d = getPage(url) - d.addCallback(s.receive) + id = self.generateSessionId() + s = Session(self, id, subscriber, url) + self.sessions[id] = s + data = dict(command='connect', id=id) + if credentials is not None: + data.update(credentials) + # s._send(data, None) + d = getPage(url, postdata=json.dumps(data)) + d.addCallback(s.connected) return s def disconnect(self, session): pass def send(self, session, data, interaction=None): - return defer.Deferred() # Interaction + if interaction is None: + interaction = Interaction(session) + session._send(data, interaction) + return interaction # Interaction + + def generateSessionId(self): + self.count += 1 + return '%07i' % self.count clients.register(HttpClient, Master, name='http') diff --git a/agent/talk/interfaces.py b/agent/talk/interfaces.py index 6f2d8b9..7faf74c 100644 --- a/agent/talk/interfaces.py +++ b/agent/talk/interfaces.py @@ -44,9 +44,11 @@ class IServer(Interface): def send(session, data, interaction=None): """ Send data to the remote client specified via the session given. + The session has to be created previously by a connect attempt + from the client. If interaction is None, create a new one. - Return a deferred providing the interaction with its current state. + Return the interaction. """ @@ -73,8 +75,9 @@ class IClient(Interface): """ Send data to the server specified via the session given. If interaction is None, create a new one. - Return a deferred providing the interaction with its current state; - sending an interaction with ``finished`` set to True signifies + Return the interaction. + + Sending an interaction with ``finished`` set to True signifies the last message of an interaction. """ @@ -85,10 +88,14 @@ class ISubscriber(Interface): """ May receive message notifications. """ - def onMesssage(interaction, data): + def onMessage(interaction, data): """ Callback method for message notifications. """ + def onError(interaction, data): + """ Callback method for error notifications. + """ + class ISession(Interface): """ Represents the connection to a server within a client or @@ -97,7 +104,8 @@ class ISession(Interface): manager = Attribute("""The server or client object, respectively, that created the session.""") - + subscriber = Attribute("The subscriber that initiated the session.") + url = Attribute("The URL of the server the session connects to.") state = Attribute("""A string specifying the current state of the session: 'logon': The remote client is trying to connect/log in, data may contain credential information; diff --git a/agent/tests.py b/agent/tests.py index 3dbd824..4cfbace 100755 --- a/agent/tests.py +++ b/agent/tests.py @@ -16,13 +16,29 @@ class Tester(object): """ Used for controlled execution of reactor iteration cycles. """ - def iterate(self, n=10, delays={}): + stopped = False + + def iterate(self, n=10, delay=0): + self.stopped = False for i in range(n): - delay = delays.get(i, 0) + if self.stopped: + return reactor.iterate(delay) + def run(self, maxduration=1.0, delay=0): + self.stopped = False + end = time.time() + maxduration + while not self.stopped: + reactor.iterate(delay) + if time.time() >= end: + return + + def stop(self): + self.stopped = True + def stopThreads(self): reactor.threadpool.stop() + reactor.threadpool = None tester = Tester()