merge branch master, + fix tests

This commit is contained in:
Helmut Merz 2024-03-17 09:15:55 +01:00
commit f100a18f22
10 changed files with 414 additions and 70 deletions

2
.gitignore vendored
View file

@ -7,6 +7,8 @@
*.sublime-project
*.sublime-workspace
*.ropeproject
.env
.pytest.ini
*#*#
*.#*
__pycache__

40
scopes/interfaces.py Normal file
View file

@ -0,0 +1,40 @@
# scopes.interfaces
from zope.interface import Interface
class ITraversable(Interface):
def get(key, default=None):
"""Return the item addressed by `key`; return `default` if not found."""
class IContainer(ITraversable):
def values():
"""Return a sequence of child objects."""
def __getitem__(key):
"""Return the item addressed by `key`; raise KeyError if not found."""
def __setitem__(key, value):
"""Store the `value` under the `key`.
May modify `value` so that the attributes referencing this object
and the value object (e.g. `parent` and `name`) are stored correctly."""
class IReference(Interface):
def getTarget():
"""Return item referenced by this object."""
def setTarget(target):
"""Store reference to target item."""
class IView(Interface):
def __call__():
"""Render the view data as HTML or JSON."""

View file

@ -2,43 +2,49 @@
from zope.publisher.base import DefaultPublication
from zope.publisher.browser import BrowserRequest
from zope.publisher.interfaces import NotFound
from zope.publisher.publish import publish
from zope.traversing.publicationtraverse import PublicationTraverser
from scopes.interfaces import ITraversable, IView
from scopes.server.browser import getView
import scopes.storage.concept # register container classes
from scopes.storage.folder import Root
def demo_app(environ, start_response):
print(f'*** environ {environ}.')
status = '200 OK'
headers = [("Content-type", "text/plain; charset=utf-8")]
start_response(status, headers)
return ['Hello World'.encode()]
def zope_app_factory(config):
storageFactory = config.StorageFactory(config)
def zope_app(environ, start_response):
storage = storageFactory(config.dbschema)
appRoot = Root(storage)
request = BrowserRequest(environ['wsgi.input'], environ)
request.setPublication(Publication(appRoot))
request = publish(request, True)
response = request.response
start_response(response.getStatusString(), response.getHeaders())
return response.consumeBodyIter()
return zope_app
def zope_app(environ, start_response):
request = BrowserRequest(environ['wsgi.input'], environ)
request.setPublication(DefaultPublication(AppRoot()))
request = publish(request, False)
response = request.response
start_response(response.getStatusString(), response.getHeaders())
return response.consumeBodyIter()
class Publication(DefaultPublication):
def traverseName(self, request, ob, name):
next = getView(request, ob, name)
if next is not None:
return next
if ITraversable.providedBy(ob):
next = ob.get(name)
if next is None:
raise NotFound(ob, name, request)
return next
class AppRoot:
"""Zope Demo AppRoot"""
def __call__(self):
"""calling AppRoot"""
return 'At root'
def __getitem__(self, key):
def child():
"""get child"""
print(f'--- getitem {key}')
return 'getitem'
return child
def hello(self):
"""method hello"""
return 'Hello AppRoot'
def getDefaultTraversal(self, request, ob):
if IView.providedBy(ob):
return ob, ()
return ob, ('index.html',)
def handleException(self, ob, request, exc_info, retry_allowed=True):
if exc_info[0] != NotFound:
raise
request.response.reset()
request.response.handleException(exc_info)

63
scopes/server/browser.py Normal file
View file

@ -0,0 +1,63 @@
# scopes.server.browser
import json
from zope.interface import implementer
from scopes.interfaces import IContainer, IReference, IView
views = {} # registry for all views: {name: {prefix: viewClass, ...}, ...}
def register(name, *contextClasses):
"""Use as decorator: `@register(name, class, ...).
class `None` means default view for all classes."""
def doRegister(factory):
implementer(IView)(factory)
nameEntry = views.setdefault(name, {})
for cl in contextClasses:
nameEntry[cl.prefix] = factory
else:
nameEntry[''] = factory
return factory
return doRegister
def getView(request, ob, name):
nameEntry = views.get(name)
if nameEntry is None:
return None
factory = nameEntry.get(ob.prefix)
if factory is None:
factory = nameEntry.get('')
if factory is None:
return None
return factory(ob, request)
@register('index.html')
@register('index.json')
class DefaultView:
def __init__(self, context, request):
self.context = context
self.request = request
def __call__(self):
result = self.prepareResult()
return self.render(result)
def prepareResult(self):
ob = self.context
result = ob.asDict()
if IContainer.providedBy(ob):
result['items'] = [v.asDict() for v in ob.values()]
if IReference.providedBy(ob):
target = ob.getTarget()
if target:
result['target'] = target.asDict()
if IContainer.providedBy(target):
result['target']['items'] = [v.asDict() for v in target.values()]
return result
def render(self, result):
self.request.response.setHeader('Content-type', 'application/json; charset=utf-8')
return json.dumps(result).encode('UTF-8')

