work in progress: agent.talk - client connect basically working

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@3289 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2009-03-22 15:46:13 +00:00
parent f2a8a754fc
commit e43810beb2
5 changed files with 142 additions and 31 deletions

View file

@ -4,6 +4,8 @@ Agents for Job Execution and Communication Tasks
($Id$)
>>> from cybertools.agent.tests import tester
Communication Handling
======================
@ -40,9 +42,9 @@ that receive messages.
... def onMessage(self, interaction, data):
... print ('%s receiving: interaction=%s, data=%s' %
... (self.name, interaction, data))
... tester.stop()
>>> serverSub = Subscriber('server')
>>> master.servers[0].subscribe(serverSub, 'testing')
Set up a client
@ -61,9 +63,8 @@ work with the client but handle the client directly.
Run the communication dialog
----------------------------
>>> from cybertools.agent.tests import tester
>>> tester.iterate(400)
Session receiving, data={"message": "OK"}
>>> tester.run()
client receiving: interaction=None, data={u'status': u'OK'}
Fin de Partie

View file

@ -22,24 +22,63 @@ Handling asynchronous communication tasks - common and base classes.
$Id$
"""
from twisted.web.client import getPage
from zope.interface import implements
from cybertools.agent.talk.interfaces import ISession, IInteraction
from cybertools.util import json
class Session(object):
implements(ISession)
def __init__(self, manager):
def __init__(self, id, manager, subscriber, url):
self.id = id
self.manager = manager
self.subscriber = subscriber
self.url = url
self.state = 'logon'
self.id = None
self.queue = []
self.interactions = {}
def receive(self, data):
print ('Session receiving, data=%s' % data)
def connected(self, data):
data = json.loads(data)
self.state = 'open'
self.subscriber.onMessage(None, data)
self._processQueue()
# self._poll()
def received(self, data):
data = json.loads(data)
# TODO: check data
self._processQueue()
def pollReceived(self, data):
data = json.loads(data)
if data.get('action') != 'idle':
self.subscriber.onMessage(None, data)
# self._poll()
def _send(self, data, interaction):
if self.queue:
self.queue.append(data)
else:
self._sendData(data)
def _processQueue(self):
if not self.queue:
return
def _sendData(self, data):
content = dict(id=self.id, command='send', data=data)
d = getPage(self.url, postdata=json.dumps(content))
d.addCallback(s.received)
def _poll(self):
content = dict(id=self.id, command='poll')
d = getPage(self.url, postdata=json.dumps(content))
d.addCallback(s.pollReceived)
class Interaction(object):

View file

@ -22,7 +22,6 @@ Handling asynchronous and possibly asymmetric communication tasks via HTTP.
$Id$
"""
from twisted.internet import defer
from twisted.web.client import getPage
from twisted.web.resource import Resource
from twisted.web.server import Site
@ -33,6 +32,7 @@ from cybertools.agent.components import servers, clients
from cybertools.agent.system.http import listener
from cybertools.agent.talk.base import Session, Interaction
from cybertools.agent.talk.interfaces import IServer, IClient
from cybertools.util import json
# server implementation
@ -47,14 +47,16 @@ class HttpServer(object):
self.port = agent.config.talk.server.http.port
self.subscribers = {}
self.sessions = {}
self.site = Site(RootResource())
self.site = Site(RootResource(self))
def setup(self):
print 'Setting up HTTP handler for port %i.' % self.port
listener.listenTCP(self.port, self.site)
def subscribe(self, subscriber, aspect):
pass
subs = self.subscribers.setdefault(aspect, [])
if subscriber not in subs:
subs.append(subscriber)
def unsubscribe(self, subscriber, aspect):
pass
@ -63,23 +65,54 @@ class HttpServer(object):
# respond to open poll request or put in queue
return defer.Deferred() # Interaction
def _process(self, client, data):
command = data.get('command')
if not command:
return self._error('missing command')
cmethod = self.commands.get(command)
if cmethod is None:
return self._error('illegal command %r' % command)
id = data.get('id')
if not id:
return self._error('missing id')
sessionId = ':'.join((client, data['id']))
message = cmethod(self, sessionId, client, data)
if message:
return self._error(message)
return '{"status": "OK"}'
def _connect(self, sessionId, client, data):
if sessionId in self.sessions:
return 'duplicate session id %r' % sessionId
self.sessions[sessionId] = Session(sessionId, self, None, client)
# TODO: notify subscribers
def _poll(self, sessionId, client, data):
pass
def _send(self, sessionId, client, data):
for sub in self.subscribers.values():
sub.onMessage(data)
def _error(self, message):
return json.dumps(dict(status='error', message=message))
commands = dict(connect=_connect, poll=_poll, send=_send)
servers.register(HttpServer, Master, name='http')
class RootResource(Resource):
def getChild(self, path, request):
return CommandHandler(path)
isLeaf = True
class CommandHandler(Resource):
def __init__(self, path):
self.command = path
def __init__(self, server):
self.server = server
def render(self, request):
#print request
return '{"message": "OK"}'
client = request.getClient()
data = json.loads(request.content.read())
return self.server._process(client, data)
# client implementation
@ -92,17 +125,31 @@ class HttpClient(object):
def __init__(self, agent):
self.agent = agent
self.sessions = {}
self.count = 0
def connect(self, subscriber, url, credentials=None):
s = Session(self)
d = getPage(url)
d.addCallback(s.receive)
id = self.generateSessionId()
s = Session(self, id, subscriber, url)
self.sessions[id] = s
data = dict(command='connect', id=id)
if credentials is not None:
data.update(credentials)
# s._send(data, None)
d = getPage(url, postdata=json.dumps(data))
d.addCallback(s.connected)
return s
def disconnect(self, session):
pass
def send(self, session, data, interaction=None):
return defer.Deferred() # Interaction
if interaction is None:
interaction = Interaction(session)
session._send(data, interaction)
return interaction # Interaction
def generateSessionId(self):
self.count += 1
return '%07i' % self.count
clients.register(HttpClient, Master, name='http')

