Compare commits

...
Sign in to create a new pull request.

37 commits

Author SHA1 Message Date
b55191dab3 storage.message: definitions, start with tests 2025-04-30 18:08:45 +02:00
ee5a76a808 auth improvements (JWT stuff, esp tests); new: storage.message (event store) 2025-04-30 16:42:40 +02:00
5eb9531997 auth: directly use user data from id_token (no user_info request) 2025-04-29 17:36:02 +02:00
99f717a816 provide some example data + code (for JWT encoding/decoding) in 'scratch' module 2025-04-29 15:05:03 +02:00
258baa88b2 auth fixes (fetch keys via request); collect responses for testing in separate file 2025-04-29 09:18:57 +02:00
6857601ab8 work in progress: oidc auth tests 2025-04-28 15:29:21 +02:00
01fc7d2874 auth, work in progress: decode id_token, + other improvements 2025-04-25 20:34:52 +02:00
87310b9798 auth: improve loading of oidc provider data, provide and check in test 2025-04-23 09:10:43 +02:00
b2d1c7888b work in progress: dummy oidc data handler for testing 2025-04-22 11:10:43 +02:00
f21910e675 auth: get OIDC provider URIs, endpoints, and keys from OP config URLs 2025-04-21 11:53:23 +02:00
96afb631e0 rename server to web 2025-04-20 16:42:13 +02:00
bba081156b create package server.auth with module oidc (former server.auth module) 2025-04-20 11:10:05 +02:00
67985a6bdb auth: logout: start implementation (expire cookie) 2025-04-20 10:42:42 +02:00
d128c5f138 auth: use logger.debug instead of print 2025-04-16 16:51:32 +02:00
1918183c59 basic logging set-up 2025-04-16 10:54:25 +02:00
cae934c2d7 fix pyproject: +requests 2025-04-15 18:33:37 +02:00
4b791cf83b auth: principal with correct groups => login and auth basically working 2025-04-07 10:21:25 +02:00
2a52d8a481 auth: user info -> principal 2025-04-06 22:39:10 +02:00
35cf8884bf auth: unauthorized: call login() 2025-04-05 17:33:36 +02:00
8d3ff5b667 auth: store user data in cookie, retrieve in authenticate() 2025-04-05 12:31:26 +02:00
7bca60e74c auth: basic OIDC flow with cookie encryption and final redirect working 2025-04-04 16:48:53 +02:00
ec80be5f97 use waitress as http server; provide simple shell script; auth improvments 2025-03-27 22:35:19 +01:00
f911dbf590 oidc auth: get rid of pyoidc (oid) - provide random and crypt functionality in scopes.util 2025-03-27 08:27:02 +01:00
950fcb4174 oidc auth: login and retrieval of user data basically working 2025-03-26 18:15:38 +01:00
c1f07effee oidc auth: improvements, store info in cookie 2025-03-26 15:43:50 +01:00
0207d12b46 work in progress: oidc auth - redirect to oidc provider OK, start processing callback 2025-03-25 11:08:14 +01:00
87c0c1db2e work in progress: oidc authentication: start login processing 2025-03-24 22:26:17 +01:00
3e25b5e593 work in progress: entry points for OpenID Connect (oidc) authentication 2025-03-24 12:04:53 +01:00
b4051147ee server.browser.register: allow for explicit string prefixes as context types 2025-03-22 11:02:36 +01:00
f04297d570 provide DummyMailDelivery for testing; fix auth 2025-03-09 09:31:59 +01:00
efd47419a0 allow additional db parameters when setting up storage 2025-02-11 11:31:56 +01:00
1eff3d2c8b fix auth: don't raise Unauthorized 2024-11-22 11:34:40 +01:00
eaa2055c76 work in progress: JWT authentication: baseAuth as property, remove registration function 2024-11-16 09:27:01 +01:00
a3da3f07f0 work in progress: JWT authentication 2024-11-15 08:41:42 +01:00
ab050ba360 move tests into scopes package, provide runtests.sh calling zope-testrunner 2024-09-30 09:50:58 +02:00
c9cb428707 fix tests: avoid db locked error with sqlite3 2024-09-20 15:21:56 +02:00
a4c24a44d8 avoid 'db locked' error with sqlite on first run: provide additional commit() 2024-08-17 11:26:04 +02:00
33 changed files with 665 additions and 92 deletions

1
.gitignore vendored
View file

@ -2,6 +2,7 @@
*.pyo
*.egg-info
*.project
*.log
*.swp
*.pydevproject
*.sublime-project

View file

@ -1,12 +1,21 @@
# py-scopes/demo/config.py
from dotenv import load_dotenv
import logging
from os import getenv
from scopes.server.app import zope_app_factory
from scopes.web.app import zope_app_factory
load_dotenv()
log_file = 'log/scopes.log'
log_level = logging.DEBUG
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
logging.basicConfig(filename=log_file, level=log_level,
format=log_format, datefmt=log_dateformat)
server_port = getenv('SERVER_PORT', '8099')
base_url = getenv('BASE_URL', 'https://demo.cy7.de')
app_factory = zope_app_factory
@ -18,3 +27,18 @@ dbuser = getenv('DBUSER', 'demo')
dbpassword = getenv('DBPASSWORD', 'secret')
dbschema = getenv('DBSCHEMA', 'demo')
# authentication settings
oidc_provider = getenv('OIDC_PROVIDER', 'https://a1.cy7.de')
oidc_client_id = getenv('OIDC_CLIENT_ID', '311613119816392525')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id = oidc_client_id,
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None)
)