View file

@ -33,13 +33,17 @@ class Storage(object):
def add(self, container):
self.containers[container.itemFactory.prefix] = container
def getItem(self, uid):
prefix, id = uid.split('-')
id = int(id)
def getContainer(self, itemClass):
prefix = itemClass.prefix
container = self.containers.get(prefix)
if container is None:
container = self.create(registry[prefix])
return container.get(id)
return self.create(registry[prefix])
return container
def getItem(self, uid):
prefix, id = uid.split('-')
cls = registry[prefix].itemFactory
return self.getContainer(cls).get(int(id))
def getExistingTable(self, tableName):
metadata = self.metadata

View file

@ -1,12 +1,145 @@
# scopes.storage.concept
"""Abstract base classes for concept map application classes."""
"""Core classes for concept map structure."""
from scopes.storage.common import registerContainerClass
from zope.interface import implementer
from scopes.interfaces import IContainer
from scopes.storage.common import registerContainerClass, registry
from scopes.storage.tracking import Container, Track
defaultPredicate = 'standard'
class Concept(Track):
headFields = ['name']
def parents(self, predicate=None):
return (r.getFirst() for r in self.parentRels(predicate))
def parentRels(self, predicate=None):
return self.container.queryRels(second=self, predicate=predicate)
def children(self, predicate=None):
return (r.getSecond() for r in self.childRels(predicate))
def childRels(self, predicate=None):
return self.container.queryRels(first=self, predicate=predicate)
def values(self):
return self.children(defaultPredicate)
def addChild(self, child, predicate=defaultPredicate):
rels = self.container.storage.getContainer(Triple)
rels.save(Triple(self.uid, child.uid, predicate))
class Concepts(Container):
insertOnChange = False
indexes = None
def queryRels(self, **crit):
#pred = crit.get(predicate)
#if pred is not None and isinstance(pred, ('string', 'bytes')):
# crit['predicate'] = self.storage.getContainer(Predicate).queryLast(name=pred)
for k, v in crit.items():
if isinstance(v, Track):
crit[k] = v.uid
rels = self.storage.getContainer(Triple)
return rels.query(**crit)
# implementation of relationships between concepts using RDF-like triples
class Predicate(Concept):
prefix = 'pred'
@registerContainerClass
class Predicates(Concepts):
itemFactory = Predicate
tableName = 'preds'
def storePredicate(storage, name):
preds = storage.getContainer(Predicate)
preds.save(Predicate(name))
class Triple(Track):
headFields = ['first', 'second', 'predicate']
prefix = 'rel'
def getFirst(self):
return self.container.storage.getItem(self.first)
def getSecond(self):
return self.container.storage.getItem(self.second)
def getPredicate(self):
return self.container.storage.getItem(self.second)
@registerContainerClass
class Rels(Container):
itemFactory = Triple
indexes = [('first', 'predicate', 'second'),
('first', 'second'), ('predicate', 'second')]
tableName = 'rels'
insertOnChange = False
# types stuff
@implementer(IContainer)
class Type(Concept):
headFields = ['name', 'tprefix']
prefix = 'type'
@property
def typeClass(self):
return registry[self.tprefix].itemFactory
def values(self):
cont = self.container.storage.getContainer(self.typeClass)
return cont.query()
def get(self, key, default=None):
cont = self.container.storage.getContainer(self.typeClass)
return cont.queryLast(name=key) or default
def __getitem__(self, key):
value = self.get(key)
if value is None:
raise KeyError(key)
return value
def __setitem__(self, key, value):
cont = self.container.storage.getContainer(self.typeClass)
value.name = key
cont.save(value)
@registerContainerClass
class Types(Concepts):
itemFactory = Type
indexes = [('name',), ('tprefix',)]
tableName = 'types'
def storeType(storage, cls, name):
types = storage.getContainer(Type)
types.save(Type(name, cls.prefix))
def setupCoreTypes(storage):
for c in registry.values():
cls = c.itemFactory
storeType(storage, cls, cls.__name__.lower())

