Compare commits

..

15 commits

33 changed files with 109 additions and 805 deletions

1
.gitignore vendored
View file

@ -2,7 +2,6 @@
*.pyo
*.egg-info
*.project
*.log
*.swp
*.pydevproject
*.sublime-project

View file

@ -1,44 +0,0 @@
# py-scopes/demo/config.py
from dotenv import load_dotenv
import logging
from os import getenv
from scopes.web.app import zope_app_factory
load_dotenv()
log_file = 'log/scopes.log'
log_level = logging.DEBUG
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
logging.basicConfig(filename=log_file, level=log_level,
format=log_format, datefmt=log_dateformat)
server_port = getenv('SERVER_PORT', '8099')
base_url = getenv('BASE_URL', 'https://demo.cy7.de')
app_factory = zope_app_factory
# storage settings
from scopes.storage.db.postgres import StorageFactory
dbengine = 'postgresql+psycopg'
dbname = getenv('DBNAME', 'demo')
dbuser = getenv('DBUSER', 'demo')
dbpassword = getenv('DBPASSWORD', 'secret')
dbschema = getenv('DBSCHEMA', 'demo')
# authentication settings
oidc_provider = getenv('OIDC_PROVIDER', 'https://a1.cy7.de')
oidc_client_id = getenv('OIDC_CLIENT_ID', '311613119816392525')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id = oidc_client_id,
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None)
)

View file

@ -1,21 +0,0 @@
# py-scopes/demo/demo_server.py
from scopes.web.auth import oidc
from scopes.storage import topic
import logging
import waitress
def run(app, config):
oidc.startup() # todo: use generic app.startServices()
port = int(config.server_port)
print(f'Serving on port {port}.')
waitress.serve(app, port=port)
if __name__ == '__main__':
import config
app = config.app_factory(config)
run(app, config)
# see zope.app.wsgi.getWSGIApplication

View file

@ -1,2 +0,0 @@
directory for logfiles created by application

View file

@ -1,11 +0,0 @@
# scopes/demo/shell.py
# simple shell for interactive testing / accessing the database (storage)
# use: `python -i shell.py`
import config
from scopes.web.auth import oidc
from scopes.storage.folder import Root
from scopes.storage import topic
storage = config.StorageFactory(config)(config.dbschema)
root = Root(storage)

View file

@ -1,43 +0,0 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "py-scopes"
version = "3.0.1"
description = "Implementation of the unknown 'scopes' paradigm in Python"
readme = "README.md"
license = {text = "MIT"}
keywords = ["scopes"]
authors = [{name = "Helmut Merz", email = "helmutm@cy55.de"}]
dependencies = [
"SQLAlchemy",
]
[project.optional-dependencies]
postgres = [
"psycopg[binary]",
"transaction",
"zope.sqlalchemy",
]
app = [
"python-dotenv",
"waitress",
"zope.authentication",
"zope.interface",
"zope.publisher",
"zope.traversing",
]
auth = ["pyjwt[crypto]", "cryptography", "requests"]
test = ["zope.testrunner"]
#test = ["pytest"]
[tool.setuptools]
packages = ["scopes"]
#[tool.pytest.ini_options]
#addopts = "-vv"
#python_files = "test_standard.py" # default: run only `standard` tests
# use .pytest.ini file with `python_files = test_*.py` to run all tests

View file

@ -1,5 +0,0 @@
# runtests.sh
# run all unit / doc tests
zope-testrunner --test-path=. $*

View file

@ -1,14 +1,9 @@
# scopes.interfaces
from zope.interface import Interface, Attribute
from zope.interface import Interface
class IViewable(Interface):
prefix = Attribute('Prefix string for identifying the type (class) of an object')
class ITraversable(IViewable):
class ITraversable(Interface):
def get(key, default=None):
"""Return the item addressed by `key`; return `default` if not found."""

View file

@ -1,16 +0,0 @@
# scopes.organize.mail
from zope.interface import implementer
from zope.sendmail.interfaces import IMailDelivery
"""Utilities for creating and sending emails."""
@implementer(IMailDelivery)
class DummyMailDelivery:
"""For testing purposes: just store mails in maildata.log"""
def send(self, fromaddr, toaddrs, message):
print("DummyMailDelivery")
print(f"fromaddr: {fromaddr}, toaddrs: {toaddrs}")
print(message)

View file

@ -0,0 +1 @@
"""package scopes.server"""

View file