View file

@ -44,9 +44,11 @@ class IServer(Interface):
def send(session, data, interaction=None):
""" Send data to the remote client specified via the session given.
The session has to be created previously by a connect attempt
from the client.
If interaction is None, create a new one.
Return a deferred providing the interaction with its current state.
Return the interaction.
"""
@ -73,8 +75,9 @@ class IClient(Interface):
""" Send data to the server specified via the session given.
If interaction is None, create a new one.
Return a deferred providing the interaction with its current state;
sending an interaction with ``finished`` set to True signifies
Return the interaction.
Sending an interaction with ``finished`` set to True signifies
the last message of an interaction.
"""
@ -85,10 +88,14 @@ class ISubscriber(Interface):
""" May receive message notifications.
"""
def onMesssage(interaction, data):
def onMessage(interaction, data):
""" Callback method for message notifications.
"""
def onError(interaction, data):
""" Callback method for error notifications.
"""
class ISession(Interface):
""" Represents the connection to a server within a client or
@ -97,7 +104,8 @@ class ISession(Interface):
manager = Attribute("""The server or client object, respectively, that
created the session.""")
subscriber = Attribute("The subscriber that initiated the session.")
url = Attribute("The URL of the server the session connects to.")
state = Attribute("""A string specifying the current state of the session:
'logon': The remote client is trying to connect/log in,
data may contain credential information;

View file

@ -16,13 +16,29 @@ class Tester(object):
""" Used for controlled execution of reactor iteration cycles.
"""
def iterate(self, n=10, delays={}):
stopped = False
def iterate(self, n=10, delay=0):
self.stopped = False
for i in range(n):
delay = delays.get(i, 0)
if self.stopped:
return
reactor.iterate(delay)
def run(self, maxduration=1.0, delay=0):
self.stopped = False
end = time.time() + maxduration
while not self.stopped:
reactor.iterate(delay)
if time.time() >= end:
return
def stop(self):
self.stopped = True
def stopThreads(self):
reactor.threadpool.stop()
reactor.threadpool = None
tester = Tester()