initial import of 'plugin' package

git-svn-id: svn://svn.cy55.de/Zope3/src/cybertools/trunk@3593 fd906abe-77d9-0310-91a1-e0d9ade77398
This commit is contained in:
helmutm 2009-10-20 17:12:22 +00:00
parent cc6db5c7da
commit ebd14c67af
6 changed files with 275 additions and 0 deletions

105
plugin/README.txt Normal file
View file

@ -0,0 +1,105 @@
=======================
Dynamic Plug-in Modules
=======================
($Id$)
>>> import os
>>> basePath = os.path.join(os.path.dirname(__file__), 'testing')
Let's first create a module with a function we'll call later.
>>> mod1Path = os.path.join(basePath, 'mod1.py')
>>> src = '''
... from cybertools.plugin.base import register
...
... @register()
... def show():
... print 'mod1.show() executed'
... '''
>>> f = open(mod1Path, 'w')
>>> f.write(src)
>>> f.close()
We could import this module now but in order to be able to automatically
reload it later (and be able to look it up in the plug-in module registry)
we use a loader module.
>>> loadPath = os.path.join(basePath, 'load.py')
>>> src = '''
... from cybertools.plugin.manage import loadModules
...
... from cybertools.plugin.testing import mod1
... loadModules(mod1)
...
... '''
>>> f = open(loadPath, 'w')
>>> f.write(src)
>>> f.close()
Now we first import the load module, then import the test module and call
the function in it.
>>> from cybertools.plugin.testing import load
>>> from cybertools.plugin.testing import mod1
>>> mod1.show()
mod1.show() executed
We now append additional code to mod1 and see if it is reloaded automatically;
in order to create a sufficient time difference we change the stored setting.
We also have to remove the .pyc file, otherwise Python will refuse to
recompile the source file because the modification time is not changed
during the run of the test script.
>>> from cybertools.plugin.manage import modules
>>> modules['cybertools.plugin.testing.mod1'].timeStamp -= 2
>>> os.remove(os.path.join(basePath, 'mod1.pyc'))
>>> src = ''' print 'now changed...'
...
... '''
>>> f = open(mod1Path, 'a')
>>> f.write(src)
>>> f.close()
>>> mod1.show()
mod1.show() executed
now changed...
We now append another function to the source file.
>>> src = '''
... @register()
... def another():
... print 'executing another function.'
...
... '''
>>> f = open(mod1Path, 'a')
>>> f.write(src)
>>> f.close()
>>> modules['cybertools.plugin.testing.mod1'].timeStamp -= 2
>>> os.remove(os.path.join(basePath, 'mod1.pyc'))
When we now try to call the new function, the module will not be reloaded
automatically.
>>> mod1.another()
Traceback (most recent call last):
...
AttributeError: 'module' object has no attribute 'another'
But just reloading the load module will also update the mod1 application module.
>>> reload(load)
<module 'cybertools.plugin.testing.load' ...>
>>> mod1.another()
executing another function.
Fin de partie
=============
>>> for fn in ('mod1', 'load'):
... os.remove(os.path.join(basePath, fn) + '.py')
... os.remove(os.path.join(basePath, fn) + '.pyc')

3
plugin/__init__.py Normal file
View file

@ -0,0 +1,3 @@
"""
$Id$
"""

49
plugin/base.py Normal file
View file

@ -0,0 +1,49 @@
#
# Copyright (c) 2009 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Basic plug-in functionality.
$Id$
"""
from cybertools.plugin.manage import checkReload, registerFunction
def register(*ifc):
def _register(fct):
def _fct(*args, **kw):
fctNew = checkReloadFunction(fct)
if fctNew is None:
raise ValueError('Function no longer present: %s.%s' %
fct.__module__, fct.__name__)
return fctNew(*args, **kw)
registerFunction(_fct, fct, ifc)
return _fct
return _register
def checkReloadFunction(f):
m = checkReload(f.__module__)
if m:
return getattr(m, f.__name__, None)
return f

87
plugin/manage.py Normal file
View file

@ -0,0 +1,87 @@
#
# Copyright (c) 2009 Helmut Merz helmutm@cy55.de
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""
Plug-in management.
$Id$
"""
import os, sys, time
from zope import component
class PluginModule(object):
def __init__(self, module):
self.name = module.__name__
self.module = module
self.timeStamp = time.time()
self.functions = dict()
self.interfaces = dict()
def registerFunction(self, f, name, ifc):
self.functions[name] = f
self.interfaces[name] = ifc
modules = dict()
def registerModule(m):
pm = PluginModule(m)
modules[m.__name__] = pm
return pm
def registerFunction(wrapped, base, ifc):
m = sys.modules[base.__module__]
pm = modules.get(m.__name__)
if pm:
if base.__name__ in pm.functions:
ifcOld = pm.interfaces[base.__name__]
if ifcOld:
gsm = component.getGlobalSiteManager()
gsm.unregisterHandler(pm.functions[base.__name__], ifcOld)
if pm is None or pm.module != m:
pm = registerModule(m)
pm.registerFunction(wrapped, base.__name__, ifc)
if ifc:
component.provideHandler(wrapped, ifc)
# automatic reloading
def checkReload(m):
if isinstance(m, str):
m = sys.modules[m]
fpath, ext = os.path.splitext(os.path.abspath(m.__file__))
src = fpath + '.py'
pm = modules[m.__name__]
mtime = pm.timeStamp
if os.path.getmtime(src) > mtime:
m = reload(m)
pm.timeStamp = time.time()
return m
return False
def loadModules(*mods):
for m in mods:
checkReload(m)

View file

@ -0,0 +1,3 @@
"""
$Id$
"""

28
plugin/tests.py Executable file
View file

@ -0,0 +1,28 @@
#! /usr/bin/python
"""
Tests for the 'cybertools.plugin' package.
$Id$
"""
import unittest, doctest
from zope.testing.doctestunit import DocFileSuite
class Test(unittest.TestCase):
"Basic tests for the plugin package."
def testBasicStuff(self):
pass
def test_suite():
flags = doctest.NORMALIZE_WHITESPACE | doctest.ELLIPSIS
return unittest.TestSuite((
unittest.makeSuite(Test),
DocFileSuite('README.txt', optionflags=flags),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')