remove old psu module; + minor Py3 fixes

This commit is contained in:
Helmut Merz 2025-01-11 11:43:30 +01:00
parent 678ed53ab5
commit ec88df3405
3 changed files with 2 additions and 258 deletions

View file

@ -68,6 +68,6 @@ class PyWriter(object):
def toStr(value): def toStr(value):
if isinstance(value, unicode): if isinstance(value, str):
return value.encode('UTF-8') return value.encode('UTF-8')
return str(value) return str(value)

View file

@ -1,256 +0,0 @@
# psu - paster shell utilities
# use this from (e.g.):
#
# bin/paster shell deploy.ini
#
# then:
#
# from cco.common import psu
# from custom.config import myproject as config
# psu.setup(root, 'path/to/loopsRoot', config)
# obj = psu.byuid('578457950')
#
import os
from transaction import commit, abort
from zope.cachedescriptors.property import Lazy
from zope.catalog.interfaces import ICatalog
from zope import component
from zope.component.hooks import setSite
from zope.container.contained import ObjectAddedEvent, ObjectRemovedEvent
from zope.copypastemove.interfaces import IContainerItemRenamer
from zope.event import notify
from zope.exceptions.interfaces import DuplicationError
from zope.lifecycleevent import ObjectCreatedEvent, ObjectModifiedEvent
from zope.publisher.browser import TestRequest as BaseTestRequest
from zope.pluggableauth.factories import Principal
from zope.security.management import getInteraction, newInteraction, endInteraction
from zope.interface import Interface
from cybertools.util.date import date2TimeStamp, strptime
from cybertools.util.jeep import Jeep
from loops.common import adapted, baseObject
from loops.util import getObjectForUid, getUidForObject, getCatalog, reindex
#from xxx import config
os.environ['NLS_LANG'] = 'German_Germany.UTF8'
sc = Jeep() # shortcuts
def setup(root, loopsRootPath=[], config=None):
global sm, smdefault, intids, pau, loopsRoot, sc
setSite(root)
sm = component.getSiteManager(root)
smdefault = sm['default']
intids = smdefault['IntIds']
pau = smdefault['PluggableAuthentication']
user = getattr(config, 'shell_user', 'zope.manager')
password = (getattr(config, 'shell_pw', None) or
raw_input('Enter manager password: '))
login(Principal(user, password, u'Manager'))
loopsRoot = root
for name in loopsRootPath.split('/'):
if name:
loopsRoot = loopsRoot[name]
sc.concepts = loopsRoot['concepts']
for name in ('standard', 'hasType',):
sc[name] = sc.concepts[name]
def byuid(uid):
return getObjectForUid(uid)
def uid(obj):
return getUidForObject(obj)
def notifyModification(obj):
obj = baseObject(obj)
notify(ObjectModifiedEvent(obj))
def save(obj):
notifyModification(obj)
commit()
def notifyAdded(obj):
obj = baseObject(obj)
notify(ObjectAddedEvent(obj))
def notifyRemoved(obj):
obj = baseObject(obj)
notify(ObjectRemovedEvent(obj))
def delete(container, name, docommit=True):
obj = container.get(name)
if obj is None:
print('*** Object', name, 'not found!')
return
notifyRemoved(obj)
del container[name]
if docommit:
commit()
def rename(container, old, new, docommit=True):
obj = container.get(old)
if obj is None:
print('*** Object', old, 'not found!')
return
renamer = IContainerItemRenamer(container)
if new != old:
try:
renamer.renameItem(old, new)
except DuplicationError:
print('*** Object', new, 'already exists!')
# container[new] = obj
# notifyAdded(obj)
notifyModification(obj)
if docommit:
commit()
def move(source, target, name):
obj = source.get(name)
if obj is None:
print('*** Object', name, 'not found!')
return
#notifyRemoved(obj)
#del source[name]
target[name] = obj
#notifyAdded(obj)
notifyModification(obj)
commit()
def get(container, obj):
if isinstance(obj, basestring):
name = obj
obj = container.get(name)
if obj is None:
print('*** Object', name, 'not found!')
return None
return adapted(obj)
# startup, loop, finish...
def startup(msg, **kw):
print('***', msg)
step = kw.pop('step', 10)
return Jeep(count=0, step=step, message=msg, **kw)
def update(fct, obj, info):
info.count += 1
start = info.get('start')
if start and info.count < start:
return
if info.count % info.step == 0:
try:
objInfo = obj.__name__
except:
try:
objInfo = obj.context.__name__
except:
objInfo = obj
print('*** Processing object # %i: %s' % (info.count, objInfo))
if info.get('updated'):
print('*** updated: %i.' % info.updated)
if info.get('errors'):
print('*** errors: %i.' % info.error)
commit()
return fct(obj, info)
def finish(info):
print('*** count: %i.' % info.count)
if info.get('updated'):
print('*** updated: %i.' % info.updated)
if info.get('errors'):
print('*** errors: %i.' % info.error)
commit()
def stop_condition(info):
stop = info.get('stop')
return stop is not None and info.count > stop
def loop(message, objects, fct, **kw):
def _fct(obj, info):
params = info.get('fctparams', {})
fct(obj, info, **params)
info = startup(message, **kw)
for obj in objects:
update(_fct, obj, info)
if stop_condition(info):
break
finish(info)
# indexing
def reindex_objects(objs, **kw):
catalog = util.getCatalog(objs[0])
def do_reindex(obj, info):
util.reindex(obj, catalog)
loop('reindex %s objects' % len(objs), objs, do_reindex, **kw)
# some common repair tasks
def get_type_instances(name):
return sc.concepts[name].getChildren([sc.hasType])
def notify_modification(c, info):
notifyModification(c)
def update_type_instances(**kw):
objs = get_type_instances(kw.pop('type'))
loop('Notify Type Instances', objs, notify_modification, **kw)
def update_type_instances_title_from_adapted(**kw):
def update_type_title_from_adapted(c, info):
c.title = adapted(c).title
notifyModification(c)
objs = get_type_instances(kw.pop('type'))
loop('Update Type Instances Title', objs, update_type_title_from_adapted, **kw)
def removeRecords(container, **kw):
"""Remove records from container selected by the criteria given."""
def remove(obj, info):
notifyRemoved(obj)
del container[obj.__name__]
info = startup('Remove records', **kw)
date = kw.pop('date', None)
if date:
kw['timeFromTo'] = (
date2TimeStamp(strptime(date + ' 00:00:00')),
date2TimeStamp(strptime(date + ' 23:59:59')))
loop('Remove records', container.query(**kw), remove, **kw)
# helper functions and classes
def login(principal):
endInteraction()
newInteraction(Participation(principal))
class TestRequest(BaseTestRequest):
basePrincipal = BaseTestRequest.principal
@Lazy
def principal(self):
interaction = getInteraction()
if interaction is not None:
parts = interaction.participations
if parts:
prin = parts[0].principal
if prin is not None:
return prin
return self.basePrincipal
class Participation(object):
""" Dummy Participation class for testing.
"""
interaction = None
def __init__(self, principal):
self.principal = principal

View file

@ -395,7 +395,7 @@ class ExternalFileAdapter(FileAdapter):
def getData(self): def getData(self):
if self.storageName in ('unknown', None): # object not set up yet if self.storageName in ('unknown', None): # object not set up yet
return '' return b''
return self.storage.getData(self.externalAddress, params=self.storageParams) return self.storage.getData(self.externalAddress, params=self.storageParams)
data = property(getData, setData) data = property(getData, setData)