work in progress: talk package for generic data communication tasks
git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2924 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
parent
e0503fea5b
commit
9a6d66dfea
8 changed files with 192 additions and 21 deletions
|
@ -128,6 +128,8 @@ We make the contollers provide the specifications via the master agent's
|
||||||
``setup()`` method.
|
``setup()`` method.
|
||||||
|
|
||||||
>>> master.setup()
|
>>> master.setup()
|
||||||
|
Starting agent application...
|
||||||
|
Using controllers base.sample.
|
||||||
|
|
||||||
Other Agents
|
Other Agents
|
||||||
------------
|
------------
|
||||||
|
|
|
@ -27,6 +27,7 @@ from zope.interface import implements
|
||||||
from cybertools.agent.common import states
|
from cybertools.agent.common import states
|
||||||
from cybertools.agent.components import agents, controllers, jobs
|
from cybertools.agent.components import agents, controllers, jobs
|
||||||
from cybertools.agent.components import loggers, schedulers
|
from cybertools.agent.components import loggers, schedulers
|
||||||
|
from cybertools.agent.components import servers, clients
|
||||||
from cybertools.agent.interfaces import IAgent
|
from cybertools.agent.interfaces import IAgent
|
||||||
from cybertools.util.config import Configurator
|
from cybertools.util.config import Configurator
|
||||||
|
|
||||||
|
@ -71,15 +72,22 @@ class Master(Agent):
|
||||||
self.master = self
|
self.master = self
|
||||||
self.controllers = []
|
self.controllers = []
|
||||||
self.children = {}
|
self.children = {}
|
||||||
|
self.servers = []
|
||||||
|
|
||||||
def setup(self):
|
def setup(self):
|
||||||
config = self.config
|
config = self.config
|
||||||
self.logger = loggers(self, name=config.logger.name)
|
self.logger = loggers(self, name=config.logger.name)
|
||||||
|
print 'Starting agent application...'
|
||||||
for n in config.controller.names:
|
for n in config.controller.names:
|
||||||
self.controllers.append(controllers(self, n))
|
self.controllers.append(controllers(self, n))
|
||||||
self.scheduler = schedulers(self, name=config.scheduler.name)
|
self.scheduler = schedulers(self, name=config.scheduler.name)
|
||||||
for cont in self.controllers:
|
for cont in self.controllers:
|
||||||
cont.setup()
|
cont.setup()
|
||||||
|
print 'Using controllers %s.' % ', '.join(config.controller.names)
|
||||||
|
for n in config.talk.server.names:
|
||||||
|
server = servers(self, n)
|
||||||
|
self.servers.append(server)
|
||||||
|
server.setup()
|
||||||
|
|
||||||
def setupAgents(self, controller, agentSpecs):
|
def setupAgents(self, controller, agentSpecs):
|
||||||
for spec in agentSpecs:
|
for spec in agentSpecs:
|
||||||
|
|
|
@ -30,3 +30,5 @@ controllers = AdapterFactory()
|
||||||
jobs = AdapterFactory()
|
jobs = AdapterFactory()
|
||||||
loggers = AdapterFactory()
|
loggers = AdapterFactory()
|
||||||
schedulers = AdapterFactory()
|
schedulers = AdapterFactory()
|
||||||
|
servers = AdapterFactory()
|
||||||
|
clients = AdapterFactory()
|
||||||
|
|
|
@ -47,18 +47,10 @@ def setup(configInfo=None):
|
||||||
master = Master(configInfo)
|
master = Master(configInfo)
|
||||||
setupEnvironment(master.config)
|
setupEnvironment(master.config)
|
||||||
master.setup()
|
master.setup()
|
||||||
print 'Starting agent application...'
|
|
||||||
print 'Using controllers %s.' % ', '.join(master.config.controller.names)
|
|
||||||
return master
|
return master
|
||||||
|
|
||||||
|
|
||||||
def setupEnvironment(config):
|
def setupEnvironment(config):
|
||||||
# self registration of components:
|
|
||||||
from cybertools.agent.base import agent, control, job, log, schedule
|
|
||||||
from cybertools.agent.core import agent, control, schedule
|
|
||||||
from cybertools.agent.control import cmdline, remote
|
|
||||||
from cybertools.agent.crawl import base, filesystem, outlook
|
|
||||||
from cybertools.agent.transport import remote, loops
|
|
||||||
# API registration:
|
# API registration:
|
||||||
from cybertools.agent.system.windows import api
|
from cybertools.agent.system.windows import api
|
||||||
api.setup(config)
|
api.setup(config)
|
||||||
|
@ -66,6 +58,13 @@ def setupEnvironment(config):
|
||||||
http.setup(config)
|
http.setup(config)
|
||||||
xmlrpc.setup(config)
|
xmlrpc.setup(config)
|
||||||
sftp.setup(config)
|
sftp.setup(config)
|
||||||
|
# self registration of components:
|
||||||
|
from cybertools.agent.base import agent, control, job, log, schedule
|
||||||
|
from cybertools.agent.core import agent, control, schedule
|
||||||
|
from cybertools.agent.control import cmdline, remote
|
||||||
|
from cybertools.agent.crawl import base, filesystem, outlook
|
||||||
|
from cybertools.agent.talk import http
|
||||||
|
from cybertools.agent.transport import remote, loops
|
||||||
|
|
||||||
|
|
||||||
def startReactor():
|
def startReactor():
|
||||||
|
|
|
@ -19,12 +19,12 @@
|
||||||
"""
|
"""
|
||||||
Configuration-controlled import of HTTP communication functionality.
|
Configuration-controlled import of HTTP communication functionality.
|
||||||
|
|
||||||
$Id: rpcapi.py
|
$Id$
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def setup(config):
|
def setup(config):
|
||||||
global listener, getPage
|
global listener, getPage
|
||||||
if config.talk.http == 'testing':
|
if config.talk.http.handler == 'testing':
|
||||||
from cybertools.agent.testing.http import listener, getPage
|
from cybertools.agent.testing.http import listener, getPage
|
||||||
else:
|
else:
|
||||||
from twisted.internet import reactor as listener
|
from twisted.internet import reactor as listener
|
||||||
|
|
|
@ -4,20 +4,27 @@ Agents for Job Execution and Communication Tasks
|
||||||
|
|
||||||
($Id$)
|
($Id$)
|
||||||
|
|
||||||
|
|
||||||
|
Communication Handling
|
||||||
|
======================
|
||||||
|
|
||||||
|
Communication services are provided by handlers specified in the ``talk``
|
||||||
|
package.
|
||||||
|
|
||||||
>>> config = '''
|
>>> config = '''
|
||||||
... controller(names=['core.sample'])
|
... controller(names=['core.sample'])
|
||||||
... scheduler(name='core')
|
... scheduler(name='core')
|
||||||
... logger(name='default', standard=30)
|
... logger(name='default', standard=30)
|
||||||
... talk.http = 'testing'
|
... talk.server(names=['http'])
|
||||||
|
... talk.server.http(port=8081)
|
||||||
|
... talk.http(handler='testing')
|
||||||
... '''
|
... '''
|
||||||
>>> from cybertools.agent.main import setup
|
>>> from cybertools.agent.main import setup
|
||||||
>>> master = setup(config)
|
>>> master = setup(config)
|
||||||
Starting agent application...
|
Starting agent application...
|
||||||
Using controllers core.sample.
|
Using controllers core.sample.
|
||||||
|
Setting up HTTP handler for port 8081.
|
||||||
|
|
||||||
|
|
||||||
Communication Handling
|
>>> master.servers
|
||||||
======================
|
[<cybertools.agent.talk.http.HttpServer object...>]
|
||||||
|
|
||||||
>>> from cybertools.agent.talk.http import Handler
|
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,10 @@ from twisted.web.server import Site
|
||||||
from zope.interface import implements
|
from zope.interface import implements
|
||||||
|
|
||||||
from cybertools.agent.base.agent import Master
|
from cybertools.agent.base.agent import Master
|
||||||
|
from cybertools.agent.components import servers, clients
|
||||||
from cybertools.agent.system.http import listener
|
from cybertools.agent.system.http import listener
|
||||||
|
from cybertools.agent.talk.interfaces import IServer, IClient
|
||||||
|
from cybertools.agent.talk.interfaces import ISession, IInteraction
|
||||||
|
|
||||||
|
|
||||||
class RootResource(Resource):
|
class RootResource(Resource):
|
||||||
|
@ -46,10 +49,45 @@ class CommandHandler(Resource):
|
||||||
return '{"message": "OK"}'
|
return '{"message": "OK"}'
|
||||||
|
|
||||||
|
|
||||||
class Handler(object):
|
class HttpServer(object):
|
||||||
|
|
||||||
def listen(self, port):
|
implements(IServer)
|
||||||
return listener.listenTCP(port, Site(RootResource()))
|
|
||||||
|
|
||||||
def send(self, clientId, data):
|
def __init__(self, agent):
|
||||||
return defer.Deferred()
|
self.agent = agent
|
||||||
|
self.port = agent.config.talk.server.http.port
|
||||||
|
self.subscribers = {}
|
||||||
|
|
||||||
|
def setup(self):
|
||||||
|
print 'Setting up HTTP handler for port %i.' % self.port
|
||||||
|
listener.listenTCP(self.port, Site(RootResource()))
|
||||||
|
|
||||||
|
def subscribe(self, subscriber, aspect):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def unsubscribe(self, subscriber, aspect):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def send(self, client, data, interaction=None):
|
||||||
|
return defer.Deferred() # Interaction
|
||||||
|
|
||||||
|
servers.register(HttpServer, Master, name='http')
|
||||||
|
|
||||||
|
|
||||||
|
class HttpClient(object):
|
||||||
|
|
||||||
|
implements(IClient)
|
||||||
|
|
||||||
|
def __init__(self, agent):
|
||||||
|
self.agent = agent
|
||||||
|
|
||||||
|
def logon(self, subscriber, url):
|
||||||
|
return defer.Deferred() # Session
|
||||||
|
|
||||||
|
def logoff(self, session):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def send(self, session, data, interaction=None):
|
||||||
|
return defer.Deferred() # Interaction
|
||||||
|
|
||||||
|
clients.register(HttpClient, Master, name='http')
|
||||||
|
|
115
agent/talk/interfaces.py
Normal file
115
agent/talk/interfaces.py
Normal file
|
@ -0,0 +1,115 @@
|
||||||
|
#
|
||||||
|
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
|
||||||
|
#
|
||||||
|
# This program is free software; you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU General Public License as published by
|
||||||
|
# the Free Software Foundation; either version 2 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
#
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU General Public License for more details.
|
||||||
|
#
|
||||||
|
# You should have received a copy of the GNU General Public License
|
||||||
|
# along with this program; if not, write to the Free Software
|
||||||
|
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||||
|
#
|
||||||
|
|
||||||
|
"""
|
||||||
|
Interfaces for handling asynchronous communication tasks.
|
||||||
|
|
||||||
|
$Id$
|
||||||
|
"""
|
||||||
|
|
||||||
|
from zope.interface import Interface, Attribute
|
||||||
|
|
||||||
|
from cybertools.util.jeep import Jeep
|
||||||
|
|
||||||
|
|
||||||
|
class IServer(Interface):
|
||||||
|
""" A server waits for connection requests from a client. A connected
|
||||||
|
client may then send data to or receive messages from the server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def subscribe(subscriber, aspect):
|
||||||
|
""" The subscriber will receive messages via its ``onMesssage`` method.
|
||||||
|
|
||||||
|
The aspect is a dotted string used to select the kind of
|
||||||
|
sessions/remote clients the subscriber wants to receive messages
|
||||||
|
from.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def unsubscribe(subscriber, aspect):
|
||||||
|
""" Stop receiving messages.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def send(session, data, interaction=None):
|
||||||
|
""" Send data to the remote client specified via the session given.
|
||||||
|
|
||||||
|
If interaction is None, create a new one.
|
||||||
|
Return a deferred providing the interaction with its current state.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class IClient(Interface):
|
||||||
|
""" A client initiates a connection (session) to a server and may then
|
||||||
|
sent data to or receive data from the server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def logon(subscriber, url, credentials=None):
|
||||||
|
""" Connect to a server using the URL given, optionally logging in
|
||||||
|
with the credentials given.
|
||||||
|
|
||||||
|
The subscriber will receive messages via its ``onMesssage`` callback.
|
||||||
|
|
||||||
|
Return a Deferred that will provide an ISession implementation;
|
||||||
|
this may then be used sending data to the server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def logoff(session):
|
||||||
|
""" Close the connection for the session given.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def send(session, data, interaction=None):
|
||||||
|
""" Send data to the server specified via the session given.
|
||||||
|
|
||||||
|
If interaction is None, create a new one.
|
||||||
|
Return a deferred providing the interaction with its current state;
|
||||||
|
sending an interaction with ``finished`` set to True signifies
|
||||||
|
the last message of an interaction.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
# auxiliary interfaces
|
||||||
|
|
||||||
|
class ISubscriber(Interface):
|
||||||
|
""" May receive message notifications.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def onMesssage(interaction, data):
|
||||||
|
""" Callback method for message notifications.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class ISession(Interface):
|
||||||
|
""" Represents the connection to a server within a client or
|
||||||
|
a remote client connection within a server.
|
||||||
|
"""
|
||||||
|
|
||||||
|
issuer = Attribute("""The issuer of the session, i.e. the server or
|
||||||
|
client object, respectively.""")
|
||||||
|
|
||||||
|
state = Attribute("""A string specifying the current state of the session:
|
||||||
|
'logon': The remote client is trying to connect/log in,
|
||||||
|
data may contain credential information;
|
||||||
|
'logoff': The remote client is closing the connection;
|
||||||
|
'open': The connection is open.""")
|
||||||
|
|
||||||
|
|
||||||
|
class IInteraction(Interface):
|
||||||
|
""" Represents a set of message exchanges belonging together.
|
||||||
|
"""
|
||||||
|
|
||||||
|
session = Attribute("The session the interaction belongs to.")
|
||||||
|
finished = Attribute("The interaction is finished, interaction data may be cleared.")
|
Loading…
Add table
Reference in a new issue