@ -1,31 +1,22 @@
# scopes.web.app
# scopes.server.app
import logging
from zope.i18n.interfaces import IUserPreferredCharsets
from zope.interface import implementer
from zope.publisher.base import DefaultPublication
from zope.publisher.browser import BrowserRequest
from zope.publisher.interfaces import NotFound
from zope.publisher.publish import publish
from scopes.interfaces import ITraversable, IView
from scopes.web.browser import getView
from scopes.server.browser import getView
import scopes.storage.concept # register container classes
from scopes.storage.folder import Root
@implementer(IUserPreferredCharsets)
class Request(BrowserRequest):
def getPreferredCharsets(self):
return ['UTF-8']
def zope_app_factory(config):
storageFactory = config.StorageFactory(config)
def zope_app(environ, start_response):
storage = storageFactory(config.dbschema)
appRoot = Root(storage)
request = Request(environ['wsgi.input'], environ)
request = BrowserRequest(environ['wsgi.input'], environ)
request.setPublication(Publication(appRoot))
request = publish(request, True)
response = request.response
@ -36,12 +27,6 @@ def zope_app_factory(config):
class Publication(DefaultPublication):
def beforeTraversal(self, request):
super(Publication, self).beforeTraversal(request)
from scopes.web.auth.oidc import authentication
prc = authentication.authenticate(request)
request.setPrincipal(prc)
def traverseName(self, request, ob, name):
next = getView(request, ob, name)
if next is not None:

View file

@ -1,25 +1,21 @@
# scopes.web.browser
# scopes.server.browser
import json
import logging
from zope.interface import implementer
from scopes.interfaces import IContainer, IReference, IView
logger = logging.getLogger('web.browser')
views = {} # registry for all views: {name: {prefix: viewClass, ...}, ...}
def register(name, *contextTypes):
"""Use as decorator: `@register(name, class_or_prefix, ...).
No class (or `None` or `''`) means default view for all classes."""
def register(name, *contextClasses):
"""Use as decorator: `@register(name, class, ...).
class `None` means default view for all classes."""
def doRegister(factory):
implementer(IView)(factory)
nameEntry = views.setdefault(name, {})
cts = contextTypes or ['']
for ct in cts:
if not isinstance(ct, str):
ct = ct.prefix
nameEntry[ct] = factory
for cl in contextClasses:
nameEntry[cl.prefix] = factory
else:
nameEntry[''] = factory
return factory
return doRegister
@ -32,7 +28,6 @@ def getView(request, ob, name):
factory = nameEntry.get('')
if factory is None:
return None
logger.debug('getView: %s %s', ob, request['PATH_INFO'])
return factory(ob, request)
@ -59,11 +54,10 @@ class DefaultView:
result['target'] = target.asDict()
if IContainer.providedBy(target):
result['target']['items'] = [v.asDict() for v in target.values()]
prc = self.request.principal
if prc is not None:
result['principal'] = prc.asDict()
return result
def render(self, result):
self.request.response.setHeader('Content-type', 'application/json; charset=utf-8')
return json.dumps(result).encode('UTF-8')

View file