View file

@ -1,17 +1,26 @@
# scopes.storage.folder
from zope.interface import implementer
from scopes.interfaces import IContainer, IReference
from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
@implementer(IContainer, IReference)
class Folder(Track):
headFields = ['parent', 'name', 'ref']
prefix = 'fldr'
def values(self):
return self.container.query(parent=self.rid)
def items(self):
return ((v.name, v) for v in self.values())
def keys(self):
for f in self.container.query(parent=self.rid):
yield f.name
return (v.name for v in self.values())
def get(self, key, default=None):
value = self.container.queryLast(parent=self.rid, name=key)
@ -20,7 +29,7 @@ class Folder(Track):
return value
def __getitem__(self, key):
value = self.container.queryLast(parent=self.rid, name=key)
value = self.get(key)
if value is None:
raise KeyError(key)
return value
@ -30,6 +39,19 @@ class Folder(Track):
value.set('name', key)
self.container.save(value)
def getTarget(self):
if self.ref == '':
return None
return self.container.storage.getItem(self.ref)
def setTarget(self, target):
self.set('ref', target.uid)
self.container.update(self)
def __str__(self):
return '%s: %s; keys: %s' % (self.__class__.__name__,
self.name, list(self.keys()))
class Root(Folder):
"""A dummy (virtual) root folder for creating real folders

18
scopes/storage/topic.py Normal file
View file

@ -0,0 +1,18 @@
# scopes.storage.topic
from scopes.storage.common import registerContainerClass
from scopes.storage import concept
class Topic(concept.Concept):
prefix = 'tpc'
@registerContainerClass
class Topics(concept.Concepts):
itemFactory = Topic
tableName = 'topics'

View file

@ -22,6 +22,9 @@ class Track(object):
def __init__(self, *keys, **kw):
self.head = {}
for k, v in kw.items():
if k in self.headFields:
self.head[k] = kw.pop(k)
for ix, k in enumerate(keys):
self.head[self.headFields[ix]] = k
for k in self.headFields:
@ -62,6 +65,13 @@ class Track(object):
return ''
return str(self.trackId)
def __repr__(self):
return '%s: %s' % (self.__class__.__name__, self.asDict())
def asDict(self):
return dict(uid=self.uid, head=self.head, data=self.data,
timeStamp = str(self.timeStamp)[:19])
@registerContainerClass
class Container(object):
@ -91,8 +101,11 @@ class Container(object):
return tr
def query(self, **crit):
stmt = self.table.select().where(
if crit:
stmt = self.table.select().where(
and_(*self.setupWhere(crit))).order_by(self.table.c.trackid)
else:
stmt = self.table.select().order_by(self.table.c.trackid)
for r in self.session.execute(stmt):
yield self.makeTrack(r)
@ -102,16 +115,23 @@ class Container(object):
return self.makeTrack(self.session.execute(stmt).first())
def save(self, track):
track.container = self
crit = dict((hf, track.head[hf]) for hf in track.headFields)
found = self.queryLast(**crit)
if found is None:
return self.insert(track)
if self.insertOnChange and found.data != track.data:
return self.insert(track)
if found.data != track.data or found.timeStamp != track.timeStamp:
changed = False
if found.data != track.data:
found.update(track.data)
changed = True
if track.timeStamp is not None and found.timeStamp != track.timeStamp:
found.timeStamp = track.timeStamp
changed = True
if changed:
self.update(found)
track.trackId = found.trackId
return found.trackId
def insert(self, track, withTrackId=False):
@ -119,14 +139,15 @@ class Container(object):
values = self.setupValues(track, withTrackId)
stmt = t.insert().values(**values).returning(t.c.trackid)
trackId = self.session.execute(stmt).first()[0]
track.trackId = trackId
self.storage.mark_changed()
return trackId
def update(self, track):
def update(self, track, updateTimeStamp=False):
t = self.table
if updateTimeStamp or track.timeStamp is None:
track.timeStamp = datetime.now()
values = self.setupValues(track)
if track.timeStamp is None:
values['timestamp'] = datetime.now()
stmt = t.update().values(**values).where(t.c.trackid == track.trackId)
n = self.session.execute(stmt).rowcount
if n > 0:
@ -158,7 +179,7 @@ class Container(object):
*r[1:-2], trackId=r[0],timeStamp=r[-2], data=r[-1], container=self)
def setupWhere(self, crit):
return [self.table.c[k.lower()] == v for k, v in crit.items()]
return [self.table.c[k.lower()] == v for k, v in crit.items() if v is not None]
def setupValues(self, track, withTrackId=False):
values = {}

View file

@ -1,9 +1,10 @@
# scopes/tests.py
"""The real test implementations"""
from datetime import datetime
import unittest
from scopes.storage import folder, tracking
from scopes.storage import concept, folder, topic, tracking
import config
config.dbengine = 'postgresql'
@ -22,33 +23,33 @@ class Test(unittest.TestCase):
def test_001_tracking(self):
storage.dropTable('tracks')
tracks = storage.create(tracking.Container)
tr01 = tracking.Track('t01', 'john')
tr01.update(dict(activity='testing'))
self.assertEqual(tr01.head, {'taskId': 't01', 'userName': 'john'})
self.assertEqual(tr01.taskId, 't01')
self.assertEqual(tr01.userName, 'john')
self.assertTrue(tracks.getTable() is not None)
trid01 = tracks.save(tr01)
self.assertTrue(trid01 > 0)
#tr01a = tracks.get(trid01)
tr01a = tracks['%07i' % trid01]
self.assertEqual(tr01a.head, tr01.head)
self.assertEqual(tr01a.trackId, trid01)
self.assertEqual(tr01a.data.get('activity'), 'testing')
tr01a.update(dict(text='Set up unit tests.'))
tr01a.timeStamp = None
self.assertTrue(tracks.save(tr01a) > 0)
tr01b = tracks.queryLast(taskId='t01')
self.assertEqual(tr01b.head, tr01.head)
self.assertNotEqual(tr01b.trackId, trid01)
self.assertEqual(tr01b.data.get('activity'), 'testing')
tr02 = tracking.Track('t02', 'jim', trackId=31, timeStamp=datetime(2023, 11, 30),
data=dict(activity='concept'))
trid02 = tracks.upsert(tr02)
@ -58,16 +59,16 @@ class Test(unittest.TestCase):
trid021 = tracks.upsert(tr02)
self.assertEqual(trid021, trid01)
self.assertEqual(tr02.uid, 'rec-' + str(trid01))
tr03 = storage.getItem('rec-31')
self.assertEqual(tr03.trackId, 31)
n = tracks.remove(31)
self.assertEqual(n, 1)
self.assertEqual(tracks.get(31), None)
storage.commit()
def test_002_folder(self):
storage.dropTable('folders')
root = folder.Root(storage)
@ -80,14 +81,48 @@ class Test(unittest.TestCase):
ch1 = top['child1']
self.assertEqual(ch1.parent, top.rid)
self.assertEqual(list(top.keys()), ['child1'])
storage.commit()
def suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='suite')
def test_003_type(self):
storage.dropTable('types')
concept.setupCoreTypes(storage)
types = storage.getContainer(concept.Type)
tps = list(types.query())
self.assertEqual(len(tps), 6)
self.assertEqual(tps[0].name, 'topic')
tfolder = types.queryLast(name='folder')
fldrs = list(tfolder.values())
self.assertEqual(len(fldrs), 2)
self.assertEqual(fldrs[0].name, 'top')
storage.commit()
def test_004_topic(self):
storage.dropTable('topics')
topics = storage.getContainer(topic.Topic)
types = storage.getContainer(concept.Type)
concept.storePredicate(storage, concept.defaultPredicate)
root = folder.Root(storage)
root['top']['topics'] = ftopics = folder.Folder()
ttopic = types.queryLast(name='topic')
self.assertEqual(ttopic.name, 'topic')
ftopics.setTarget(ttopic)
self.assertEqual(ftopics.ref, 'type-1')
tp_itc = topic.Topic('itc', data=dict(
title='ITC', description='Information and Communication Technology'))
topics.save(tp_itc)
tp_proglang = topic.Topic('prog_lang', data=dict(
title='Programming Languages',
description='Programming Languages'))
topics.save(tp_proglang)
tp_itc.addChild(tp_proglang)
c = list(tp_itc.children())
self.assertEqual(c[0].name, 'prog_lang')
storage.commit()