diff --git a/agent/talk/base.py b/agent/talk/base.py index b61ca84..2975667 100644 --- a/agent/talk/base.py +++ b/agent/talk/base.py @@ -42,14 +42,7 @@ class Session(object): self.sending = False self.queue = [] self.interactions = {} - - def connected(self, data): - data = json.loads(data) - self.state = 'open' - self.subscriber.onMessage(None, data) - self.sending = False - self._processQueue() - # self._poll() + self.interactionCount = 0 def received(self, data): data = json.loads(data) @@ -57,33 +50,34 @@ class Session(object): self.sending = False self._processQueue() - def pollReceived(self, data): - data = json.loads(data) - if data.get('action') != 'idle': - self.subscriber.onMessage(interaction, data) - # self._poll() - - def _send(self, data, interaction): + def send(self, data, interaction): + data['interaction'] = interaction.id if self.sending or self.queue: self.queue.append(data) else: self._sendData(data) - def _processQueue(self): + def processQueue(self): if not self.queue: return self._sendData(self.queue.pop(0)) - def _sendData(self, data, command='send'): + def sendData(self, data, command='send'): self.sending = True content = dict(id=self.id, command=command, data=data) d = getPage(self.url, postdata=json.dumps(content)) d.addCallback(s.received) - def _poll(self): - content = dict(id=self.id, command='poll') - d = getPage(self.url, postdata=json.dumps(content)) - d.addCallback(s.pollReceived) + def connected(self, data): + data = json.loads(data) + self.state = 'open' + self.subscriber.onMessage(None, data) + self.sending = False + self.processQueue() + + def generateInteractionId(self): + self.interactionCount += 1 + return '%07i' % self.interactionCount class Interaction(object): @@ -94,4 +88,6 @@ class Interaction(object): def __init__(self, session): self.session = session + self.id = self.session.generateInteractionId() + self.session.interactions[self.id] = self diff --git a/agent/talk/http.py b/agent/talk/http.py index 9bcbf39..cfecc2c 100644 --- a/agent/talk/http.py +++ b/agent/talk/http.py @@ -22,9 +22,10 @@ Handling asynchronous and possibly asymmetric communication tasks via HTTP. $Id$ """ +from time import time from twisted.web.client import getPage from twisted.web.resource import Resource -from twisted.web.server import Site +from twisted.web.server import Site, NOT_DONE_YET from zope.interface import implements from cybertools.agent.base.agent import Master @@ -62,21 +63,24 @@ class HttpServer(object): pass def send(self, session, data, interaction=None): - # respond to open poll request or put in queue - return defer.Deferred() # Interaction + if interaction is None: + interaction = Interaction(session) + # check session's queue + # check open poll - write response + return interaction - def _process(self, client, data): - command = data.get('command') - if not command: - return self._error('missing command') - cmethod = self.commands.get(command) - if cmethod is None: - return self._error('illegal command %r' % command) - id = data.get('id') - if not id: - return self._error('missing id') - sessionId = ':'.join((client, data['id'])) - message = cmethod(self, sessionId, client, data) + def process(self, client, data): + action = data.get('action') + if not action: + return self._error('missing action') + amethod = self.actions.get(action) + if amethod is None: + return self._error('illegal action %r' % action) + sid = data.get('session') + if not sid: + return self._error('missing session id') + sessionId = ':'.join((client, sid)) + message = amethod(self, sessionId, client, data) if message: return self._error(message) return '{"status": "OK"}' @@ -84,11 +88,12 @@ class HttpServer(object): def _connect(self, sessionId, client, data): if sessionId in self.sessions: return 'duplicate session id %r' % sessionId - self.sessions[sessionId] = Session(sessionId, self, None, client) + self.sessions[sessionId] = HttpServerSession(sessionId, self, None, client) # TODO: notify subscribers def _poll(self, sessionId, client, data): - pass + # record deferred with session + return NOT_DONE_YET def _send(self, sessionId, client, data): for sub in self.subscribers.values(): @@ -97,7 +102,7 @@ class HttpServer(object): def _error(self, message): return json.dumps(dict(status='error', message=message)) - commands = dict(connect=_connect, poll=_poll, send=_send) + actions = dict(connect=_connect, poll=_poll, send=_send) servers.register(HttpServer, Master, name='http') @@ -112,7 +117,12 @@ class RootResource(Resource): def render(self, request): client = request.getClient() data = json.loads(request.content.read()) - return self.server._process(client, data) + return self.server.process(client, data) + + +class HttpServerSession(Session): + + pass # client implementation @@ -125,16 +135,15 @@ class HttpClient(object): def __init__(self, agent): self.agent = agent self.sessions = {} - self.count = 0 def connect(self, subscriber, url, credentials=None): id = self.generateSessionId() - s = Session(self, id, subscriber, url) + s = HttpClientSession(self, id, subscriber, url) self.sessions[id] = s - data = dict(command='connect', id=id) + data = dict(action='connect', session=id) if credentials is not None: data.update(credentials) - # s._send(data, None) + # s.send(data, None) d = getPage(url, postdata=json.dumps(data)) d.addCallback(s.connected) return s @@ -145,11 +154,28 @@ class HttpClient(object): def send(self, session, data, interaction=None): if interaction is None: interaction = Interaction(session) - session._send(data, interaction) - return interaction # Interaction + session.send(data, interaction) + return interaction def generateSessionId(self): - self.count += 1 - return '%07i' % self.count + return str(time()) clients.register(HttpClient, Master, name='http') + + +class HttpClientSession(Session): + + def connected(self, data): + super(HttpClientSession, self).connected(data) + # self.poll() + + def pollReceived(self, data): + data = json.loads(data) + if data.get('action') != 'idle': + self.subscriber.onMessage(interaction, data) + # self.poll() + + def poll(self): + content = dict(id=self.id, command='poll') + d = getPage(self.url, postdata=json.dumps(content)) + d.addCallback(s.pollReceived)