add 'talk' package providing a common communication framework

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@2923 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2008-10-17 20:46:28 +00:00
parent 10d4a15ca4
commit e0503fea5b
12 changed files with 150 additions and 29 deletions

View file

@ -53,17 +53,19 @@ def setup(configInfo=None):
def setupEnvironment(config): def setupEnvironment(config):
# self registration of components:
from cybertools.agent.base import agent, control, job, log, schedule from cybertools.agent.base import agent, control, job, log, schedule
from cybertools.agent.core import agent, control, schedule from cybertools.agent.core import agent, control, schedule
from cybertools.agent.control import cmdline, remote, ajaxclient from cybertools.agent.control import cmdline, remote
from cybertools.agent.crawl import base, filesystem, outlook
from cybertools.agent.transport import remote, loops from cybertools.agent.transport import remote, loops
# API registration:
from cybertools.agent.system.windows import api from cybertools.agent.system.windows import api
api.setup(config) api.setup(config)
from cybertools.agent.system import rpcapi from cybertools.agent.system import http, xmlrpc, sftp
rpcapi.setup(config) http.setup(config)
from cybertools.agent.system import sftpapi xmlrpc.setup(config)
sftpapi.setup(config) sftp.setup(config)
from cybertools.agent.crawl import base, filesystem, outlook
def startReactor(): def startReactor():

31
agent/system/http.py Normal file
View file

@ -0,0 +1,31 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Configuration-controlled import of HTTP communication functionality.
$Id: rpcapi.py
"""
def setup(config):
global listener, getPage
if config.talk.http == 'testing':
from cybertools.agent.testing.http import listener, getPage
else:
from twisted.internet import reactor as listener
from twisted.web.client import getPage

View file

@ -17,7 +17,7 @@
# #
""" """
Configuration controlled import of sftp functionality Configuration-controlled import of sftp functionality.
$Id: rpcapi.py $Id: rpcapi.py
""" """

View file

@ -25,6 +25,6 @@ $Id: rpcapi.py
def setup(config): def setup(config):
global xmlrpc global xmlrpc
if config.transport.remote.server == 'testing': if config.transport.remote.server == 'testing':
from cybertools.agent.testing.rpcserver import RPCServer, xmlrpc from cybertools.agent.testing.rpcserver import xmlrpc
else: else:
from twisted.web import xmlrpc from twisted.web import xmlrpc

23
agent/talk/README.txt Normal file
View file

@ -0,0 +1,23 @@
================================================
Agents for Job Execution and Communication Tasks
================================================
($Id$)
>>> config = '''
... controller(names=['core.sample'])
... scheduler(name='core')
... logger(name='default', standard=30)
... talk.http = 'testing'
... '''
>>> from cybertools.agent.main import setup
>>> master = setup(config)
Starting agent application...
Using controllers core.sample.
Communication Handling
======================
>>> from cybertools.agent.talk.http import Handler

4
agent/talk/__init__.py Normal file
View file

@ -0,0 +1,4 @@
"""
$Id$
"""

55
agent/talk/http.py Normal file
View file

@ -0,0 +1,55 @@
#
# Copyright (c) 2008 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Handling asynchronous and possibly asymmetric communication tasks.
$Id$
"""
from twisted.internet import defer
from twisted.web.resource import Resource
from twisted.web.server import Site
from zope.interface import implements
from cybertools.agent.base.agent import Master
from cybertools.agent.system.http import listener
class RootResource(Resource):
def getChild(self, path, request):
return CommandHandler(path)
class CommandHandler(Resource):
def __init__(self, path):
self.command = path
def render(self, request):
return '{"message": "OK"}'
class Handler(object):
def listen(self, port):
return listener.listenTCP(port, Site(RootResource()))
def send(self, clientId, data):
return defer.Deferred()

View file