@ -52,8 +52,8 @@ class Storage(object):
return metadata.tables.get((schema and schema + '.' or '') + tableName)
def dropTable(self, tableName):
prefix = self.schema and self.schema + '.' or ''
with self.engine.begin() as conn:
prefix = self.schema and self.schema + '.' or ''
conn.execute(text('drop table if exists %s%s' % (prefix, tableName)))
def resetSequence(self, tableName, colName, v):
@ -63,19 +63,15 @@ class Storage(object):
conn.execute(text(sq))
class StorageFactory:
class StorageFactory(object):
def sessionFactory(self):
return self.engine.connect
@staticmethod
def getEngine(dbtype, dbname, user, pw, **kw):
def getEngine(dbtype, dbname, user, pw, host='localhost', port=5432, **kw):
return create_engine('%s:///%s' % (dbtype, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
@staticmethod
def mark_changed(session):
pass
@ -90,7 +86,8 @@ class StorageFactory:
storageClass = Storage
def __init__(self, config, storageClass=None):
self.engine = self.engineFromConfig(config)
self.engine = self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
self.Session = self.sessionFactory()
if storageClass is not None:
self.storageClass = storageClass

View file

@ -24,12 +24,6 @@ class StorageFactory(StorageFactory):
return create_engine('%s://%s:%s@%s:%s/%s' % (
dbtype, user, pw, host, port, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword,
host=getattr(config, 'dbhost', 'localhost'),
port=getattr(config, 'dbport', 5432))
@staticmethod
def mark_changed(session):
return mark_changed(session)

View file

@ -7,18 +7,6 @@ from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class DummyFolder(dict):
prefix = 'dummy'
def asDict(self):
return self
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__,
super(DummyFolder, self).__repr__())
@implementer(IContainer, IReference)
class Folder(Track):
@ -69,8 +57,6 @@ class Root(Folder):
"""A dummy (virtual) root folder for creating real folders
using the Folder API."""
prefix = 'root'
def __init__(self, storage):
cont = storage.create(Folders)
super(Root, self).__init__(container=cont)

View file

@ -1,21 +0,0 @@
# scopes.storage.message
"""Generic messages (or events) to be stored in SQL database."""
from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class Message(Track):
headFields = ['domain', 'action', 'class', 'item']
prefix = 'msg'
@registerContainerClass
class Messages(Container):
itemFactory = Message
indexes = [('domain', 'action', 'class', 'item'), ('domain', 'class', 'item')]
tableName = 'messages'
insertOnChange = True

View file

@ -20,8 +20,7 @@ class Track(object):
headFields = ['taskId', 'userName']
prefix = 'rec'
def __init__(self, *keys, data=None, timeStamp=None, trackId=None,
container=None, **kw):
def __init__(self, *keys, **kw):
self.head = {}
for k, v in kw.items():
if k in self.headFields:
@ -32,10 +31,10 @@ class Track(object):
if self.head.get(k) is None:
self.head[k] = ''
setattr(self, k, self.head[k])
self.data = data or {}
self.timeStamp = timeStamp
self.trackId = trackId
self.container = container
self.data = kw.get('data') or {}
self.timeStamp = kw.get('timeStamp')
self.trackId = kw.get('trackId')
self.container = kw.get('container')
def set(self, attr, value):
if attr in self.headFields:
@ -67,7 +66,7 @@ class Track(object):
return str(self.trackId)
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__, self.asDict())
return '%s: %s' % (self.__class__.__name__, self.asDict())
def asDict(self):
return dict(uid=self.uid, head=self.head, data=self.data,

33
scopes/tests.py Normal file
View file

@ -0,0 +1,33 @@
# scopes/tests.py
"""The real test implementations"""
import unittest
from scopes import tlib_storage
import config
config.dbengine = 'postgresql'
config.dbname = 'ccotest'
config.dbuser = 'ccotest'
config.dbpassword = 'cco'
config.dbschema = 'testing'
# PostgreSQL-specific settings
from scopes.storage.db.postgres import StorageFactory
config.storageFactory = StorageFactory(config)
#storage = factory(schema='testing')
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)

View file

@ -1 +0,0 @@
# py-scopes/tests

View file

@ -1,57 +0,0 @@
# py-scopes/tests/config.py
import logging
from os import getenv
import sys
#from scopes.web.app import demo_app, zope_app
log_file = 'scopes/tests/log/scopes-test.log'
log_level = logging.INFO
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
def setup_logging():
hdlr = logging.getLogger().handlers[-1]
logging.getLogger().removeHandler(hdlr) # remove NullHandler added by testrunner
logging.basicConfig(filename=log_file, level=log_level,
format=log_format, datefmt=log_dateformat)
setup_logging()
# server / app settings
server_port = '8999'
base_url = 'testing:'
#app = zope_app
# storage settings
# SQLite
dbengine = 'sqlite'
dbname = 'var/test.db'
dbuser = None
dbpassword = None
dbschema = None
# special testing stuff
from scopes.tests import data_auth # add oidc URIs and keys to dummy_requests data
from scopes.tests import dummy_requests
sys.modules['requests'] = dummy_requests
# authentication settings
oidc_provider = 'test://oidc'
oidc_client_id = getenv('OIDC_CLIENT_ID', '12345')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id=oidc_client_id,
principal_prefix=getenv('OIDC_PRINCIPAL_PREFIX', 'loops.'),
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None)
)

View file

@ -1,33 +0,0 @@
# scopes.tests.data_auth
"""provide response data for testing (via dummy_requests)"""
from cryptography.hazmat.primitives.asymmetric import rsa
from scopes import util
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
public_key_n = util.b64e(public_key.public_numbers().n.to_bytes(256)).decode('ASCII')
oidc_data = {
'test://oidc/.well-known/openid-configuration': {
"issuer": "test://oidc",
"authorization_endpoint": "test://oidc/oauth/v2/authorize",
"token_endpoint": "test://oidc/oauth/v2/token",
"introspection_endpoint": "test://oidc/oauth/v2/introspect",
"userinfo_endpoint": "test://oidc/oidc/v1/userinfo",
"revocation_endpoint": "test://oidc/oauth/v2/revoke",
"end_session_endpoint": "test://oidc/oidc/v1/end_session",
"device_authorization_endpoint": "test://oidc/oauth/v2/device_authorization",
"jwks_uri": "test://oidc/oauth/v2/keys"},
'test://oidc/oauth/v2/keys': { "keys": [
{"use": "sig",
"kty": "RSA",
"kid": "316766976250797901",
"alg": "RS256",
"n": public_key_n,
"e": "AQAB"}]}
}
from scopes.tests.dummy_requests import response_data
response_data.update(oidc_data)

View file

@ -1,22 +0,0 @@
# scopes.tests.requests
"""Dummy requests implementation for testing."""
from logging import getLogger
logger = getLogger('tests.dummy_requests')
def get(url, *args, **kw):
logger.info(f'get: %s - %s - %s', url, args, kw)
return FakeResponse(response_data[url])
class FakeResponse:
def __init__(self, data):
self.data = data
def json(self):
return self.data
response_data = {}