View file

@ -1,20 +1,21 @@
# py-scopes/demo/demo_server.py
from wsgiref.simple_server import make_server
from scopes.web.auth import oidc
from scopes.storage import topic
import logging
import waitress
def run(app, config):
oidc.startup() # todo: use generic app.startServices()
port = int(config.server_port)
with make_server('', port, app) as httpd:
print(f'Serving on port {port}.')
try:
httpd.serve_forever()
except KeyboardInterrupt:
print('Shutting down.')
print(f'Serving on port {port}.')
waitress.serve(app, port=port)
if __name__ == '__main__':
import config
#run(config.app, config)
app = config.app_factory(config)
run(app, config)
# see zope.app.wsgi.getWSGIApplication

2
demo/log/README.md Normal file
View file

@ -0,0 +1,2 @@
directory for logfiles created by application

11
demo/shell.py Normal file
View file

@ -0,0 +1,11 @@
# scopes/demo/shell.py
# simple shell for interactive testing / accessing the database (storage)
# use: `python -i shell.py`
import config
from scopes.web.auth import oidc
from scopes.storage.folder import Root
from scopes.storage import topic
storage = config.StorageFactory(config)(config.dbschema)
root = Root(storage)

View file

@ -21,14 +21,23 @@ postgres = [
"transaction",
"zope.sqlalchemy",
]
app = ["python-dotenv", "zope.publisher", "zope.traversing"]
test = ["pytest"]
app = [
"python-dotenv",
"waitress",
"zope.authentication",
"zope.interface",
"zope.publisher",
"zope.traversing",
]
auth = ["pyjwt[crypto]", "cryptography", "requests"]
test = ["zope.testrunner"]
#test = ["pytest"]
[tool.setuptools]
packages = ["scopes"]
[tool.pytest.ini_options]
addopts = "-vv"
python_files = "test_standard.py" # default: run only `standard` tests
#[tool.pytest.ini_options]
#addopts = "-vv"
#python_files = "test_standard.py" # default: run only `standard` tests
# use .pytest.ini file with `python_files = test_*.py` to run all tests

View file

@ -1,3 +1,5 @@
# runtests.sh
# run all unit / doc tests
zope-testrunner --test-path=. $*
python tests/test_postgres.py
python tests/test_standard.py

View file

@ -1,9 +1,14 @@
# scopes.interfaces
from zope.interface import Interface
from zope.interface import Interface, Attribute
class ITraversable(Interface):
class IViewable(Interface):
prefix = Attribute('Prefix string for identifying the type (class) of an object')
class ITraversable(IViewable):
def get(key, default=None):
"""Return the item addressed by `key`; return `default` if not found."""

16
scopes/organize/mail.py Normal file
View file

@ -0,0 +1,16 @@
# scopes.organize.mail
from zope.interface import implementer
from zope.sendmail.interfaces import IMailDelivery
"""Utilities for creating and sending emails."""
@implementer(IMailDelivery)
class DummyMailDelivery:
"""For testing purposes: just store mails in maildata.log"""
def send(self, fromaddr, toaddrs, message):
print("DummyMailDelivery")
print(f"fromaddr: {fromaddr}, toaddrs: {toaddrs}")
print(message)

View file

@ -1 +0,0 @@
"""package scopes.server"""

View file