@ -17,24 +17,28 @@
# #
""" """
Controller that receives and responds to requests from a browser (AJAX) client; Fake testing objects/functions for HTTP communication.
in parallel it sends informations to the client by responding to
polling requests from the client.
$Id$ $Id$
""" """
from zope.interface import implements from twisted.internet.defer import Deferred, succeed
from cybertools.agent.base.agent import Master
from cybertools.agent.core.control import SampleController
from cybertools.agent.components import controllers
class ClientController(SampleController): class Listener(object):
def setup(self): site = port = None
super(ClientController, self).setup()
def listenTCP(self, port, site):
self.port = port
self.site = site
self.resource = site.resource
deferred = self.deferred = Deferred()
return deferred
controllers.register(ClientController, Master, name='ajaxclient') listener = Listener()
def getPage(url, contextFactory=None, method='GET', postdata=None, **kwargs):
return succeed('{"message": "OK"}')

View file

@ -42,6 +42,7 @@ def test_suite():
testSuite = unittest.TestSuite(( testSuite = unittest.TestSuite((
unittest.makeSuite(Test), unittest.makeSuite(Test),
DocFileSuite('README.txt', optionflags=flags), DocFileSuite('README.txt', optionflags=flags),
DocFileSuite('talk/README.txt', optionflags=flags),
DocFileSuite('crawl/README.txt', optionflags=flags), DocFileSuite('crawl/README.txt', optionflags=flags),
DocFileSuite('crawl/filesystem.txt', optionflags=flags), DocFileSuite('crawl/filesystem.txt', optionflags=flags),
DocFileSuite('crawl/outlook.txt', optionflags=flags), DocFileSuite('crawl/outlook.txt', optionflags=flags),

View file

@ -28,6 +28,7 @@ from twisted.internet import defer, protocol, reactor
CHUNKSIZE = 4096 CHUNKSIZE = 4096
class FileTransfer(protocol.ClientFactory): class FileTransfer(protocol.ClientFactory):
""" Transfers files to a remote SCP/SFTP server. """ Transfers files to a remote SCP/SFTP server.
""" """

View file

@ -28,8 +28,8 @@ from twisted.internet import defer
from zope.interface import implements from zope.interface import implements
import os import os
from cybertools.agent.system import rpcapi from cybertools.agent.system import xmlrpc
from cybertools.agent.system import sftpapi from cybertools.agent.system import sftp
from cybertools.agent.base.agent import Master from cybertools.agent.base.agent import Master
from cybertools.agent.core.agent import QueueableAgent from cybertools.agent.core.agent import QueueableAgent
from cybertools.agent.interfaces import ITransporter from cybertools.agent.interfaces import ITransporter
@ -51,11 +51,11 @@ class Transporter(QueueableAgent):
super(Transporter, self).__init__(master) super(Transporter, self).__init__(master)
config = master.config config = master.config
serverURL = config.transport.remote.url serverURL = config.transport.remote.url
self.server = rpcapi.xmlrpc.Proxy(serverURL) self.server = xmlrpc.xmlrpc.Proxy(serverURL)
userName = config.transport.remote.ftp.user userName = config.transport.remote.ftp.user
password = config.transport.remote.ftp.password password = config.transport.remote.ftp.password
host = config.transport.remote.ftp.url host = config.transport.remote.ftp.url
self.ftpServer = sftpapi.FileTransfer(host, self.port, userName, password) self.ftpServer = sftp.FileTransfer(host, self.port, userName, password)
def process(self): def process(self):
return self.transfer(self.params['resource']) return self.transfer(self.params['resource'])

View file

@ -19,13 +19,13 @@ Agents for Job Execution and Communication Tasks
Transporter Transporter
============== ===========
The agent uses Twisted's cooperative multitasking model. The agent uses Twisted's cooperative multitasking model.
The Transporter is used to contact an xmlrpc Server and transmit the metadata to the other The Transporter is used to contact an xmlrpc Server and transmit the metadata
loops system. The Transporter is derived from Queueable agent, because also here applies that only one to the other loops system. The Transporter is derived from Queueable agent
item at a time is transmitted. to ensure that only one item at a time is transmitted.
Returns a deferred that must be supplied with a callback method (and in Returns a deferred that must be supplied with a callback method (and in
most cases also an errback method). most cases also an errback method).