View file

@ -1 +0,0 @@
directory for logfiles created by tests

View file

@ -1,36 +0,0 @@
# scopes.tests.scratch
"""for keeping experimental and demonstration code and data (mainly for testing)"""
import base64
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.utils import int_to_bytes
from scopes import util
# generate (and serialize) or load private key
# generate public JWKS (key set) from private ky
#private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
#pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,
# format=serialization.PrivateFormat.PKCS8,
# encryption_algorithm=serialization.NoEncryption())
pem = b'-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD1ipW1hnV+OWF/\nE09YEQB9GTwQ0DUhmM1ZtmONV3xp1zrpLsvQtJJ6lZ8rp+ws35HGEEQWuBvgKP+K\nyyhILkgHp5LUtXsIruvo/+20Gbxr3bbIp2Omv8NuUnQIlb0SkZwauz/iLi+dFnoW\nTzkCwRnOPG+xqrSzD4CUCIZc4fU61h457I5PWTgK+bZXlsPytook9HpCIPX+9PaU\n935KHeWLonx3GZWqcqUyHSUn8dgRMypaKO70zCZkZWwDbCkJv7OUuY9QijN9niA+\nX3MFyqf9+WK7/lLnNYimdXk9Pz9Lim7HhoAApoW/0ueUfok54OIzuemTAUaunaN6\nmdOvh54dAgMBAAECggEAAWW5Vx7kAY1Bto104U51EEm519EvhUm6B5T4+VTp5LYK\nF1rEMIFwtBkoFfjzLJFEwEakJ8Hgq80TIRdyJ72r9ACIPGtTlH0Jjg94IqyvwOy6\n23fPEXkrLHR0Z5g8bR+6A95liQrA+tCD1QVFRAWUktlkVXfxRC95TMCA3i7rakGG\no6BbyUD2Hp7IEO5fbfKNiqrUO/LwvLOMBTe3Q9EVv7fblm21HiwWG26RgO29UtFo\nEKcaALSqIYwYCkc9EuLpcGRzLfH1opid3Xtu4+hm5enNgdXZRKr9lSQUCSi9oZRE\npszNGyDsaot2ixwKP3c6Eo3bQKMmXXw8Y3JsNiZzIQKBgQD8QuCjKyAeH0QrhW1Y\nvSmUC3q517WZPRgsEMx917MYn/Gl1ywsROjawP/ClxTZkpXVgALFvrggjB75p/QG\noFamZpf9U95fVivAkANQYmhGmqPVQUeb2vgHQOdPSdvvMoRumcEcfS/dOo9R//s+\nfrBQNYfp/tTttdyM2N0CzsuBzQKBgQD5LjX8SvLPe6q6xJchdExywbTk+DLtiKDs\nT5avaGhOqzYhir6FvEt7Gw74J6IWzIt5X4OteSkpRvj2TN0ik+KAfZOk+v9C5iK7\nxbicWj5XSaPJcYvhxogO7mb8sSrl/cY20HRkCsyQOKeH005PHsGZztZoHrsF6t2X\nbtKgNWp9kQKBgHLDHh09RlxNzx6Zkfh3/k1qt4eKmgQ/5hpN/ioWElVWloHjFSaC\npwi2GuT1BLhC1sWNejVqIaw08vaTMRI+qY0ESYsnN5hZxIfTPJ66VkQgn/4pt6Ex\nCfuKzHCm4la8vcDvVApY7YiQ1pjwguWYjy++WrnahBYs0UyGcG2RlMXVAoGBAJNJ\nf1ubqZ5+yNIQ9gwuRCno2dYl52SESCqmeLlCC7XEegCllCxUuoEP429HbgXv7dlW\nXe0iGvRtISflEyknJNEyaR0xx8RxZ8J6Ar9YkFTkEE44MajIww+gV3ux9Vtw/8LS\nwJmJ0JTHCC++9SDLW0BhBFcTIxVCWKz0MsfECyghAoGAOLm2dF59JiiiynGiWXzJ\nZcbZQStR+ysjLX+RqTATAorhweUiv+HwbB4boWhOnDD7q3t+93GnzeqI9m9Q5yo6\nRTyyNYa4uJXMMarZxi3m+RGvuCC2h+7ja6H6UGg3xrHNwcykJkuRAZvqCN+f6Nxo\njc8o9LMimQyTv+yptrOhi2w=\n-----END PRIVATE KEY-----\n'
private_key = serialization.load_pem_private_key(pem, password=None)
public_key = private_key.public_key()
nums = public_key.public_numbers()
#e = base64.urlsafe_b64encode(int_to_bytes(nums.e)).rstrip(b'=')
#e = base64.urlsafe_b64encode(nums.e.to_bytes(3)).rstrip(b'=')
e = b'AQAB'
#n = base64.urlsafe_b64encode(int_to_bytes(nums.n)).rstrip(b'=')
n = util.b64e(nums.n.to_bytes(256))
# data from example session with zitadel
keys_response = {"keys":[{"use":"sig","kty":"RSA","kid":"317781576895225677","alg":"RS256","n":"rBE2kf4S1-uLqTKoRcxmBDZXeNU0PcDZtXX3pYK4hbE_cVNqMnWZQNXh4asLvZ_VpInIRI_gRe5pFfrUfnFppnu7sgUGPHS_i6E1JSDWDDViwU0DbQXzTMA36xb8fI9vm_StrKqQ2snpkV2YprWC597vOa6w8eWMEfwWV39gmpgjyaYfwKi2ldvnn_djYSEyy6Cs8QW9L8uaTKDYUuuIY7iyjTRI7kM1boZcg26WQ4nDgSU8BMEmPGc-h7BHIm3PVEKNBUU3dRIcvIlnAx7AJoqdmUU0BllMGOd-To7PP760Vx4gYBsTFRGaQ_9P7vRApn0Wywnhj0BaVWGyJPkggQ","e":"AQAB"}]}
token_response = {'access_token': 'WGn4xjOluOgEugwqfxACziBZxAxQ1SKBKU6p27DysWw9slM55Fn7wkz_avpy5BjFR1uU_2rJoQTMNsO8NyxOEkmhbpP9xURUQrLRBiZT', 'token_type': 'Bearer', 'expires_in': 43199, 'id_token': 'eyJhbGciOiJSUzI1NiIsImtpZCI6IjMxNzc4MTU3Njg5NTIyNTY3NyIsInR5cCI6IkpXVCJ9.eyJhbXIiOlsicHdkIl0sImF0X2hhc2giOiJQbC1kMjBvRG9DRHZqTUtGMmF4S3N3IiwiYXVkIjpbIjMxMTYxMzExOTgxNjM5MjUyNSIsIjMxNDQ2OTIwOTA3OTkzMDcwMSIsIjMxMTQ3MjQ1MTc2ODg2ODY4NSJdLCJhdXRoX3RpbWUiOjE3NDU5MTAxNTksImF6cCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImNsaWVudF9pZCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImVtYWlsIjoiaGVsbXV0Lm1lcnpAcG9zdGVvLmRlIiwiZW1haWxfdmVyaWZpZWQiOnRydWUsImV4cCI6MTc0NTk1MzUwOSwiZmFtaWx5X25hbWUiOiJVc2VyIiwiZ2VuZGVyIjoibWFsZSIsImdpdmVuX25hbWUiOiJUZXN0IiwiaWF0IjoxNzQ1OTEwMzA5LCJpc3MiOiJodHRwczovL2ExLmN5Ny5kZSIsImxvY2FsZSI6ImRlIiwibmFtZSI6IlRlc3QgVXNlciIsIm5vbmNlIjoiMVJGem9RNDkyazNWTFEzcyIsInByZWZlcnJlZF91c2VybmFtZSI6InRzdDkiLCJzaWQiOiJWMV8zMTc3ODQyMjY2MjE2MTE4NTMiLCJzdWIiOiIzMTE4NDY3OTY3Mzk1MzQ2NjkiLCJ1cGRhdGVkX2F0IjoxNzQ1MzA1MTkzLCJ1cm46eml0YWRlbDppYW06b3JnOnByb2plY3Q6MzExNDcyNDUxNzY4ODY4Njg1OnJvbGVzIjp7ImNjLXVzZXIiOnsiMzExNDczNTAyMjc0MjQ4NTI1IjoiY3liZXJjb25jZXB0cy5hMS5jeTcuZGUifX0sInVybjp6aXRhZGVsOmlhbTpvcmc6cHJvamVjdDpyb2xlcyI6eyJjYy11c2VyIjp7IjMxMTQ3MzUwMjI3NDI0ODUyNSI6ImN5YmVyY29uY2VwdHMuYTEuY3k3LmRlIn19LCJ1cm46eml0YWRlbDppYW06dXNlcjpyZXNvdXJjZW93bmVyOmlkIjoiMzExNDczNTAyMjc0MjQ4NTI1IiwidXJuOnppdGFkZWw6aWFtOnVzZXI6cmVzb3VyY2Vvd25lcjpuYW1lIjoiY3liZXJjb25jZXB0cyIsInVybjp6aXRhZGVsOmlhbTp1c2VyOnJlc291cmNlb3duZXI6cHJpbWFyeV9kb21haW4iOiJjeWJlcmNvbmNlcHRzLmExLmN5Ny5kZSJ9.hx8JWXWVPq-WfBx9AMgFRWaEX3CioU8BCZwT3d8YETMejsR33LDsbPfnzO0Dbw885tiyNIsdNRj7lfk5HtuZVWdHx58hnaMw-czGhIfB0GXvuh2UFemBoLNJLajtRvCBi-YIZ_c6TD0Ryd2wn_yQlgY1J2R8xTsBWeKM_SYqlKU20r4r3nsX4krg6Q9x-Kc3k0Rg3lMXoxFYgQSzXrKjxTI3HHb03H4lzgmrdcbQqfbLoHKdvGxzfo-Gf8Rlt9rQ-fpGu7fRfPJQV14SPJ6CQtKP0adq_D2blcp1I1xKXML8TytJa05RjL_l_KH-segFfORtGMc4mYo8JUpMW0KOvg'}
token_id_claims = {'amr': ['pwd'], 'at_hash': 'Pl-d20oDoCDvjMKF2axKsw', 'aud': ['311613119816392525', '314469209079930701', '311472451768868685'], 'auth_time': 1745910159, 'azp': '311613119816392525', 'client_id': '311613119816392525', 'email': 'helmut.merz@posteo.de', 'email_verified': True, 'exp': 1745953509, 'family_name': 'User', 'gender': 'male', 'given_name': 'Test', 'iat': 1745910309, 'iss': 'https://a1.cy7.de', 'locale': 'de', 'name': 'Test User', 'nonce': '1RFzoQ492k3VLQ3s', 'preferred_username': 'tst9', 'sid': 'V1_317784226621611853', 'sub': '311846796739534669', 'updated_at': 1745305193, 'urn:zitadel:iam:org:project:311472451768868685:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:org:project:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:user:resourceowner:id': '311473502274248525', 'urn:zitadel:iam:user:resourceowner:name': 'cyberconcepts', 'urn:zitadel:iam:user:resourceowner:primary_domain': 'cyberconcepts.a1.cy7.de'}

View file

@ -1,44 +0,0 @@
# scopes.tests.test_postgres
"""Tests for the 'scopes.storage' package - using PostgreSQL."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
from scopes.tests import tlib_storage
from scopes.storage.db.postgres import StorageFactory
import config
config.dbengine = 'postgresql+psycopg'
config.dbname = 'testdb'
config.dbuser = 'testuser'
config.dbpassword = 'secret'
config.dbschema = 'testing'
config.storageFactory = StorageFactory(config)
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def test_005_message(self):
tlib_storage.test_message(self, config)
def test_suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='test_suite')

View file

@ -1,44 +0,0 @@
# scopes.tests.test_standard
"""Tests for the 'scopes.storage' package."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
from scopes.tests import tlib_web, tlib_storage
from scopes.storage.common import StorageFactory
import config
config.dbengine = 'sqlite'
config.dbname = 'var/test.db'
config.dbschema = None
config.storageFactory = StorageFactory(config)
class Test(unittest.TestCase):
def test_001_tracking(self):
tlib_storage.test_tracking(self, config)
def test_002_folder(self):
tlib_storage.test_folder(self, config)
def test_003_type(self):
tlib_storage.test_type(self, config)
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def test_013_web(self):
tlib_web.test_app(self, config)
tlib_web.test_auth(self, config)
def suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='suite')

View file

@ -1,42 +0,0 @@
# tests/tlib_web.py
"""Test implementation for the `scopes.web` package."""
import json
import logging
from zope.publisher.browser import TestRequest
from zope.publisher.publish import publish
from scopes.web.app import Publication
from scopes.storage.folder import Root
logger = logging.getLogger('tlib_web')
def publishRequest(config, storage, path):
appRoot = Root(storage)
request = TestRequest(environ=dict(PATH_INFO=path))
request.setPublication(Publication(appRoot))
request = publish(request, False)
return request.response
def test_app(self, config):
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top')
logger.info('test_app: response %s %s', response.getStatus(), response.getHeaders())
result = json.loads(response.consumeBody())
self.assertEqual(result['items'][0]['head']['name'], 'level2-item1')
def test_auth(self, config):
from scopes.web.auth import oidc
oidc.startup() # todo: use generic app.startServices()
self.assertEqual(len(config.oidc_params['op_uris']), 8)
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top/auth/login')
headers = dict(response.getHeaders())
logger.info('test_auth: response %s %s', response.getStatus(), headers)
self.assertEqual(response.getStatus(), 302)
uri = config.oidc_params['op_uris']['jwks_uri']
keys = oidc.loadOidcKeys(uri)
logger.info('test_auth keys: %s', keys)

View file

@ -1,9 +1,9 @@
# tests/tlib_storage.py
# scopes/tlib_storage.py
"""Test implementation for the `scopes.storage` package."""
from datetime import datetime
from scopes.storage import concept, folder, message, topic, tracking
from scopes.storage import concept, folder, topic, tracking
def test_tracking(self, config):
@ -82,7 +82,7 @@ def test_type(self, config):
concept.setupCoreTypes(storage)
types = storage.getContainer(concept.Type)
tps = list(types.query())
self.assertEqual(len(tps), 7)
self.assertEqual(len(tps), 6)
tfolder = types.queryLast(name='folder')
fldrs = list(tfolder.values())
@ -94,8 +94,6 @@ def test_type(self, config):
def test_topic(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('rels')
rels = storage.getContainer(concept.Triple)
storage.dropTable('topics')
topics = storage.getContainer(topic.Topic)
types = storage.getContainer(concept.Type)
@ -114,7 +112,6 @@ def test_topic(self, config):
title='Programming Languages',
description='Programming Languages'))
topics.save(tp_proglang)
#storage.commit() # avoid "database locked" error with sqlite
tp_itc.addChild(tp_proglang)
c = list(tp_itc.children())
@ -122,12 +119,3 @@ def test_topic(self, config):
storage.commit()
def test_message(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('messages')
tracks = storage.create(message.Messages)
storage.commit()

View file

@ -1,26 +0,0 @@
# scopes.util
import base64
import hashlib
from secrets import choice
import string
# random strings, hashes, encodings
# for authentication, encryption, and other purposes
BASE = string.ascii_letters + string.digits
BASE2 = BASE + '-._~'
def rndstr(size=16):
return ''.join([choice(BASE) for _ in range(size)])
def rndstr2(size=64):
return ''.join([choice(BASE2) for _ in range(size)])
def b64e(b):
return base64.urlsafe_b64encode(b).rstrip(b'=')
def hashS256(s):
h = hashlib.sha256(s.encode('ascii')).digest()
return b64e(h).decode('ascii')

View file

@ -1 +0,0 @@
"""package scopes.web"""

View file

@ -1 +0,0 @@
"""package scopes.web.auth"""

View file

@ -1,239 +0,0 @@
# scopes.web.auth.uidc
from cryptography.fernet import Fernet
from email.utils import formatdate
import json
import jwt
import logging
import requests
from time import time
from urllib.parse import urlencode
from zope.authentication.interfaces import IAuthentication, IPrincipal
from zope.interface import implementer
from zope.publisher.interfaces import Unauthorized
from zope.security.interfaces import IGroupAwarePrincipal
from scopes.web.browser import DefaultView, register
from scopes.storage.folder import DummyFolder, Root
from scopes import util
import config
logger = logging.getLogger('web.auth.oidc')
@implementer(IAuthentication)
class OidcAuthentication:
def __init__(self, baseAuth):
self.baseAuth = baseAuth
def authenticate(self, request):
auth = Authenticator(request)
prc = auth.authenticate()
if prc is None and self.baseAuth is not None:
prc = self.baseAuth.authenticate(request)
return prc
def getPrincipal(self, id):
if self.baseAuth is not None:
return self.baseAuth.getPrincipal(id)
def unauthenticatedPrincipal(self):
if self.baseAuth is not None:
return self.baseAuth.unauthenticatedPrincipal()
def unauthorized(self, id, request):
if self.baseAuth is not None:
return self.baseAuth.unauthorized(id, request)
Authenticator(request).login()
def logout(self, request):
Authenticator(request).logout()
authentication = OidcAuthentication(None)
@implementer(IGroupAwarePrincipal)
class Principal:
group_prefix = 'gloops.'
def __init__(self, id, data):
self.id = id
self.data = data
@property
def title(self):
return self.data['name']
@property
def groups(self):
groups = [self.group_prefix + g for g in self.data.get('groups', [])]
return groups
def asDict(self):
data = self.data.copy()
data['id'] = self.id
return data
class Authenticator(DummyFolder):
prefix = 'auth.oidc'
def __init__(self, request):
self.request = request
self.params = config.oidc_params
self.reqUrl = config.base_url
self.setCrypt(self.params.get('cookie_crypt'))
def setReqUrl(self, base, path):
self.reqUrl = '/'.join((base, path))
def setCrypt(self, key):
self.cookieCrypt = key and Fernet(key) or None
def authenticate(self):
''' return principal or None'''
data = self.loadSession()
logger.debug('authenticate: %s', data)
if data and 'userid' in data:
id = self.params.get('principal_prefix', '') + data.pop('userid')
return Principal(id, data)
return None
def login(self):
state = util.rndstr()
nonce = util.rndstr()
codeVerifier = util.rndstr2()
codeChallenge = util.hashS256(codeVerifier)
args = dict(
client_id=self.params['client_id'],
response_type='code', # 'code id_token token',
state=state, nonce=nonce,
code_challenge=codeChallenge, code_challenge_method='S256',
scope='openid profile email urn:zitadel:iam:user:resourceowner',
redirect_uri=self.params['callback_url'],
request_uri=self.reqUrl,
)
self.storeSession(dict(state=state, nonce=nonce, code_verifier=codeVerifier))
authUrl = self.params['op_uris']['authorization_endpoint']
loginUrl = '?'.join((authUrl, urlencode(args)))
logger.debug('login: URL %s', loginUrl)
self.request.response.redirect(loginUrl, trusted=True)
def callback(self):
req = self.request
logger.debug('callback: %s %s', self, req.form)
sdata = self.loadSession()
code = req.form['code']
# !check state: req.form['state'] == sdata['state']
args = dict(
grant_type='authorization_code',
code=code,
redirect_uri=self.params['callback_url'],
client_id=self.params['client_id'],
code_verifier=sdata['code_verifier']
)
# !set header: 'Content-Type: application/x-www-form-urlencoded'
tokenUrl = self.params['op_uris']['token_endpoint']
tokenResponse = requests.post(tokenUrl, data=args)
tdata = tokenResponse.json()
userData = self.getIdTokenData(tdata['id_token'])
groupInfo = userData.get('urn:zitadel:iam:org:project:roles', {})
ndata = dict(
userid=userData['preferred_username'],
name=userData['name'],
email=userData['email'],
groups=list(groupInfo.keys()),
access_token=tdata['access_token'],
session_id=userData['sid'],
)
self.storeSession(ndata)
logger.debug('callback: session data: %s', ndata)
req.response.redirect(self.reqUrl, trusted=True)
def logout(self):
cname = self.params['cookie_name']
logger.debug('logout, cookie: %s', cname)
self.request.response.expireCookie(cname, path='/')
self.request.response.redirect(config.base_url, trusted=True)
def storeSession(self, data):
lifetime = int(self.params['cookie_lifetime'])
options = dict(
path='/',
expires=formatdate(time() + lifetime, localtime=False, usegmt=True),
httponly=True,
)
options['max-age'] = lifetime
domain = self.params['cookie_domain']
if domain:
options['domain'] = domain
name = self.params['cookie_name']
value = json.dumps(data)
if self.cookieCrypt:
value = self.cookieCrypt.encrypt(value.encode('UTF-8')).decode('ASCII')
self.request.response.setCookie(name, value, **options)
def loadSession(self):
cookie = self.request.getCookies().get(self.params['cookie_name'])
if cookie is None:
return {}
if self.cookieCrypt:
cookie = self.cookieCrypt.decrypt(cookie)
# !error check: return None - or raise error?
data = json.loads(cookie)
return data
def getIdTokenData(self, token):
uri = self.params['op_uris']['jwks_uri']
keys = loadOidcKeys(uri)
header = jwt.get_unverified_header(token)
key = jwt.PyJWK(keys[header['kid']])
return jwt.decode(token, key, audience=self.params['client_id'])
@register('auth')
def authView(context, request):
return Authenticator(request)
@register('login', Authenticator)
def login(context, request):
context.login()
return DefaultView(context, request)
@register('callback', Authenticator)
def callback(context, request):
context.callback()
return DefaultView(context, request)
@register('logout', Authenticator)
def logout(context, request):
context.logout()
return DefaultView(context, request)
def startup():
loadOidcProviderData()
#app.Publication.registerBeforeTraversal(
# lambda req: req.setPrincipal(authentication.authenticate(req))
oidcProviderUris = ['authorization_endpoint', 'token_endpoint',
'introspection_endpoint', 'userinfo_endpoint',
'revocation_endpoint', 'end_session_endpoint',
'device_authorization_endpoint', 'jwks_uri']
def loadOidcProviderData(force=False):
params = config.oidc_params
if force or params.get('op_uris') is None:
uris = params['op_uris'] = {}
opData = requests.get(params['op_config_url']).json()
for key in oidcProviderUris:
uris[key] = opData[key]
#if force or params.get('op_keys') is None:
#params['op_keys'] = requests.get(uris['jwks_uri']).json()['keys']
def loadOidcKeys(uri):
return dict((item['kid'], item) for item in requests.get(uri).json()['keys'])

View file

@ -1,4 +1,47 @@
from setuptools import setup
from setuptools import setup, find_packages
import os
setup()
version = '2.0'
long_description = (
open('README.md').read()
+ '\n' +
'Contributors\n'
'============\n'
+ '\n' +
open('CONTRIBUTORS.txt').read()
+ '\n' +
open('CHANGES.txt').read()
+ '\n')
setup(name='py-scopes',
version=version,
description="combined triple and event storage for the cco application platform",
long_description=long_description,
# Get more strings from
# http://pypi.python.org/pypi?%3Aaction=list_classifiers
classifiers=[
"Programming Language :: Python",
],
keywords='',
author='cyberconcepts.org team',
author_email='team@cyberconcepts.org',
url='http://www.cyberconcepts.org',
license='MIT',
packages=find_packages(),
#package_dir = {'': 'src'},
#namespace_packages=['cco'],
include_package_data=True,
zip_safe=False,
install_requires=[
'setuptools',
'transaction',
'psycopg2-binary',
'SQLAlchemy',
'zope.sqlalchemy',
# -*- Extra requirements: -*-
],
entry_points="""
# -*- Entry points: -*-
""",
)