work in progress: agent.talk - client connect basically working
git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@3291 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
		
							parent
							
								
									fa7f9f3d3f
								
							
						
					
					
						commit
						254ec32216
					
				
					 2 changed files with 70 additions and 48 deletions
				
			
		| 
						 | 
					@ -42,14 +42,7 @@ class Session(object):
 | 
				
			||||||
        self.sending = False
 | 
					        self.sending = False
 | 
				
			||||||
        self.queue = []
 | 
					        self.queue = []
 | 
				
			||||||
        self.interactions = {}
 | 
					        self.interactions = {}
 | 
				
			||||||
 | 
					        self.interactionCount = 0
 | 
				
			||||||
    def connected(self, data):
 | 
					 | 
				
			||||||
        data = json.loads(data)
 | 
					 | 
				
			||||||
        self.state = 'open'
 | 
					 | 
				
			||||||
        self.subscriber.onMessage(None, data)
 | 
					 | 
				
			||||||
        self.sending = False
 | 
					 | 
				
			||||||
        self._processQueue()
 | 
					 | 
				
			||||||
        # self._poll()
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def received(self, data):
 | 
					    def received(self, data):
 | 
				
			||||||
        data = json.loads(data)
 | 
					        data = json.loads(data)
 | 
				
			||||||
| 
						 | 
					@ -57,33 +50,34 @@ class Session(object):
 | 
				
			||||||
        self.sending = False
 | 
					        self.sending = False
 | 
				
			||||||
        self._processQueue()
 | 
					        self._processQueue()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def pollReceived(self, data):
 | 
					    def send(self, data, interaction):
 | 
				
			||||||
        data = json.loads(data)
 | 
					        data['interaction'] = interaction.id
 | 
				
			||||||
        if data.get('action') != 'idle':
 | 
					 | 
				
			||||||
            self.subscriber.onMessage(interaction, data)
 | 
					 | 
				
			||||||
        # self._poll()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    def _send(self, data, interaction):
 | 
					 | 
				
			||||||
        if self.sending or self.queue:
 | 
					        if self.sending or self.queue:
 | 
				
			||||||
            self.queue.append(data)
 | 
					            self.queue.append(data)
 | 
				
			||||||
        else:
 | 
					        else:
 | 
				
			||||||
            self._sendData(data)
 | 
					            self._sendData(data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _processQueue(self):
 | 
					    def processQueue(self):
 | 
				
			||||||
        if not self.queue:
 | 
					        if not self.queue:
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
        self._sendData(self.queue.pop(0))
 | 
					        self._sendData(self.queue.pop(0))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _sendData(self, data, command='send'):
 | 
					    def sendData(self, data, command='send'):
 | 
				
			||||||
        self.sending = True
 | 
					        self.sending = True
 | 
				
			||||||
        content = dict(id=self.id, command=command, data=data)
 | 
					        content = dict(id=self.id, command=command, data=data)
 | 
				
			||||||
        d = getPage(self.url, postdata=json.dumps(content))
 | 
					        d = getPage(self.url, postdata=json.dumps(content))
 | 
				
			||||||
        d.addCallback(s.received)
 | 
					        d.addCallback(s.received)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _poll(self):
 | 
					    def connected(self, data):
 | 
				
			||||||
        content = dict(id=self.id, command='poll')
 | 
					        data = json.loads(data)
 | 
				
			||||||
        d = getPage(self.url, postdata=json.dumps(content))
 | 
					        self.state = 'open'
 | 
				
			||||||
        d.addCallback(s.pollReceived)
 | 
					        self.subscriber.onMessage(None, data)
 | 
				
			||||||
 | 
					        self.sending = False
 | 
				
			||||||
 | 
					        self.processQueue()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def generateInteractionId(self):
 | 
				
			||||||
 | 
					        self.interactionCount += 1
 | 
				
			||||||
 | 
					        return '%07i' % self.interactionCount
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class Interaction(object):
 | 
					class Interaction(object):
 | 
				
			||||||
| 
						 | 
					@ -94,4 +88,6 @@ class Interaction(object):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def __init__(self, session):
 | 
					    def __init__(self, session):
 | 
				
			||||||
        self.session = session
 | 
					        self.session = session
 | 
				
			||||||
 | 
					        self.id = self.session.generateInteractionId()
 | 
				
			||||||
 | 
					        self.session.interactions[self.id] = self
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -22,9 +22,10 @@ Handling asynchronous and possibly asymmetric communication tasks via HTTP.
 | 
				
			||||||
$Id$
 | 
					$Id$
 | 
				
			||||||
"""
 | 
					"""
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					from time import time
 | 
				
			||||||
from twisted.web.client import getPage
 | 
					from twisted.web.client import getPage
 | 
				
			||||||
from twisted.web.resource import Resource
 | 
					from twisted.web.resource import Resource
 | 
				
			||||||
from twisted.web.server import Site
 | 
					from twisted.web.server import Site, NOT_DONE_YET
 | 
				
			||||||
from zope.interface import implements
 | 
					from zope.interface import implements
 | 
				
			||||||
 | 
					
 | 
				
			||||||
from cybertools.agent.base.agent import Master
 | 
					from cybertools.agent.base.agent import Master
 | 
				
			||||||
| 
						 | 
					@ -62,21 +63,24 @@ class HttpServer(object):
 | 
				
			||||||
        pass
 | 
					        pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def send(self, session, data, interaction=None):
 | 
					    def send(self, session, data, interaction=None):
 | 
				
			||||||
        # respond to open poll request or put in queue
 | 
					        if interaction is None:
 | 
				
			||||||
        return defer.Deferred() # Interaction
 | 
					            interaction = Interaction(session)
 | 
				
			||||||
 | 
					        # check session's queue
 | 
				
			||||||
 | 
					        # check open poll - write response
 | 
				
			||||||
 | 
					        return interaction
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _process(self, client, data):
 | 
					    def process(self, client, data):
 | 
				
			||||||
        command = data.get('command')
 | 
					        action = data.get('action')
 | 
				
			||||||
        if not command:
 | 
					        if not action:
 | 
				
			||||||
            return self._error('missing command')
 | 
					            return self._error('missing action')
 | 
				
			||||||
        cmethod = self.commands.get(command)
 | 
					        amethod = self.actions.get(action)
 | 
				
			||||||
        if cmethod is None:
 | 
					        if amethod is None:
 | 
				
			||||||
            return self._error('illegal command %r' % command)
 | 
					            return self._error('illegal action %r' % action)
 | 
				
			||||||
        id = data.get('id')
 | 
					        sid = data.get('session')
 | 
				
			||||||
        if not id:
 | 
					        if not sid:
 | 
				
			||||||
            return self._error('missing id')
 | 
					            return self._error('missing session id')
 | 
				
			||||||
        sessionId = ':'.join((client, data['id']))
 | 
					        sessionId = ':'.join((client, sid))
 | 
				
			||||||
        message = cmethod(self, sessionId, client, data)
 | 
					        message = amethod(self, sessionId, client, data)
 | 
				
			||||||
        if message:
 | 
					        if message:
 | 
				
			||||||
            return self._error(message)
 | 
					            return self._error(message)
 | 
				
			||||||
        return '{"status": "OK"}'
 | 
					        return '{"status": "OK"}'
 | 
				
			||||||
| 
						 | 
					@ -84,11 +88,12 @@ class HttpServer(object):
 | 
				
			||||||
    def _connect(self, sessionId, client, data):
 | 
					    def _connect(self, sessionId, client, data):
 | 
				
			||||||
        if sessionId in self.sessions:
 | 
					        if sessionId in self.sessions:
 | 
				
			||||||
            return 'duplicate session id %r' % sessionId
 | 
					            return 'duplicate session id %r' % sessionId
 | 
				
			||||||
        self.sessions[sessionId] = Session(sessionId, self, None, client)
 | 
					        self.sessions[sessionId] = HttpServerSession(sessionId, self, None, client)
 | 
				
			||||||
        # TODO: notify subscribers
 | 
					        # TODO: notify subscribers
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _poll(self, sessionId, client, data):
 | 
					    def _poll(self, sessionId, client, data):
 | 
				
			||||||
        pass
 | 
					        # record deferred with session
 | 
				
			||||||
 | 
					        return NOT_DONE_YET
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _send(self, sessionId, client, data):
 | 
					    def _send(self, sessionId, client, data):
 | 
				
			||||||
        for sub in self.subscribers.values():
 | 
					        for sub in self.subscribers.values():
 | 
				
			||||||
| 
						 | 
					@ -97,7 +102,7 @@ class HttpServer(object):
 | 
				
			||||||
    def _error(self, message):
 | 
					    def _error(self, message):
 | 
				
			||||||
        return json.dumps(dict(status='error', message=message))
 | 
					        return json.dumps(dict(status='error', message=message))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    commands = dict(connect=_connect, poll=_poll, send=_send)
 | 
					    actions = dict(connect=_connect, poll=_poll, send=_send)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
servers.register(HttpServer, Master, name='http')
 | 
					servers.register(HttpServer, Master, name='http')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -112,7 +117,12 @@ class RootResource(Resource):
 | 
				
			||||||
    def render(self, request):
 | 
					    def render(self, request):
 | 
				
			||||||
        client = request.getClient()
 | 
					        client = request.getClient()
 | 
				
			||||||
        data = json.loads(request.content.read())
 | 
					        data = json.loads(request.content.read())
 | 
				
			||||||
        return self.server._process(client, data)
 | 
					        return self.server.process(client, data)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class HttpServerSession(Session):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    pass
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# client implementation
 | 
					# client implementation
 | 
				
			||||||
| 
						 | 
					@ -125,16 +135,15 @@ class HttpClient(object):
 | 
				
			||||||
    def __init__(self, agent):
 | 
					    def __init__(self, agent):
 | 
				
			||||||
        self.agent = agent
 | 
					        self.agent = agent
 | 
				
			||||||
        self.sessions = {}
 | 
					        self.sessions = {}
 | 
				
			||||||
        self.count = 0
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def connect(self, subscriber, url, credentials=None):
 | 
					    def connect(self, subscriber, url, credentials=None):
 | 
				
			||||||
        id = self.generateSessionId()
 | 
					        id = self.generateSessionId()
 | 
				
			||||||
        s = Session(self, id, subscriber, url)
 | 
					        s = HttpClientSession(self, id, subscriber, url)
 | 
				
			||||||
        self.sessions[id] = s
 | 
					        self.sessions[id] = s
 | 
				
			||||||
        data = dict(command='connect', id=id)
 | 
					        data = dict(action='connect', session=id)
 | 
				
			||||||
        if credentials is not None:
 | 
					        if credentials is not None:
 | 
				
			||||||
            data.update(credentials)
 | 
					            data.update(credentials)
 | 
				
			||||||
        # s._send(data, None)
 | 
					        # s.send(data, None)
 | 
				
			||||||
        d = getPage(url, postdata=json.dumps(data))
 | 
					        d = getPage(url, postdata=json.dumps(data))
 | 
				
			||||||
        d.addCallback(s.connected)
 | 
					        d.addCallback(s.connected)
 | 
				
			||||||
        return s
 | 
					        return s
 | 
				
			||||||
| 
						 | 
					@ -145,11 +154,28 @@ class HttpClient(object):
 | 
				
			||||||
    def send(self, session, data, interaction=None):
 | 
					    def send(self, session, data, interaction=None):
 | 
				
			||||||
        if interaction is None:
 | 
					        if interaction is None:
 | 
				
			||||||
            interaction = Interaction(session)
 | 
					            interaction = Interaction(session)
 | 
				
			||||||
        session._send(data, interaction)
 | 
					        session.send(data, interaction)
 | 
				
			||||||
        return interaction # Interaction
 | 
					        return interaction
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def generateSessionId(self):
 | 
					    def generateSessionId(self):
 | 
				
			||||||
        self.count += 1
 | 
					        return str(time())
 | 
				
			||||||
        return '%07i' % self.count
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
clients.register(HttpClient, Master, name='http')
 | 
					clients.register(HttpClient, Master, name='http')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					class HttpClientSession(Session):
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def connected(self, data):
 | 
				
			||||||
 | 
					        super(HttpClientSession, self).connected(data)
 | 
				
			||||||
 | 
					        # self.poll()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def pollReceived(self, data):
 | 
				
			||||||
 | 
					        data = json.loads(data)
 | 
				
			||||||
 | 
					        if data.get('action') != 'idle':
 | 
				
			||||||
 | 
					            self.subscriber.onMessage(interaction, data)
 | 
				
			||||||
 | 
					        # self.poll()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    def poll(self):
 | 
				
			||||||
 | 
					        content = dict(id=self.id, command='poll')
 | 
				
			||||||
 | 
					        d = getPage(self.url, postdata=json.dumps(content))
 | 
				
			||||||
 | 
					        d.addCallback(s.pollReceived)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		
		Reference in a new issue