@ -52,8 +52,8 @@ class Storage(object):
return metadata.tables.get((schema and schema + '.' or '') + tableName)
def dropTable(self, tableName):
prefix = self.schema and self.schema + '.' or ''
with self.engine.begin() as conn:
prefix = self.schema and self.schema + '.' or ''
conn.execute(text('drop table if exists %s%s' % (prefix, tableName)))
def resetSequence(self, tableName, colName, v):
@ -63,15 +63,19 @@ class Storage(object):
conn.execute(text(sq))
class StorageFactory(object):
class StorageFactory:
def sessionFactory(self):
return self.engine.connect
@staticmethod
def getEngine(dbtype, dbname, user, pw, host='localhost', port=5432, **kw):
def getEngine(dbtype, dbname, user, pw, **kw):
return create_engine('%s:///%s' % (dbtype, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
@staticmethod
def mark_changed(session):
pass
@ -86,8 +90,7 @@ class StorageFactory(object):
storageClass = Storage
def __init__(self, config, storageClass=None):
self.engine = self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword)
self.engine = self.engineFromConfig(config)
self.Session = self.sessionFactory()
if storageClass is not None:
self.storageClass = storageClass

View file

@ -24,6 +24,12 @@ class StorageFactory(StorageFactory):
return create_engine('%s://%s:%s@%s:%s/%s' % (
dbtype, user, pw, host, port, dbname), **kw)
def engineFromConfig(self, config):
return self.getEngine(config.dbengine, config.dbname,
config.dbuser, config.dbpassword,
host=getattr(config, 'dbhost', 'localhost'),
port=getattr(config, 'dbport', 5432))
@staticmethod
def mark_changed(session):
return mark_changed(session)

View file

@ -7,6 +7,18 @@ from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class DummyFolder(dict):
prefix = 'dummy'
def asDict(self):
return self
def __repr__(self):
return '<%s: %s>' % (self.__class__.__name__,
super(DummyFolder, self).__repr__())
@implementer(IContainer, IReference)
class Folder(Track):
@ -57,6 +69,8 @@ class Root(Folder):
"""A dummy (virtual) root folder for creating real folders
using the Folder API."""
prefix = 'root'
def __init__(self, storage):
cont = storage.create(Folders)
super(Root, self).__init__(container=cont)

21
scopes/storage/message.py Normal file
View file

@ -0,0 +1,21 @@
# scopes.storage.message
"""Generic messages (or events) to be stored in SQL database."""
from scopes.storage.common import registerContainerClass
from scopes.storage.tracking import Container, Track
class Message(Track):
headFields = ['domain', 'action', 'class', 'item']
prefix = 'msg'
@registerContainerClass
class Messages(Container):
itemFactory = Message
indexes = [('domain', 'action', 'class', 'item'), ('domain', 'class', 'item')]
tableName = 'messages'
insertOnChange = True

View file

@ -67,7 +67,7 @@ class Track(object):
return str(self.trackId)
def __repr__(self):
return '%s: %s' % (self.__class__.__name__, self.asDict())
return '<%s: %s>' % (self.__class__.__name__, self.asDict())
def asDict(self):
return dict(uid=self.uid, head=self.head, data=self.data,

1
scopes/tests/__init__.py Normal file
View file

@ -0,0 +1 @@
# py-scopes/tests

57
scopes/tests/config.py Normal file
View file

@ -0,0 +1,57 @@
# py-scopes/tests/config.py
import logging
from os import getenv
import sys
#from scopes.web.app import demo_app, zope_app
log_file = 'scopes/tests/log/scopes-test.log'
log_level = logging.INFO
log_format = '%(asctime)s %(levelname)s %(name)s %(message)s'
log_dateformat = '%Y-%m-%dT%H:%M:%S'
def setup_logging():
hdlr = logging.getLogger().handlers[-1]
logging.getLogger().removeHandler(hdlr) # remove NullHandler added by testrunner
logging.basicConfig(filename=log_file, level=log_level,
format=log_format, datefmt=log_dateformat)
setup_logging()
# server / app settings
server_port = '8999'
base_url = 'testing:'
#app = zope_app
# storage settings
# SQLite
dbengine = 'sqlite'
dbname = 'var/test.db'
dbuser = None
dbpassword = None
dbschema = None
# special testing stuff
from scopes.tests import data_auth # add oidc URIs and keys to dummy_requests data
from scopes.tests import dummy_requests
sys.modules['requests'] = dummy_requests
# authentication settings
oidc_provider = 'test://oidc'
oidc_client_id = getenv('OIDC_CLIENT_ID', '12345')
oidc_params = dict(
op_config_url=oidc_provider + '/.well-known/openid-configuration',
op_uris=None,
op_keys=None,
callback_url=getenv('OIDC_CALLBACK_URL', base_url + '/auth/callback'),
client_id=oidc_client_id,
principal_prefix=getenv('OIDC_PRINCIPAL_PREFIX', 'loops.'),
cookie_name=getenv('OIDC_COOKIE_NAME', 'oidc_' + oidc_client_id),
cookie_domain=getenv('OIDC_COOKIE_DOMAIN', None),
cookie_lifetime=getenv('OIDC_COOKIE_LIFETIME', '86400'),
cookie_crypt=getenv('OIDC_COOKIE_CRYPT', None)
)

33
scopes/tests/data_auth.py Normal file
View file

@ -0,0 +1,33 @@
# scopes.tests.data_auth
"""provide response data for testing (via dummy_requests)"""
from cryptography.hazmat.primitives.asymmetric import rsa
from scopes import util
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
public_key_n = util.b64e(public_key.public_numbers().n.to_bytes(256)).decode('ASCII')
oidc_data = {
'test://oidc/.well-known/openid-configuration': {
"issuer": "test://oidc",
"authorization_endpoint": "test://oidc/oauth/v2/authorize",
"token_endpoint": "test://oidc/oauth/v2/token",
"introspection_endpoint": "test://oidc/oauth/v2/introspect",
"userinfo_endpoint": "test://oidc/oidc/v1/userinfo",
"revocation_endpoint": "test://oidc/oauth/v2/revoke",
"end_session_endpoint": "test://oidc/oidc/v1/end_session",
"device_authorization_endpoint": "test://oidc/oauth/v2/device_authorization",
"jwks_uri": "test://oidc/oauth/v2/keys"},
'test://oidc/oauth/v2/keys': { "keys": [
{"use": "sig",
"kty": "RSA",
"kid": "316766976250797901",
"alg": "RS256",
"n": public_key_n,
"e": "AQAB"}]}
}
from scopes.tests.dummy_requests import response_data
response_data.update(oidc_data)

View file

@ -0,0 +1,22 @@
# scopes.tests.requests
"""Dummy requests implementation for testing."""
from logging import getLogger
logger = getLogger('tests.dummy_requests')
def get(url, *args, **kw):
logger.info(f'get: %s - %s - %s', url, args, kw)
return FakeResponse(response_data[url])
class FakeResponse:
def __init__(self, data):
self.data = data
def json(self):
return self.data
response_data = {}

View file

@ -0,0 +1 @@
directory for logfiles created by tests

36
scopes/tests/scratch.py Normal file
View file

@ -0,0 +1,36 @@
# scopes.tests.scratch
"""for keeping experimental and demonstration code and data (mainly for testing)"""
import base64
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
from cryptography.utils import int_to_bytes
from scopes import util
# generate (and serialize) or load private key
# generate public JWKS (key set) from private ky
#private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
#pem = private_key.private_bytes(encoding=serialization.Encoding.PEM,
# format=serialization.PrivateFormat.PKCS8,
# encryption_algorithm=serialization.NoEncryption())
pem = b'-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQD1ipW1hnV+OWF/\nE09YEQB9GTwQ0DUhmM1ZtmONV3xp1zrpLsvQtJJ6lZ8rp+ws35HGEEQWuBvgKP+K\nyyhILkgHp5LUtXsIruvo/+20Gbxr3bbIp2Omv8NuUnQIlb0SkZwauz/iLi+dFnoW\nTzkCwRnOPG+xqrSzD4CUCIZc4fU61h457I5PWTgK+bZXlsPytook9HpCIPX+9PaU\n935KHeWLonx3GZWqcqUyHSUn8dgRMypaKO70zCZkZWwDbCkJv7OUuY9QijN9niA+\nX3MFyqf9+WK7/lLnNYimdXk9Pz9Lim7HhoAApoW/0ueUfok54OIzuemTAUaunaN6\nmdOvh54dAgMBAAECggEAAWW5Vx7kAY1Bto104U51EEm519EvhUm6B5T4+VTp5LYK\nF1rEMIFwtBkoFfjzLJFEwEakJ8Hgq80TIRdyJ72r9ACIPGtTlH0Jjg94IqyvwOy6\n23fPEXkrLHR0Z5g8bR+6A95liQrA+tCD1QVFRAWUktlkVXfxRC95TMCA3i7rakGG\no6BbyUD2Hp7IEO5fbfKNiqrUO/LwvLOMBTe3Q9EVv7fblm21HiwWG26RgO29UtFo\nEKcaALSqIYwYCkc9EuLpcGRzLfH1opid3Xtu4+hm5enNgdXZRKr9lSQUCSi9oZRE\npszNGyDsaot2ixwKP3c6Eo3bQKMmXXw8Y3JsNiZzIQKBgQD8QuCjKyAeH0QrhW1Y\nvSmUC3q517WZPRgsEMx917MYn/Gl1ywsROjawP/ClxTZkpXVgALFvrggjB75p/QG\noFamZpf9U95fVivAkANQYmhGmqPVQUeb2vgHQOdPSdvvMoRumcEcfS/dOo9R//s+\nfrBQNYfp/tTttdyM2N0CzsuBzQKBgQD5LjX8SvLPe6q6xJchdExywbTk+DLtiKDs\nT5avaGhOqzYhir6FvEt7Gw74J6IWzIt5X4OteSkpRvj2TN0ik+KAfZOk+v9C5iK7\nxbicWj5XSaPJcYvhxogO7mb8sSrl/cY20HRkCsyQOKeH005PHsGZztZoHrsF6t2X\nbtKgNWp9kQKBgHLDHh09RlxNzx6Zkfh3/k1qt4eKmgQ/5hpN/ioWElVWloHjFSaC\npwi2GuT1BLhC1sWNejVqIaw08vaTMRI+qY0ESYsnN5hZxIfTPJ66VkQgn/4pt6Ex\nCfuKzHCm4la8vcDvVApY7YiQ1pjwguWYjy++WrnahBYs0UyGcG2RlMXVAoGBAJNJ\nf1ubqZ5+yNIQ9gwuRCno2dYl52SESCqmeLlCC7XEegCllCxUuoEP429HbgXv7dlW\nXe0iGvRtISflEyknJNEyaR0xx8RxZ8J6Ar9YkFTkEE44MajIww+gV3ux9Vtw/8LS\nwJmJ0JTHCC++9SDLW0BhBFcTIxVCWKz0MsfECyghAoGAOLm2dF59JiiiynGiWXzJ\nZcbZQStR+ysjLX+RqTATAorhweUiv+HwbB4boWhOnDD7q3t+93GnzeqI9m9Q5yo6\nRTyyNYa4uJXMMarZxi3m+RGvuCC2h+7ja6H6UGg3xrHNwcykJkuRAZvqCN+f6Nxo\njc8o9LMimQyTv+yptrOhi2w=\n-----END PRIVATE KEY-----\n'
private_key = serialization.load_pem_private_key(pem, password=None)
public_key = private_key.public_key()
nums = public_key.public_numbers()
#e = base64.urlsafe_b64encode(int_to_bytes(nums.e)).rstrip(b'=')
#e = base64.urlsafe_b64encode(nums.e.to_bytes(3)).rstrip(b'=')
e = b'AQAB'
#n = base64.urlsafe_b64encode(int_to_bytes(nums.n)).rstrip(b'=')
n = util.b64e(nums.n.to_bytes(256))
# data from example session with zitadel
keys_response = {"keys":[{"use":"sig","kty":"RSA","kid":"317781576895225677","alg":"RS256","n":"rBE2kf4S1-uLqTKoRcxmBDZXeNU0PcDZtXX3pYK4hbE_cVNqMnWZQNXh4asLvZ_VpInIRI_gRe5pFfrUfnFppnu7sgUGPHS_i6E1JSDWDDViwU0DbQXzTMA36xb8fI9vm_StrKqQ2snpkV2YprWC597vOa6w8eWMEfwWV39gmpgjyaYfwKi2ldvnn_djYSEyy6Cs8QW9L8uaTKDYUuuIY7iyjTRI7kM1boZcg26WQ4nDgSU8BMEmPGc-h7BHIm3PVEKNBUU3dRIcvIlnAx7AJoqdmUU0BllMGOd-To7PP760Vx4gYBsTFRGaQ_9P7vRApn0Wywnhj0BaVWGyJPkggQ","e":"AQAB"}]}
token_response = {'access_token': 'WGn4xjOluOgEugwqfxACziBZxAxQ1SKBKU6p27DysWw9slM55Fn7wkz_avpy5BjFR1uU_2rJoQTMNsO8NyxOEkmhbpP9xURUQrLRBiZT', 'token_type': 'Bearer', 'expires_in': 43199, 'id_token': 'eyJhbGciOiJSUzI1NiIsImtpZCI6IjMxNzc4MTU3Njg5NTIyNTY3NyIsInR5cCI6IkpXVCJ9.eyJhbXIiOlsicHdkIl0sImF0X2hhc2giOiJQbC1kMjBvRG9DRHZqTUtGMmF4S3N3IiwiYXVkIjpbIjMxMTYxMzExOTgxNjM5MjUyNSIsIjMxNDQ2OTIwOTA3OTkzMDcwMSIsIjMxMTQ3MjQ1MTc2ODg2ODY4NSJdLCJhdXRoX3RpbWUiOjE3NDU5MTAxNTksImF6cCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImNsaWVudF9pZCI6IjMxMTYxMzExOTgxNjM5MjUyNSIsImVtYWlsIjoiaGVsbXV0Lm1lcnpAcG9zdGVvLmRlIiwiZW1haWxfdmVyaWZpZWQiOnRydWUsImV4cCI6MTc0NTk1MzUwOSwiZmFtaWx5X25hbWUiOiJVc2VyIiwiZ2VuZGVyIjoibWFsZSIsImdpdmVuX25hbWUiOiJUZXN0IiwiaWF0IjoxNzQ1OTEwMzA5LCJpc3MiOiJodHRwczovL2ExLmN5Ny5kZSIsImxvY2FsZSI6ImRlIiwibmFtZSI6IlRlc3QgVXNlciIsIm5vbmNlIjoiMVJGem9RNDkyazNWTFEzcyIsInByZWZlcnJlZF91c2VybmFtZSI6InRzdDkiLCJzaWQiOiJWMV8zMTc3ODQyMjY2MjE2MTE4NTMiLCJzdWIiOiIzMTE4NDY3OTY3Mzk1MzQ2NjkiLCJ1cGRhdGVkX2F0IjoxNzQ1MzA1MTkzLCJ1cm46eml0YWRlbDppYW06b3JnOnByb2plY3Q6MzExNDcyNDUxNzY4ODY4Njg1OnJvbGVzIjp7ImNjLXVzZXIiOnsiMzExNDczNTAyMjc0MjQ4NTI1IjoiY3liZXJjb25jZXB0cy5hMS5jeTcuZGUifX0sInVybjp6aXRhZGVsOmlhbTpvcmc6cHJvamVjdDpyb2xlcyI6eyJjYy11c2VyIjp7IjMxMTQ3MzUwMjI3NDI0ODUyNSI6ImN5YmVyY29uY2VwdHMuYTEuY3k3LmRlIn19LCJ1cm46eml0YWRlbDppYW06dXNlcjpyZXNvdXJjZW93bmVyOmlkIjoiMzExNDczNTAyMjc0MjQ4NTI1IiwidXJuOnppdGFkZWw6aWFtOnVzZXI6cmVzb3VyY2Vvd25lcjpuYW1lIjoiY3liZXJjb25jZXB0cyIsInVybjp6aXRhZGVsOmlhbTp1c2VyOnJlc291cmNlb3duZXI6cHJpbWFyeV9kb21haW4iOiJjeWJlcmNvbmNlcHRzLmExLmN5Ny5kZSJ9.hx8JWXWVPq-WfBx9AMgFRWaEX3CioU8BCZwT3d8YETMejsR33LDsbPfnzO0Dbw885tiyNIsdNRj7lfk5HtuZVWdHx58hnaMw-czGhIfB0GXvuh2UFemBoLNJLajtRvCBi-YIZ_c6TD0Ryd2wn_yQlgY1J2R8xTsBWeKM_SYqlKU20r4r3nsX4krg6Q9x-Kc3k0Rg3lMXoxFYgQSzXrKjxTI3HHb03H4lzgmrdcbQqfbLoHKdvGxzfo-Gf8Rlt9rQ-fpGu7fRfPJQV14SPJ6CQtKP0adq_D2blcp1I1xKXML8TytJa05RjL_l_KH-segFfORtGMc4mYo8JUpMW0KOvg'}
token_id_claims = {'amr': ['pwd'], 'at_hash': 'Pl-d20oDoCDvjMKF2axKsw', 'aud': ['311613119816392525', '314469209079930701', '311472451768868685'], 'auth_time': 1745910159, 'azp': '311613119816392525', 'client_id': '311613119816392525', 'email': 'helmut.merz@posteo.de', 'email_verified': True, 'exp': 1745953509, 'family_name': 'User', 'gender': 'male', 'given_name': 'Test', 'iat': 1745910309, 'iss': 'https://a1.cy7.de', 'locale': 'de', 'name': 'Test User', 'nonce': '1RFzoQ492k3VLQ3s', 'preferred_username': 'tst9', 'sid': 'V1_317784226621611853', 'sub': '311846796739534669', 'updated_at': 1745305193, 'urn:zitadel:iam:org:project:311472451768868685:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:org:project:roles': {'cc-user': {'311473502274248525': 'cyberconcepts.a1.cy7.de'}}, 'urn:zitadel:iam:user:resourceowner:id': '311473502274248525', 'urn:zitadel:iam:user:resourceowner:name': 'cyberconcepts', 'urn:zitadel:iam:user:resourceowner:primary_domain': 'cyberconcepts.a1.cy7.de'}

View file

@ -1,9 +1,12 @@
#! /usr/bin/python
# scopes.tests.test_postgres
"""Tests for the 'scopes.storage' package - using PostgreSQL."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
import tlib_storage
from scopes.tests import tlib_storage
from scopes.storage.db.postgres import StorageFactory
import config
@ -29,10 +32,13 @@ class Test(unittest.TestCase):
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def suite():
def test_005_message(self):
tlib_storage.test_message(self, config)
def test_suite():
return unittest.TestSuite((
unittest.TestLoader().loadTestsFromTestCase(Test),
))
if __name__ == '__main__':
unittest.main(defaultTest='suite')
unittest.main(defaultTest='test_suite')

View file

@ -1,9 +1,12 @@
#! /usr/bin/python
# scopes.tests.test_standard
"""Tests for the 'scopes.storage' package."""
import os, sys
sys.path = [os.path.dirname(__file__)] + sys.path
import unittest
import tlib_server, tlib_storage
from scopes.tests import tlib_web, tlib_storage
from scopes.storage.common import StorageFactory
import config
@ -27,8 +30,9 @@ class Test(unittest.TestCase):
def test_004_topic(self):
tlib_storage.test_topic(self, config)
def test_013_server(self):
tlib_server.test_app(self, config)
def test_013_web(self):
tlib_web.test_app(self, config)
tlib_web.test_auth(self, config)
def suite():

View file

@ -3,7 +3,7 @@
"""Test implementation for the `scopes.storage` package."""
from datetime import datetime
from scopes.storage import concept, folder, topic, tracking
from scopes.storage import concept, folder, message, topic, tracking
def test_tracking(self, config):
@ -82,7 +82,7 @@ def test_type(self, config):
concept.setupCoreTypes(storage)
types = storage.getContainer(concept.Type)
tps = list(types.query())
self.assertEqual(len(tps), 6)
self.assertEqual(len(tps), 7)
tfolder = types.queryLast(name='folder')
fldrs = list(tfolder.values())
@ -94,6 +94,8 @@ def test_type(self, config):
def test_topic(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('rels')
rels = storage.getContainer(concept.Triple)
storage.dropTable('topics')
topics = storage.getContainer(topic.Topic)
types = storage.getContainer(concept.Type)
@ -112,10 +114,20 @@ def test_topic(self, config):
title='Programming Languages',
description='Programming Languages'))
topics.save(tp_proglang)
#storage.commit() # avoid "database locked" error with sqlite
tp_itc.addChild(tp_proglang)
c = list(tp_itc.children())
self.assertEqual(c[0].name, 'prog_lang')
storage.commit()
def test_message(self, config):
storage = config.storageFactory(config.dbschema)
storage.dropTable('messages')
tracks = storage.create(message.Messages)
storage.commit()

42
scopes/tests/tlib_web.py Normal file
View file

@ -0,0 +1,42 @@
# tests/tlib_web.py
"""Test implementation for the `scopes.web` package."""
import json
import logging
from zope.publisher.browser import TestRequest
from zope.publisher.publish import publish
from scopes.web.app import Publication
from scopes.storage.folder import Root
logger = logging.getLogger('tlib_web')
def publishRequest(config, storage, path):
appRoot = Root(storage)
request = TestRequest(environ=dict(PATH_INFO=path))
request.setPublication(Publication(appRoot))
request = publish(request, False)
return request.response
def test_app(self, config):
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top')
logger.info('test_app: response %s %s', response.getStatus(), response.getHeaders())
result = json.loads(response.consumeBody())
self.assertEqual(result['items'][0]['head']['name'], 'level2-item1')
def test_auth(self, config):
from scopes.web.auth import oidc
oidc.startup() # todo: use generic app.startServices()
self.assertEqual(len(config.oidc_params['op_uris']), 8)
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top/auth/login')
headers = dict(response.getHeaders())
logger.info('test_auth: response %s %s', response.getStatus(), headers)
self.assertEqual(response.getStatus(), 302)
uri = config.oidc_params['op_uris']['jwks_uri']
keys = oidc.loadOidcKeys(uri)
logger.info('test_auth keys: %s', keys)

26
scopes/util.py Normal file
View file

@ -0,0 +1,26 @@
# scopes.util
import base64
import hashlib
from secrets import choice
import string
# random strings, hashes, encodings
# for authentication, encryption, and other purposes
BASE = string.ascii_letters + string.digits
BASE2 = BASE + '-._~'
def rndstr(size=16):
return ''.join([choice(BASE) for _ in range(size)])
def rndstr2(size=64):
return ''.join([choice(BASE2) for _ in range(size)])
def b64e(b):
return base64.urlsafe_b64encode(b).rstrip(b'=')
def hashS256(s):
h = hashlib.sha256(s.encode('ascii')).digest()
return b64e(h).decode('ascii')

1
scopes/web/__init__.py Normal file
View file

@ -0,0 +1 @@
"""package scopes.web"""

View file

@ -1,22 +1,31 @@
# scopes.server.app
# scopes.web.app
import logging
from zope.i18n.interfaces import IUserPreferredCharsets
from zope.interface import implementer
from zope.publisher.base import DefaultPublication
from zope.publisher.browser import BrowserRequest
from zope.publisher.interfaces import NotFound
from zope.publisher.publish import publish
from scopes.interfaces import ITraversable, IView
from scopes.server.browser import getView
from scopes.web.browser import getView
import scopes.storage.concept # register container classes
from scopes.storage.folder import Root
@implementer(IUserPreferredCharsets)
class Request(BrowserRequest):
def getPreferredCharsets(self):
return ['UTF-8']
def zope_app_factory(config):
storageFactory = config.StorageFactory(config)
def zope_app(environ, start_response):
storage = storageFactory(config.dbschema)
appRoot = Root(storage)
request = BrowserRequest(environ['wsgi.input'], environ)
request = Request(environ['wsgi.input'], environ)
request.setPublication(Publication(appRoot))
request = publish(request, True)
response = request.response
@ -27,6 +36,12 @@ def zope_app_factory(config):
class Publication(DefaultPublication):
def beforeTraversal(self, request):
super(Publication, self).beforeTraversal(request)
from scopes.web.auth.oidc import authentication
prc = authentication.authenticate(request)
request.setPrincipal(prc)
def traverseName(self, request, ob, name):
next = getView(request, ob, name)
if next is not None:

View file

@ -0,0 +1 @@
"""package scopes.web.auth"""

239
scopes/web/auth/oidc.py Normal file
View file

@ -0,0 +1,239 @@
# scopes.web.auth.uidc
from cryptography.fernet import Fernet
from email.utils import formatdate
import json
import jwt
import logging
import requests
from time import time
from urllib.parse import urlencode
from zope.authentication.interfaces import IAuthentication, IPrincipal
from zope.interface import implementer
from zope.publisher.interfaces import Unauthorized
from zope.security.interfaces import IGroupAwarePrincipal
from scopes.web.browser import DefaultView, register
from scopes.storage.folder import DummyFolder, Root
from scopes import util
import config
logger = logging.getLogger('web.auth.oidc')
@implementer(IAuthentication)
class OidcAuthentication:
def __init__(self, baseAuth):
self.baseAuth = baseAuth
def authenticate(self, request):
auth = Authenticator(request)
prc = auth.authenticate()
if prc is None and self.baseAuth is not None:
prc = self.baseAuth.authenticate(request)
return prc
def getPrincipal(self, id):
if self.baseAuth is not None:
return self.baseAuth.getPrincipal(id)
def unauthenticatedPrincipal(self):
if self.baseAuth is not None:
return self.baseAuth.unauthenticatedPrincipal()
def unauthorized(self, id, request):
if self.baseAuth is not None:
return self.baseAuth.unauthorized(id, request)
Authenticator(request).login()
def logout(self, request):
Authenticator(request).logout()
authentication = OidcAuthentication(None)
@implementer(IGroupAwarePrincipal)
class Principal:
group_prefix = 'gloops.'
def __init__(self, id, data):
self.id = id
self.data = data
@property
def title(self):
return self.data['name']
@property
def groups(self):
groups = [self.group_prefix + g for g in self.data.get('groups', [])]
return groups
def asDict(self):
data = self.data.copy()
data['id'] = self.id
return data
class Authenticator(DummyFolder):
prefix = 'auth.oidc'
def __init__(self, request):
self.request = request
self.params = config.oidc_params
self.reqUrl = config.base_url
self.setCrypt(self.params.get('cookie_crypt'))
def setReqUrl(self, base, path):
self.reqUrl = '/'.join((base, path))
def setCrypt(self, key):
self.cookieCrypt = key and Fernet(key) or None
def authenticate(self):
''' return principal or None'''
data = self.loadSession()
logger.debug('authenticate: %s', data)
if data and 'userid' in data:
id = self.params.get('principal_prefix', '') + data.pop('userid')
return Principal(id, data)
return None
def login(self):
state = util.rndstr()
nonce = util.rndstr()
codeVerifier = util.rndstr2()
codeChallenge = util.hashS256(codeVerifier)
args = dict(
client_id=self.params['client_id'],
response_type='code', # 'code id_token token',
state=state, nonce=nonce,
code_challenge=codeChallenge, code_challenge_method='S256',
scope='openid profile email urn:zitadel:iam:user:resourceowner',
redirect_uri=self.params['callback_url'],
request_uri=self.reqUrl,
)
self.storeSession(dict(state=state, nonce=nonce, code_verifier=codeVerifier))
authUrl = self.params['op_uris']['authorization_endpoint']
loginUrl = '?'.join((authUrl, urlencode(args)))
logger.debug('login: URL %s', loginUrl)
self.request.response.redirect(loginUrl, trusted=True)
def callback(self):
req = self.request
logger.debug('callback: %s %s', self, req.form)
sdata = self.loadSession()
code = req.form['code']
# !check state: req.form['state'] == sdata['state']
args = dict(
grant_type='authorization_code',
code=code,
redirect_uri=self.params['callback_url'],
client_id=self.params['client_id'],
code_verifier=sdata['code_verifier']
)
# !set header: 'Content-Type: application/x-www-form-urlencoded'
tokenUrl = self.params['op_uris']['token_endpoint']
tokenResponse = requests.post(tokenUrl, data=args)
tdata = tokenResponse.json()
userData = self.getIdTokenData(tdata['id_token'])
groupInfo = userData.get('urn:zitadel:iam:org:project:roles', {})
ndata = dict(
userid=userData['preferred_username'],
name=userData['name'],
email=userData['email'],
groups=list(groupInfo.keys()),
access_token=tdata['access_token'],
session_id=userData['sid'],
)
self.storeSession(ndata)
logger.debug('callback: session data: %s', ndata)
req.response.redirect(self.reqUrl, trusted=True)
def logout(self):
cname = self.params['cookie_name']
logger.debug('logout, cookie: %s', cname)
self.request.response.expireCookie(cname, path='/')
self.request.response.redirect(config.base_url, trusted=True)
def storeSession(self, data):
lifetime = int(self.params['cookie_lifetime'])
options = dict(
path='/',
expires=formatdate(time() + lifetime, localtime=False, usegmt=True),
httponly=True,
)
options['max-age'] = lifetime
domain = self.params['cookie_domain']
if domain:
options['domain'] = domain
name = self.params['cookie_name']
value = json.dumps(data)
if self.cookieCrypt:
value = self.cookieCrypt.encrypt(value.encode('UTF-8')).decode('ASCII')
self.request.response.setCookie(name, value, **options)
def loadSession(self):
cookie = self.request.getCookies().get(self.params['cookie_name'])
if cookie is None:
return {}
if self.cookieCrypt:
cookie = self.cookieCrypt.decrypt(cookie)
# !error check: return None - or raise error?
data = json.loads(cookie)
return data
def getIdTokenData(self, token):
uri = self.params['op_uris']['jwks_uri']
keys = loadOidcKeys(uri)
header = jwt.get_unverified_header(token)
key = jwt.PyJWK(keys[header['kid']])
return jwt.decode(token, key, audience=self.params['client_id'])
@register('auth')
def authView(context, request):
return Authenticator(request)
@register('login', Authenticator)
def login(context, request):
context.login()
return DefaultView(context, request)
@register('callback', Authenticator)
def callback(context, request):
context.callback()
return DefaultView(context, request)
@register('logout', Authenticator)
def logout(context, request):
context.logout()
return DefaultView(context, request)
def startup():
loadOidcProviderData()
#app.Publication.registerBeforeTraversal(
# lambda req: req.setPrincipal(authentication.authenticate(req))
oidcProviderUris = ['authorization_endpoint', 'token_endpoint',
'introspection_endpoint', 'userinfo_endpoint',
'revocation_endpoint', 'end_session_endpoint',
'device_authorization_endpoint', 'jwks_uri']
def loadOidcProviderData(force=False):
params = config.oidc_params
if force or params.get('op_uris') is None:
uris = params['op_uris'] = {}
opData = requests.get(params['op_config_url']).json()
for key in oidcProviderUris:
uris[key] = opData[key]
#if force or params.get('op_keys') is None:
#params['op_keys'] = requests.get(uris['jwks_uri']).json()['keys']
def loadOidcKeys(uri):
return dict((item['kid'], item) for item in requests.get(uri).json()['keys'])

View file

@ -1,21 +1,25 @@
# scopes.server.browser
# scopes.web.browser
import json
import logging
from zope.interface import implementer
from scopes.interfaces import IContainer, IReference, IView
logger = logging.getLogger('web.browser')
views = {} # registry for all views: {name: {prefix: viewClass, ...}, ...}
def register(name, *contextClasses):
"""Use as decorator: `@register(name, class, ...).
class `None` means default view for all classes."""
def register(name, *contextTypes):
"""Use as decorator: `@register(name, class_or_prefix, ...).
No class (or `None` or `''`) means default view for all classes."""
def doRegister(factory):
implementer(IView)(factory)
nameEntry = views.setdefault(name, {})
for cl in contextClasses:
nameEntry[cl.prefix] = factory
else:
nameEntry[''] = factory
cts = contextTypes or ['']
for ct in cts:
if not isinstance(ct, str):
ct = ct.prefix
nameEntry[ct] = factory
return factory
return doRegister
@ -28,6 +32,7 @@ def getView(request, ob, name):
factory = nameEntry.get('')
if factory is None:
return None
logger.debug('getView: %s %s', ob, request['PATH_INFO'])
return factory(ob, request)
@ -54,10 +59,11 @@ class DefaultView:
result['target'] = target.asDict()
if IContainer.providedBy(target):
result['target']['items'] = [v.asDict() for v in target.values()]
prc = self.request.principal
if prc is not None:
result['principal'] = prc.asDict()
return result
def render(self, result):
self.request.response.setHeader('Content-type', 'application/json; charset=utf-8')
return json.dumps(result).encode('UTF-8')

View file

@ -1,17 +0,0 @@
# py-scopes/tests/config.py
#from scopes.server.app import demo_app, zope_app
# server / app settings
server_port = '8999'
#app = zope_app
# storage settings
# SQLite
dbengine = 'sqlite'
dbname = 'var/test.db'
dbuser = None
dbpassword = None
dbschema = None

View file

@ -1,26 +0,0 @@
# tests/tlib_server.py
"""Test implementation for the `scopes.server` package."""
import json
from zope.publisher.browser import TestRequest
from zope.publisher.publish import publish
from scopes.server.app import Publication
from scopes.storage.folder import Root
def publishRequest(config, storage, path):
appRoot = Root(storage)
request = TestRequest(environ=dict(PATH_INFO=path))
request.setPublication(Publication(appRoot))
request = publish(request, False)
return request.response
def test_app(self, config):
storage = config.storageFactory(config.dbschema)
response = publishRequest(config, storage, '/top')
result = json.loads(response.consumeBody())
self.assertEqual(result['items'][0]['head']['name'], 'level2-item1')