work in progress: 'agent.talk' HTTP communication
git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@3276 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
b0a545d16f
commit
c0ce542b9c
6 changed files with 64 additions and 8 deletions
|
@ -2,7 +2,7 @@
|
||||||
Agents for Job Execution and Communication Tasks
|
Agents for Job Execution and Communication Tasks
|
||||||
================================================
|
================================================
|
||||||
|
|
||||||
Agents do some work specified by a jobs, the main task being to collect
|
Agents do some work specified by jobs, the main task being to collect
|
||||||
information objects from the local machine or some external source and
|
information objects from the local machine or some external source and
|
||||||
transfer them e.g. to a loops server on the same machine or another.
|
transfer them e.g. to a loops server on the same machine or another.
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,9 @@ Communication Handling
|
||||||
Communication services are provided by handlers specified in the ``talk``
|
Communication services are provided by handlers specified in the ``talk``
|
||||||
package.
|
package.
|
||||||
|
|
||||||
|
Set up and start an agent with a server
|
||||||
|
---------------------------------------
|
||||||
|
|
||||||
>>> config = '''
|
>>> config = '''
|
||||||
... controller(names=['core.sample'])
|
... controller(names=['core.sample'])
|
||||||
... scheduler(name='core')
|
... scheduler(name='core')
|
||||||
|
@ -25,6 +28,45 @@ package.
|
||||||
Using controllers core.sample.
|
Using controllers core.sample.
|
||||||
Setting up HTTP handler for port 8081.
|
Setting up HTTP handler for port 8081.
|
||||||
|
|
||||||
|
|
||||||
>>> master.servers
|
>>> master.servers
|
||||||
[<cybertools.agent.talk.http.HttpServer object...>]
|
[<cybertools.agent.talk.http.HttpServer object...>]
|
||||||
|
|
||||||
|
We also provide a class to be used for creating subscribers, i.e. objects
|
||||||
|
that receive messages.
|
||||||
|
|
||||||
|
>>> class Subscriber(object):
|
||||||
|
... def __init__(self, name):
|
||||||
|
... self.name = name
|
||||||
|
... def onMessage(self, interaction, data):
|
||||||
|
... print ('%s receiving: interaction=%s, data=%s' %
|
||||||
|
... (self.name, interaction, data))
|
||||||
|
|
||||||
|
>>> serverSub = Subscriber('server')
|
||||||
|
|
||||||
|
>>> master.servers[0].subscribe(serverSub, 'testing')
|
||||||
|
|
||||||
|
Set up a client
|
||||||
|
---------------
|
||||||
|
|
||||||
|
In order to simplify the testing we do not set up a separate agent to
|
||||||
|
work with the client but handle the client directly.
|
||||||
|
|
||||||
|
>>> from cybertools.agent.talk.http import HttpClient
|
||||||
|
>>> client = HttpClient(master)
|
||||||
|
|
||||||
|
>>> clientSub = Subscriber('client')
|
||||||
|
|
||||||
|
>>> session = client.connect(clientSub, 'http://localhost:8081/')
|
||||||
|
|
||||||
|
Run the communication dialog
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
>>> from cybertools.agent.tests import tester
|
||||||
|
>>> tester.iterate(400)
|
||||||
|
Session receiving, data={"message": "OK"}
|
||||||
|
|
||||||
|
|
||||||
|
Fin de Partie
|
||||||
|
=============
|
||||||
|
|
||||||
|
>>> tester.stopThreads()
|
||||||
|
|
|
@ -33,8 +33,14 @@ class Session(object):
|
||||||
|
|
||||||
def __init__(self, manager):
|
def __init__(self, manager):
|
||||||
self.manager = manager
|
self.manager = manager
|
||||||
|
self.state = 'logon'
|
||||||
|
self.id = None
|
||||||
|
self.queue = []
|
||||||
self.interactions = {}
|
self.interactions = {}
|
||||||
|
|
||||||
|
def receive(self, data):
|
||||||
|
print ('Session receiving, data=%s' % data)
|
||||||
|
|
||||||
|
|
||||||
class Interaction(object):
|
class Interaction(object):
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
#
|
#
|
||||||
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
|
# Copyright (c) 2009 Helmut Merz helmutm@cy55.de
|
||||||
#
|
#
|
||||||
# This program is free software; you can redistribute it and/or modify
|
# This program is free software; you can redistribute it and/or modify
|
||||||
# it under the terms of the GNU General Public License as published by
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
@ -23,6 +23,7 @@ $Id$
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
from twisted.web.client import getPage
|
||||||
from twisted.web.resource import Resource
|
from twisted.web.resource import Resource
|
||||||
from twisted.web.server import Site
|
from twisted.web.server import Site
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
@ -77,7 +78,7 @@ class CommandHandler(Resource):
|
||||||
self.command = path
|
self.command = path
|
||||||
|
|
||||||
def render(self, request):
|
def render(self, request):
|
||||||
#
|
#print request
|
||||||
return '{"message": "OK"}'
|
return '{"message": "OK"}'
|
||||||
|
|
||||||
|
|
||||||
|
@ -90,9 +91,13 @@ class HttpClient(object):
|
||||||
|
|
||||||
def __init__(self, agent):
|
def __init__(self, agent):
|
||||||
self.agent = agent
|
self.agent = agent
|
||||||
|
self.sessions = {}
|
||||||
|
|
||||||
def connect(self, subscriber, url, credentials=None):
|
def connect(self, subscriber, url, credentials=None):
|
||||||
return defer.Deferred() # Session
|
s = Session(self)
|
||||||
|
d = getPage(url)
|
||||||
|
d.addCallback(s.receive)
|
||||||
|
return s
|
||||||
|
|
||||||
def disconnect(self, session):
|
def disconnect(self, session):
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -61,8 +61,8 @@ class IClient(Interface):
|
||||||
|
|
||||||
The subscriber will receive messages via its ``onMesssage`` callback.
|
The subscriber will receive messages via its ``onMesssage`` callback.
|
||||||
|
|
||||||
Return a Deferred that will provide an ISession implementation;
|
Return a an ISession implementation that may be used for sending
|
||||||
this may then be used sending data to the server.
|
data to the server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def disconnect(session):
|
def disconnect(session):
|
||||||
|
|
|
@ -21,6 +21,9 @@ class Tester(object):
|
||||||
delay = delays.get(i, 0)
|
delay = delays.get(i, 0)
|
||||||
reactor.iterate(delay)
|
reactor.iterate(delay)
|
||||||
|
|
||||||
|
def stopThreads(self):
|
||||||
|
reactor.threadpool.stop()
|
||||||
|
|
||||||
tester = Tester()
|
tester = Tester()
|
||||||
|
|
||||||
|
|
||||||
|
@ -42,11 +45,11 @@ def test_suite():
|
||||||
testSuite = unittest.TestSuite((
|
testSuite = unittest.TestSuite((
|
||||||
unittest.makeSuite(Test),
|
unittest.makeSuite(Test),
|
||||||
DocFileSuite('README.txt', optionflags=flags),
|
DocFileSuite('README.txt', optionflags=flags),
|
||||||
DocFileSuite('talk/README.txt', optionflags=flags),
|
|
||||||
DocFileSuite('crawl/README.txt', optionflags=flags),
|
DocFileSuite('crawl/README.txt', optionflags=flags),
|
||||||
DocFileSuite('crawl/filesystem.txt', optionflags=flags),
|
DocFileSuite('crawl/filesystem.txt', optionflags=flags),
|
||||||
DocFileSuite('crawl/outlook.txt', optionflags=flags),
|
DocFileSuite('crawl/outlook.txt', optionflags=flags),
|
||||||
DocFileSuite('transport/transporter.txt', optionflags=flags),
|
DocFileSuite('transport/transporter.txt', optionflags=flags),
|
||||||
|
DocFileSuite('talk/README.txt', optionflags=flags),
|
||||||
))
|
))
|
||||||
return testSuite
|
return testSuite
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue